Thursday, January 25, 2018
How to log connection detail denied by firewall
Saturday, January 20, 2018
How to fix "Service Monitor" and "Host Monitor" failure during Cloudera CDH5 cluster restart
HTTPFS & WebHDFS API Examples
# user.name=donghua is passed for authentication purposes
[donghua@cdh-vm ~]$ curl -i -X PUT "http://cdh-vm:14000/webhdfs/v1/user/donghua/newdir?op=MKDIRS&user.name=donghua"
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=donghua&p=donghua&t=simple-dt&e=1516447651395&s=wq4TMpf9TNtSCrFiqs1Tam66ZzQ="; Path=/; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Date: Sat, 20 Jan 2018 01:27:31 GMT
{"boolean":true}
# List directories & files in /user/donghua
[donghua@cdh-vm ~]$ curl -i -X GET "http://cdh-vm:14000/webhdfs/v1/user/donghua/?op=LISTSTATUS&user.name=donghua"
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=donghua&p=donghua&t=simple-dt&e=1516447856962&s=aZyV3CQ997aG09J+mQQsC4FDRjU="; Path=/; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Date: Sat, 20 Jan 2018 01:30:57 GMT
{"FileStatuses":{"FileStatus":[{"pathSuffix":".Trash","type":"DIRECTORY","length":0,"owner":"donghua","group":"supergroup","permission":"700","accessTime":0,"modificationTime":1515927600117,"blockSize":0,"replication":0},{"pathSuffix":".sparkStaging","type":"DIRECTORY","length":0,"owner":"donghua","group":"supergroup","permission":"755","accessTime":0,"modificationTime":1514001258000,"blockSize":0,"replication":0},{"pathSuffix":".staging","type":"DIRECTORY","length":0,"owner":"donghua","group":"supergroup","permission":"700","accessTime":0,"modificationTime":1514379310197,"blockSize":0,"replication":0},{"pathSuffix":"mkdir","type":"DIRECTORY","length":0,"owner":"donghua","group":"supergroup","permission":"755","accessTime":0,"modificationTime":1516411617837,"blockSize":0,"replication":0},{"pathSuffix":"monthly_taxi_fleet","type":"DIRECTORY","length":0,"owner":"donghua","group":"supergroup","permission":"755","accessTime":0,"modificationTime":1514024183300,"blockSize":0,"replication":0},{"pathSuffix":"monthly_taxi_fleet.ddl","type":"FILE","length":729,"owner":"donghua","group":"supergroup","permission":"644","accessTime":1516411543045,"modificationTime":1516411543262,"blockSize":134217728,"replication":1},{"pathSuffix":"newdir","type":"DIRECTORY","length":0,"owner":"donghua","group":"supergroup","permission":"755","accessTime":0,"modificationTime":1516411651448,"blockSize":0,"replication":0},{"pathSuffix":"newdir2","type":"DIRECTORY","length":0,"owner":"donghua","group":"supergroup","permission":"755","accessTime":0,"modificationTime":1516411701096,"blockSize":0,"replication":0}]}}
# Get file contents
[donghua@cdh-vm ~]$ curl -i -X GET "http://cdh-vm:14000/webhdfs/v1/user/donghua/monthly_taxi_fleet.ddl?op=OPEN&user.name=donghua"
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=donghua&p=donghua&t=simple-dt&e=1516447887889&s=nq4WkUAsHUMl06yPMecNmUS/s38="; Path=/; HttpOnly
Content-Type: application/octet-stream
Content-Length: 729
Date: Sat, 20 Jan 2018 01:31:27 GMT
createtab_stmt
CREATE EXTERNAL TABLE `monthly_taxi_fleet`(
`month` char(7),
`company` varchar(50),
`fleet` smallint)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'=',',
'serialization.format'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://cdh-vm.dbaglobe.com:8020/user/donghua/monthly_taxi_fleet'
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='false',
'numFiles'='1',
'numRows'='-1',
'rawDataSize'='-1',
'skip.header.line.count'='1',
'totalSize'='25802',
'transient_lastDdlTime'='1514024883')
# Get file status
[donghua@cdh-vm ~]$ curl -i -X GET "http://cdh-vm:14000/webhdfs/v1/user/donghua/monthly_taxi_fleet.ddl?op=GETFILESTATUS&user.name=donghua"
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=donghua&p=donghua&t=simple-dt&e=1516448141481&s=VmoBQqQUjfd/DryQ4WctTLwoPcw="; Path=/; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Date: Sat, 20 Jan 2018 01:35:41 GMT
{"FileStatus":{"pathSuffix":"","type":"FILE","length":729,"owner":"donghua","group":"supergroup","permission":"644","accessTime":1516411543045,"modificationTime":1516411543262,"blockSize":134217728,"replication":1}}
# Delete file
[donghua@cdh-vm ~]$ curl -i -X DELETE "http://cdh-vm:14000/webhdfs/v1/user/donghua/monthly_taxi_fleet.ddl?op=DELETE&user.name=donghua"
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=donghua&p=donghua&t=simple-dt&e=1516448233112&s=aIVcU49l57oWd5QesPBchhaYiTM="; Path=/; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Date: Sat, 20 Jan 2018 01:37:13 GMT
[donghua@cdh-vm ~]$ curl -i -X GET "http://cdh-vm:14000/webhdfs/v1/user/donghua/monthly_taxi_fleet.ddl?op=GETFILESTATUS&user.name=donghua"
HTTP/1.1 404 Not Found
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=donghua&p=donghua&t=simple-dt&e=1516448282348&s=rHkMNLdVJe/gN6Wouy+lqon257Q="; Path=/; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Date: Sat, 20 Jan 2018 01:38:02 GMT
{"RemoteException":{"message":"File does not exist: \/user\/donghua\/monthly_taxi_fleet.ddl","exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException"}}
# Delete directory recursively
[donghua@cdh-vm ~]$ curl -i -X DELETE "http://cdh-vm:14000/webhdfs/v1/user/donghua/newdir?op=DELETE&recursive=true&user.name=donghua"
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=donghua&p=donghua&t=simple-dt&e=1516448379021&s=OTgN6OcRp7R1DFYPTxHrTdyUhRk="; Path=/; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Date: Sat, 20 Jan 2018 01:39:39 GMT
WebHDFS itself is served on the NameNode web port (50070 by default) and is enabled by default, so the same /webhdfs/v1 calls also work against that port:
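As an illustration (not from the original post; it assumes the NameNode web UI answers at cdh-vm:50070 and simple authentication, as in the HttpFS examples):

import requests

# WebHDFS endpoint on the NameNode (assumed host/port) instead of the HttpFS gateway on 14000
NAMENODE = "http://cdh-vm:50070/webhdfs/v1"

r = requests.get(NAMENODE + "/user/donghua", params={"op": "GETFILESTATUS", "user.name": "donghua"})
print(r.status_code, r.json())

# Note: for data operations such as OPEN, WebHDFS answers with a redirect to a DataNode,
# which requests follows automatically; HttpFS proxies the data through port 14000 instead.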
Sunday, January 14, 2018
Sample configuration using nginx LB to access Cloudera Manager
Saturday, January 6, 2018
R: Work with SQL Server using RODBC
> library(RODBC)
> # Connect to SQL Server using integrated security
> connStr <- paste("Server=WIN2016\\SQL2017",
+ "Driver=SQL Server",
+ "Database=AdventureWorks2017",
+ sep=";")
> conn <- odbcDriverConnect(connStr)
> tab <- sqlTables(conn)
> head(tab)
TABLE_CAT TABLE_SCHEM TABLE_NAME TABLE_TYPE REMARKS
1 AdventureWorks2017 dbo AWBuildVersion TABLE <NA>
2 AdventureWorks2017 dbo DatabaseLog TABLE <NA>
3 AdventureWorks2017 dbo ErrorLog TABLE <NA>
4 AdventureWorks2017 HumanResources Department TABLE <NA>
5 AdventureWorks2017 HumanResources Employee TABLE <NA>
6 AdventureWorks2017 HumanResources EmployeeDepartmentHistory TABLE <NA>
> emp <- sqlFetch(conn, "HumanResources.Employee")
> head(emp)
BusinessEntityID NationalIDNumber LoginID OrganizationNode OrganizationLevel
1 1 295847284 adventure-works\\ken0 NA
2 2 245797967 adventure-works\\terri0 58 1
3 3 509647174 adventure-works\\roberto0 5a, c0 2
4 4 112457891 adventure-works\\rob0 5a, d6 3
5 5 695256908 adventure-works\\gail0 5a, da 3
6 6 998320692 adventure-works\\jossef0 5a, de 3
JobTitle BirthDate MaritalStatus Gender HireDate SalariedFlag VacationHours SickLeaveHours
1 Chief Executive Officer 1969-01-29 S M 2009-01-14 1 99 69
2 Vice President of Engineering 1971-08-01 S F 2008-01-31 1 1 20
3 Engineering Manager 1974-11-12 M M 2007-11-11 1 2 21
4 Senior Tool Designer 1974-12-23 S M 2007-12-05 0 48 80
5 Design Engineer 1952-09-27 M F 2008-01-06 1 5 22
6 Design Engineer 1959-03-11 M M 2008-01-24 1 6 23
CurrentFlag rowguid ModifiedDate
1 1 F01251E5-96A3-448D-981E-0F99D789110D 2014-06-30
2 1 45E8F437-670D-4409-93CB-F9424A40D6EE 2014-06-30
3 1 9BBBFB2C-EFBB-4217-9AB7-F97689328841 2014-06-30
4 1 59747955-87B8-443F-8ED4-F8AD3AFDF3A9 2014-06-30
5 1 EC84AE09-F9B8-4A15-B4A9-6CCBAB919B08 2014-06-30
6 1 E39056F1-9CD5-478D-8945-14ACA7FBDCDD 2014-06-30
> query <- "select top 10 LoginID, JobTitle from HumanResources.Employee where HireDate > '2010-01-01'"
> sqlQuery(conn,query)
LoginID JobTitle
1 adventure-works\\ovidiu0 Senior Tool Designer
2 adventure-works\\janice0 Tool Designer
3 adventure-works\\michael8 Senior Design Engineer
4 adventure-works\\sharon0 Design Engineer
5 adventure-works\\john5 Marketing Specialist
6 adventure-works\\mary2 Marketing Assistant
7 adventure-works\\wanida0 Marketing Assistant
8 adventure-works\\kim1 Production Technician - WC60
9 adventure-works\\ed0 Production Technician - WC60
10 adventure-works\\maciej0 Production Technician - WC60
> query <- "select top 10 * from HumanResources.Employee where HireDate > '2010-01-01'"
> df <- sqlQuery(conn,query)[c("LoginID", "JobTitle")]
> df
LoginID JobTitle
1 adventure-works\\ovidiu0 Senior Tool Designer
2 adventure-works\\janice0 Tool Designer
3 adventure-works\\michael8 Senior Design Engineer
4 adventure-works\\sharon0 Design Engineer
5 adventure-works\\john5 Marketing Specialist
6 adventure-works\\mary2 Marketing Assistant
7 adventure-works\\wanida0 Marketing Assistant
8 adventure-works\\kim1 Production Technician - WC60
9 adventure-works\\ed0 Production Technician - WC60
10 adventure-works\\maciej0 Production Technician - WC60
> dim(df)
[1] 10 2
> sapply(df,class)
LoginID JobTitle
"factor" "factor"
> sqlColumns(conn, "HumanResources.Employee")[c("COLUMN_NAME","TYPE_NAME")]
COLUMN_NAME TYPE_NAME
1 BusinessEntityID int
2 NationalIDNumber nvarchar
3 LoginID nvarchar
4 OrganizationNode hierarchyid
5 OrganizationLevel smallint
6 JobTitle nvarchar
7 BirthDate date
8 MaritalStatus nchar
9 Gender nchar
10 HireDate date
11 SalariedFlag Flag
12 VacationHours smallint
13 SickLeaveHours smallint
14 CurrentFlag Flag
15 rowguid uniqueidentifier
16 ModifiedDate datetime
> df <- sqlQuery(conn, "select ProductID, avg(UnitPrice),stdev(UnitPrice) from [Sales].[SalesOrderDetail] group by ProductID")
> colnames(df) <- c("ProductID", "Avg(UnitPrice)", "STDEV(UnitPrice)")
> colnames(df)
[1] "ProductID" "Avg(UnitPrice)" "STDEV(UnitPrice)"
> names(df)
[1] "ProductID" "Avg(UnitPrice)" "STDEV(UnitPrice)"
> head(df)
ProductID Avg(UnitPrice) STDEV(UnitPrice)
1 925 149.8519 3.315829e-01
2 902 200.0520 0.000000e+00
3 710 5.7000 2.299513e-07
4 879 159.0000 0.000000e+00
5 733 356.8980 1.677983e-05
6 856 53.9073 8.234393e-01
> head(df[1:2],3)
ProductID Avg(UnitPrice)
1 925 149.8519
2 902 200.0520
3 710 5.7000
> dim(df);ncol(df);nrow(df)
[1] 266 3
[1] 3
[1] 266
# str -> structure, not string
> str(df)
'data.frame': 266 obs. of 3 variables:
$ ProductID : int 925 902 710 879 733 856 756 779 802 971 ...
$ Avg(UnitPrice) : num 149.9 200.1 5.7 159 356.9 ...
$ STDEV(UnitPrice): num 3.32e-01 0.00 2.30e-07 0.00 1.68e-05 ...
> df[df$`Avg(UnitPrice)`>3000,c("ProductID","Avg(UnitPrice)")]
ProductID Avg(UnitPrice)
23 750 3270.419
47 753 3035.880
130 751 3326.304
158 752 3290.494
188 749 3170.195
> df[df$`Avg(UnitPrice)`>3000,]
ProductID Avg(UnitPrice) STDEV(UnitPrice)
23 750 3270.419 588.9196
47 753 3035.880 695.0954
130 751 3326.304 545.7862
158 752 3290.494 574.4163
188 749 3170.195 646.8741
> subset(df,ProductID>750 & `Avg(UnitPrice)`>3000,select=-`STDEV(UnitPrice)`)
ProductID Avg(UnitPrice)
47 753 3035.880
130 751 3326.304
158 752 3290.494
> df2 <- sqlQuery(conn,"select ProductID,UnitPrice from Sales.SalesOrderDetail")
> summary(df2)
ProductID UnitPrice
Min. :707.0 Min. : 1.328
1st Qu.:768.0 1st Qu.: 21.490
Median :863.0 Median : 49.990
Mean :841.7 Mean : 465.093
3rd Qu.:921.0 3rd Qu.: 602.346
Max. :999.0 Max. :3578.270
> summary(df2$UnitPrice)
Min. 1st Qu. Median Mean 3rd Qu. Max.
1.328 21.490 49.990 465.100 602.300 3578.000
> df_emp <- sqlQuery(conn,"select JobTitle,BirthDate from HumanResources.Employee")
> sqlSave(conn,df_emp,tablename="r_temp1",rownames=FALSE,fast=TRUE)
> sqlDrop(conn,"r_temp1")
> version
               _
platform       x86_64-w64-mingw32
arch           x86_64
os             mingw32
system         x86_64, mingw32
status
major          3
minor          3.3
year           2017
month          03
day            06
svn rev        72310
language       R
version.string R version 3.3.3 (2017-03-06)
nickname       Another Canoe
> library(help="RODBC")
Information on package ‘RODBC’
Description:
Package: RODBC
Version: 1.3-15
Revision: $Rev: 3476 $
Date: 2017-04-13
Authors@R: c(person("Brian", "Ripley", role = c("aut", "cre"), email = "ripley@stats.ox.ac.uk"),
person("Michael", "Lapsley", role = "aut", comment = "1999 to Oct 2002"))
Title: ODBC Database Access
Description: An ODBC database interface.
SystemRequirements: An ODBC3 driver manager and drivers.
Depends: R (>= 3.0.0)
Imports: stats
LazyLoad: yes
Biarch: yes
License: GPL-2 | GPL-3
NeedsCompilation: yes
Packaged: 2017-04-13 07:00:50 UTC; ripley
Author: Brian Ripley [aut, cre], Michael Lapsley [aut] (1999 to Oct 2002)
Maintainer: Brian Ripley <ripley@stats.ox.ac.uk>
Repository: CRAN
Date/Publication: 2017-04-13 07:04:28 UTC
Built: R 3.3.2; x86_64-w64-mingw32; 2017-04-28 16:33:43 UTC; windows
Index:
RODBC ODBC Database Connectivity
odbcClose ODBC Close Connections
odbcConnect ODBC Open Connections
odbcDataSources List ODBC Data Sources
odbcGetInfo Request Information on an ODBC Connection
odbcQuery Low-level ODBC functions
odbcSetAutoCommit ODBC Set Auto-Commit Mode
setSqlTypeInfo Specify or Query a Mapping of R Types to DBMS Types
sqlColumns Query Column Structure in ODBC Tables
sqlCopy ODBC Copy
sqlDrop Deletion Operations on Tables in ODBC databases
sqlFetch Reading Tables from ODBC Databases
sqlQuery Query an ODBC Database
sqlSave Write a Data Frame to a Table in an ODBC Database
sqlTables List Tables on an ODBC Connection
sqlTypeInfo Request Information about Data Types in an ODBC Database
Further information is available in the following vignettes in directory ‘C:/Program Files/Microsoft SQL Server/140/R_SERVER/library/RODBC/doc’:
RODBC: ODBC Connectivity (source, pdf)
Wednesday, January 3, 2018
Using Open Source R-Studio Server connecting to Kerberos-enabled Hadoop
Step 1: Add the line "SPARK_HOME=${SPARK_HOME-'/opt/cloudera/parcels/CDH/lib/spark/'}" to the end of the file "/usr/lib64/R/etc/Renviron"
Step 2: Connect to Spark using sparklyr inside R-Studio Server
> install.packages("sparklyr")
> library(sparklyr)
> readRenviron("/usr/lib64/R/etc/Renviron")
> sc <- spark_connect(master = "yarn-client", version = "1.6.0",
+                     config = list(default = list(spark.yarn.keytab = "/home/donghua/donghua.keytab",
+                                                   spark.yarn.principal = "donghua@DBAGLOBE.COM")))
> sc
$master
[1] "yarn-client"

$method
[1] "shell"

$app_name
[1] "sparklyr"

$config
$config$default
$config$default$spark.yarn.keytab
[1] "/home/donghua/donghua.keytab"

$config$default$spark.yarn.principal
[1] "donghua@DBAGLOBE.COM"

$spark_home
[1] "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark"

$backend
A connection with
description "->localhost:46015"
class       "sockconn"
mode        "wb"
text        "binary"
opened      "opened"
can read    "yes"
can write   "yes"

$monitor
A connection with
description "->localhost:8880"
class       "sockconn"
mode        "rb"
text        "binary"
opened      "opened"
can read    "yes"
can write   "yes"

$output_file
[1] "/tmp/RtmpXWaXfE/file7af1ca61a03_spark.log"

$spark_context
<jobj[6]>
  class org.apache.spark.SparkContext
  org.apache.spark.SparkContext@355d7d99

$java_context
<jobj[7]>
  class org.apache.spark.api.java.JavaSparkContext
  org.apache.spark.api.java.JavaSparkContext@ef616c5

attr(,"class")
[1] "spark_connection"       "spark_shell_connection" "DBIConnection"
> library(DBI)
> iotdatademo <- dbGetQuery(sc, 'Select * from default.iotdatademo limit 10')
> iotdatademo
Reference URL: https://medium.com/@bkvarda/sparklyr-r-interface-for-spark-and-kerberos-on-cloudera-80abf5f6b4ad
Common errors and fixes with Spark 1.6 running with Python3 (Anaconda Version)
1. Error caused by Python 3.6 (version too new for Spark 1.6)
[donghua@cdh-vm spark]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.6.3 |Anaconda, Inc.| (default, Oct 13 2017, 12:02:49)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
[TerminalIPythonApp] WARNING | Unknown error in handling PYTHONSTARTUP file /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/shell.py:
…
TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', 'rename', and 'module'
How to fix:
[donghua@cdh-vm spark]$ conda create -n py35 python=3.5 anaconda
[donghua@cdh-vm spark]$ source activate py35
2. Error caused by workers using a different Python version from the pyspark driver
(py35) [donghua@cdh-vm ~]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.5.4 |Anaconda, Inc.| (default, Oct 13 2017, 11:22:58)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
Using Python version 3.5.4 (default, Oct 13 2017 11:22:58)
SparkContext available as sc, HiveContext available as sqlContext.
In [9]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByK
...: ey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey()
[Stage 0:> (0 + 2) / 2]
18/01/03 08:22:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cdh-vm.dbaglobe.com, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 64, in main
("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions
How to fix:
Add the line "PYSPARK_PYTHON=/opt/anaconda3/envs/py35/bin/python3" to the file "/opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh"
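Once that line is in place and pyspark is restarted, a quick sanity check (a sketch to run in the pyspark shell shown above, where sc is already defined) is to print sys.version on the driver and on the executors; a plain map/collect involves no shuffle, so it works even before fix 3 below is applied:

# Driver-side interpreter
import sys
print(sys.version)

# Executor-side interpreter: a trivial map/collect, no shuffle involved
print(sc.parallelize([0, 1], 2).map(lambda _: __import__('sys').version).collect())

Both should now report a 3.5.x interpreter from the py35 environment.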
3. Error “Randomness of hash of string should be disabled via PYTHONHASHSEED”
In [1]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByK
...: ey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey()
[Stage 0:> (0 + 2) / 2]18/01/03 09:17:09 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, cdh-vm.dbaglobe.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 111, in main
process()
File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/serializers.py", line 133, in dump_stream
for obj in iterator:
File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/rdd.py", line 1703, in add_shuffle_key
buckets[partitionFunc(k) % numPartitions].append((k, v))
File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/rdd.py", line 74, in portable_hash
raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
How to fix:
Add the line "SPARK_YARN_USER_ENV=PYTHONHASHSEED=0" to the file "/opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh"
[root@cdh-vm conf]# diff /opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh /opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh.orig
63,66d62
<
< PYSPARK_PYTHON=/opt/anaconda3/envs/py35/bin/python3
< SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
<
79d74
<
(py35) [donghua@cdh-vm ~]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.5.4 |Anaconda, Inc.| (default, Oct 13 2017, 11:22:58)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
Using Python version 3.5.4 (default, Oct 13 2017 11:22:58)
SparkContext available as sc, HiveContext available as sqlContext.
In [3]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey().collect()
Out[3]:
[('0', 80.42217204861151),
('1', 80.42420773058639),
('2', 80.516892013888),
('3', 80.42997673611161),
('4', 80.62740798611237),
('5', 80.49621712962933),
('6', 80.5453983217595)]