Wednesday, January 3, 2018

Common errors and fixes when running Spark 1.6 with Python 3 (Anaconda distribution)

1. Error caused by Python 3.6 (too new for Spark 1.6)

[donghua@cdh-vm spark]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.6.3 |Anaconda, Inc.| (default, Oct 13 2017, 12:02:49)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
[TerminalIPythonApp] WARNING | Unknown error in handling PYTHONSTARTUP file /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/shell.py:


TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', 'rename', and 'module'

How to fix: create a conda environment with Python 3.5 and activate it before starting pyspark.

[donghua@cdh-vm spark]$ conda create -n py35 python=3.5 anaconda

[donghua@cdh-vm spark]$ source activate py35
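
Before launching pyspark it can help to confirm that the active interpreter is older than 3.6. The following is a minimal sketch: the helper name is_supported_for_spark16 is hypothetical, and the 3.6 cut-off is taken only from the namedtuple error shown above, not from an official compatibility list.

```python
import sys


def is_supported_for_spark16(version_info=sys.version_info):
    """Return True if the interpreter is older than Python 3.6.

    Hypothetical helper: the 3.6 cut-off is an assumption based on the
    namedtuple() TypeError seen when pyspark 1.6 starts under 3.6.
    """
    return tuple(version_info[:2]) < (3, 6)


# Report the current interpreter and whether it passes the check.
print(sys.version_info[:2], is_supported_for_spark16())
```

Running this inside the activated py35 environment should report True; under the default Anaconda 3.6 install it should report False.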

2. Error caused by workers using a different Python version than the pyspark driver

(py35) [donghua@cdh-vm ~]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.5.4 |Anaconda, Inc.| (default, Oct 13 2017, 11:22:58)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 3.5.4 (default, Oct 13 2017 11:22:58)
SparkContext available as sc, HiveContext available as sqlContext.


In [9]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey()
[Stage 0:>                                                          (0 + 2) / 2]

18/01/03 08:22:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cdh-vm.dbaglobe.com, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 64, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions

How to fix:

Add the line “PYSPARK_PYTHON=/opt/anaconda3/envs/py35/bin/python3” to the file “/opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh” so that the workers start the same Python 3.5 interpreter as the driver.
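
The check that produces this error compares only the major.minor part of the two versions. A rough sketch of that comparison, reconstructed from the traceback above rather than copied from worker.py (the function name check_worker_version is an assumption for illustration):

```python
import sys


def check_worker_version(driver_version, worker_version_info):
    """Raise if the worker's major.minor version differs from the driver's.

    Illustrative only: mirrors the message in the traceback above,
    not the actual PySpark source.
    """
    worker_version = "%d.%d" % tuple(worker_version_info[:2])
    if worker_version != driver_version:
        raise Exception(
            "Python in worker has different version %s than that in "
            "driver %s, PySpark cannot run with different minor versions"
            % (worker_version, driver_version)
        )
    return worker_version


# Same interpreter on both sides: the check passes.
driver = "%d.%d" % sys.version_info[:2]
print(check_worker_version(driver, sys.version_info))
```

Because only major.minor is compared, a 3.5.4 driver and a 3.5.2 worker would pass, while the 3.5 driver and 2.7 worker in the log above do not.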

3. Error “Randomness of hash of string should be disabled via PYTHONHASHSEED”

In [1]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey()
[Stage 0:>                                                          (0 + 2) / 2]
18/01/03 09:17:09 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, cdh-vm.dbaglobe.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 111, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/rdd.py", line 1703, in add_shuffle_key
    buckets[partitionFunc(k) % numPartitions].append((k, v))
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/rdd.py", line 74, in portable_hash
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

How to fix:

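Add the line “SPARK_YARN_USER_ENV=PYTHONHASHSEED=0” to the file “/opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh”. Python 3 randomizes string hashes per process by default, so without a fixed seed each executor could send the same key to a different partition during the shuffle.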

[root@cdh-vm conf]# diff /opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh /opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh.orig
63,66d62
<
< PYSPARK_PYTHON=/opt/anaconda3/envs/py35/bin/python3
< SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
<
79d74
<
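
To see why a fixed seed matters, the sketch below starts fresh Python interpreters and compares the hash of the same string. It is only an illustration of the interpreter behaviour, independent of Spark; the helper name string_hash_in_new_process is made up for this example.

```python
import os
import subprocess
import sys


def string_hash_in_new_process(text, seed):
    """Start a new interpreter with PYTHONHASHSEED=seed and return hash(text)."""
    env = dict(os.environ, PYTHONHASHSEED=str(seed))
    out = subprocess.check_output(
        [sys.executable, "-c", "import sys; print(hash(sys.argv[1]))", text],
        env=env,
    )
    return int(out.decode().strip())


# With PYTHONHASHSEED=0 every process computes the same hash for a string.
fixed_a = string_hash_in_new_process("StationID", 0)
fixed_b = string_hash_in_new_process("StationID", 0)
print("seed 0, two processes agree:", fixed_a == fixed_b)

# With PYTHONHASHSEED=random each process picks its own seed,
# so the two values will almost always differ.
random_a = string_hash_in_new_process("StationID", "random")
random_b = string_hash_in_new_process("StationID", "random")
print("random seed:", random_a, random_b)
```

The first comparison is what the executors need: with seed 0, the partition chosen for a given key is the same on every worker.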

(py35) [donghua@cdh-vm ~]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.5.4 |Anaconda, Inc.| (default, Oct 13 2017, 11:22:58)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 3.5.4 (default, Oct 13 2017 11:22:58)
SparkContext available as sc, HiveContext available as sqlContext.


In [3]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey().collect()

Out[3]:
[('0', 80.42217204861151),
 ('1', 80.42420773058639),
 ('2', 80.516892013888),
 ('3', 80.42997673611161),
 ('4', 80.62740798611237),
 ('5', 80.49621712962933),
 ('6', 80.5453983217595)]
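
For readers unfamiliar with the RDD calls, the job above skips the header line, groups rows by the fourth CSV field, and averages the fifth field per group. A plain-Python sketch of the same logic follows; the sample rows and all column names other than StationID are invented for illustration and do not come from IOTDataDemo.csv.

```python
from collections import defaultdict


def average_by_key(lines):
    """Average field 4 grouped by field 3, skipping the header line."""
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running sum, count]
    for line in lines:
        if line[0:9] == "StationID":      # same header test as the filter()
            continue
        fields = line.split(",")
        acc = sums[fields[3]]
        acc[0] += float(fields[4])        # reduceByKey: add the values
        acc[1] += 1                       # reduceByKey: add the counts
    # mapValues(sum / count) followed by sortByKey()
    return sorted((key, total / count) for key, (total, count) in sums.items())


# Made-up sample data in the assumed five-column layout.
sample = [
    "StationID,col1,col2,key,value",
    "s1,x,y,0,80.0",
    "s2,x,y,0,82.0",
    "s3,x,y,1,79.5",
]
print(average_by_key(sample))  # [('0', 81.0), ('1', 79.5)]
```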