Saturday, December 2, 2017

Pyspark Example: Calculate average value in Secure Hadoop Cluster

Expected output based on Hive SQL result:

image

image

Spark application example:

Code: https://github.com/luodonghua/bigdata/blob/master/speedByDay.py

import sys

from pyspark import SparkContext

if __name__ == "__main__":

if len(sys.argv) != 2:

print("Usage: speedByDay <file>")

exit(-1)

sc = SparkContext()

# sc.setLogLevel("ERROR")

sortedAvg = sc.textFile(sys.argv[1]) \

.filter(lambda line: line[0:9] <> "StationID") \

.map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))) \

.reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1])) \

.mapValues(lambda v: v[0]/v[1]) \

.sortByKey()

output = sortedAvg.collect()

for (day, speed) in output:

print(day+': '+str(speed))

sortedAvg.saveAsTextFile("/user/donghua/speedByDay")

sc.stop()

[donghua@cdh-vm ~]$ kinit -t donghua.keytab donghua@DBAGLOBE.COM
keytab specified, forcing -k
[donghua@cdh-vm ~]$ klist
Ticket cache: FILE:/tmp/krb5cc_1000
Default principal: donghua@DBAGLOBE.COM

Valid starting       Expires              Service principal
12/01/2017 20:35:05  12/02/2017 20:35:05  krbtgt/DBAGLOBE.COM@DBAGLOBE.COM
[donghua@cdh-vm ~]$ spark-submit --master yarn-master speedByDay.py /user/donghua/IOTDataDemo.csv
17/12/01 20:35:24 INFO spark.SparkContext: Running Spark version 1.6.0
17/12/01 20:35:24 INFO spark.SecurityManager: Changing view acls to: donghua
17/12/01 20:35:24 INFO spark.SecurityManager: Changing modify acls to: donghua
17/12/01 20:35:24 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(donghua); users with modify permissions: Set(donghua)
17/12/01 20:35:25 INFO util.Utils: Successfully started service 'sparkDriver' on port 42417.
17/12/01 20:35:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
17/12/01 20:35:25 INFO Remoting: Starting remoting
17/12/01 20:35:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.10:46767]
17/12/01 20:35:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriverActorSystem@192.168.56.10:46767]
17/12/01 20:35:25 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 46767.
17/12/01 20:35:25 INFO spark.SparkEnv: Registering MapOutputTracker
17/12/01 20:35:25 INFO spark.SparkEnv: Registering BlockManagerMaster
17/12/01 20:35:25 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-473c558d-e9ae-48d3-83b7-b030d958244f
17/12/01 20:35:25 INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
17/12/01 20:35:25 INFO spark.SparkEnv: Registering OutputCommitCoordinator
17/12/01 20:35:25 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
17/12/01 20:35:26 INFO ui.SparkUI: Started SparkUI at
http://192.168.56.10:4040
17/12/01 20:35:26 INFO client.RMProxy: Connecting to ResourceManager at cdh-vm.dbaglobe.com/192.168.56.10:8032
17/12/01 20:35:26 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
17/12/01 20:35:26 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (1536 MB per container)
17/12/01 20:35:26 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
17/12/01 20:35:26 INFO yarn.Client: Setting up container launch context for our AM
17/12/01 20:35:26 INFO yarn.Client: Setting up the launch environment for our AM container
17/12/01 20:35:26 INFO yarn.Client: Preparing resources for our AM container
17/12/01 20:35:27 INFO yarn.YarnSparkHadoopUtil: getting token for namenode: hdfs://cdh-vm.dbaglobe.com:8020/user/donghua/.sparkStaging/application_1512169991692_0016
17/12/01 20:35:27 INFO hdfs.DFSClient: Created token for donghua: HDFS_DELEGATION_TOKEN owner=donghua@DBAGLOBE.COM, renewer=yarn, realUser=, issueDate=1512178527531, maxDate=1512783327531, sequenceNumber=76, masterKeyId=6 on 192.168.56.10:8020
17/12/01 20:35:28 INFO hive.metastore: Trying to connect to metastore with URI thrift://cdh-vm.dbaglobe.com:9083
17/12/01 20:35:28 INFO hive.metastore: Opened a connection to metastore, current connections: 1
17/12/01 20:35:28 INFO hive.metastore: Connected to metastore.
17/12/01 20:35:28 INFO hive.metastore: Closed a connection to metastore, current connections: 0
17/12/01 20:35:29 INFO yarn.Client: Uploading resource file:/tmp/spark-5cdca44e-93b7-4387-a128-7f6718d66e23/__spark_conf__3916556689308061352.zip -> hdfs://cdh-vm.dbaglobe.com:8020/user/donghua/.sparkStaging/application_1512169991692_0016/__spark_conf__3916556689308061352.zip
17/12/01 20:35:29 INFO spark.SecurityManager: Changing view acls to: donghua
17/12/01 20:35:29 INFO spark.SecurityManager: Changing modify acls to: donghua
17/12/01 20:35:29 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(donghua); users with modify permissions: Set(donghua)
17/12/01 20:35:29 INFO yarn.Client: Submitting application 16 to ResourceManager
17/12/01 20:35:29 INFO impl.YarnClientImpl: Submitted application application_1512169991692_0016
17/12/01 20:35:30 INFO yarn.Client: Application report for application_1512169991692_0016 (state: ACCEPTED)
17/12/01 20:35:30 INFO yarn.Client:
         client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.users.donghua
          start time: 1512178529510
         final status: UNDEFINED
         tracking URL:
http://cdh-vm.dbaglobe.com:8088/proxy/application_1512169991692_0016/
         user: donghua
17/12/01 20:35:31 INFO yarn.Client: Application report for application_1512169991692_0016 (state: ACCEPTED)
17/12/01 20:35:32 INFO yarn.Client: Application report for application_1512169991692_0016 (state: ACCEPTED)
17/12/01 20:35:33 INFO yarn.Client: Application report for application_1512169991692_0016 (state: ACCEPTED)
17/12/01 20:35:34 INFO yarn.Client: Application report for application_1512169991692_0016 (state: ACCEPTED)
17/12/01 20:35:35 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
17/12/01 20:35:35 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> cdh-vm.dbaglobe.com, PROXY_URI_BASES ->
http://cdh-vm.dbaglobe.com:8088/proxy/application_1512169991692_0016), /proxy/application_1512169991692_0016
17/12/01 20:35:35 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
17/12/01 20:35:35 INFO yarn.Client: Application report for application_1512169991692_0016 (state: RUNNING)
17/12/01 20:35:35 INFO yarn.Client:
         client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
         diagnostics: N/A
          ApplicationMaster host: 192.168.56.10
         ApplicationMaster RPC port: 0
         queue: root.users.donghua
         start time: 1512178529510
         final status: UNDEFINED
         tracking URL:
http://cdh-vm.dbaglobe.com:8088/proxy/application_1512169991692_0016/
         user: donghua
17/12/01 20:35:35 INFO cluster.YarnClientSchedulerBackend: Application application_1512169991692_0016 has started running.
17/12/01 20:35:35 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36740.
17/12/01 20:35:35 INFO netty.NettyBlockTransferService: Server created on 36740
17/12/01 20:35:35 INFO storage.BlockManager: external shuffle service port = 7337
17/12/01 20:35:35 INFO storage.BlockManagerMaster: Trying to register BlockManager
17/12/01 20:35:35 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.56.10:36740 with 530.0 MB RAM, BlockManagerId(driver, 192.168.56.10, 36740)
17/12/01 20:35:35 INFO storage.BlockManagerMaster: Registered BlockManager
17/12/01 20:35:36 INFO scheduler.EventLoggingListener: Logging events to hdfs://cdh-vm.dbaglobe.com:8020/user/spark/applicationHistory/application_1512169991692_0016
17/12/01 20:35:36 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.ClouderaNavigatorListener
17/12/01 20:35:36 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
17/12/01 20:35:36 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 298.4 KB, free 529.7 MB)
17/12/01 20:35:36 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.8 KB, free 529.7 MB)
17/12/01 20:35:36 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.56.10:36740 (size: 25.8 KB, free: 530.0 MB)
17/12/01 20:35:36 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
17/12/01 20:35:36 INFO hdfs.DFSClient: Created token for donghua: HDFS_DELEGATION_TOKEN owner=donghua@DBAGLOBE.COM, renewer=yarn, realUser=, issueDate=1512178536774, maxDate=1512783336774, sequenceNumber=77, masterKeyId=6 on 192.168.56.10:8020
17/12/01 20:35:36 INFO security.TokenCache: Got dt for hdfs://cdh-vm.dbaglobe.com:8020; Kind: HDFS_DELEGATION_TOKEN, Service: 192.168.56.10:8020, Ident: (token for donghua: HDFS_DELEGATION_TOKEN owner=donghua@DBAGLOBE.COM, renewer=yarn, realUser=, issueDate=1512178536774, maxDate=1512783336774, sequenceNumber=77, masterKeyId=6)
17/12/01 20:35:36 INFO mapred.FileInputFormat: Total input paths to process : 1
17/12/01 20:35:36 INFO spark.SparkContext: Starting job: sortByKey at /home/donghua/speedByDay.py:18
17/12/01 20:35:36 INFO scheduler.DAGScheduler: Registering RDD 3 (reduceByKey at /home/donghua/speedByDay.py:17)
17/12/01 20:35:36 INFO scheduler.DAGScheduler: Got job 0 (sortByKey at /home/donghua/speedByDay.py:18) with 2 output partitions
17/12/01 20:35:36 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (sortByKey at /home/donghua/speedByDay.py:18)
17/12/01 20:35:36 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
17/12/01 20:35:36 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0)
17/12/01 20:35:36 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (PairwiseRDD[3] at reduceByKey at /home/donghua/speedByDay.py:17), which has no missing parents
17/12/01 20:35:36 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.6 KB, free 529.7 MB)
17/12/01 20:35:36 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.3 KB, free 529.7 MB)
17/12/01 20:35:36 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.56.10:36740 (size: 5.3 KB, free: 530.0 MB)
17/12/01 20:35:36 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1004
17/12/01 20:35:36 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (PairwiseRDD[3] at reduceByKey at /home/donghua/speedByDay.py:17) (first 15 tasks are for partitions Vector(0, 1))
17/12/01 20:35:36 INFO cluster.YarnScheduler: Adding task set 0.0 with 2 tasks
17/12/01 20:35:38 INFO spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
17/12/01 20:35:39 INFO spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2)
17/12/01 20:35:43 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (cdh-vm.dbaglobe.com:43738) with ID 1
17/12/01 20:35:43 INFO spark.ExecutorAllocationManager: New executor 1 has registered (new total is 1)
17/12/01 20:35:43 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, cdh-vm.dbaglobe.com, executor 1, partition 0, NODE_LOCAL, 2151 bytes)
17/12/01 20:35:43 INFO storage.BlockManagerMasterEndpoint: Registering block manager cdh-vm.dbaglobe.com:33341 with 530.0 MB RAM, BlockManagerId(1, cdh-vm.dbaglobe.com, 33341)
17/12/01 20:35:43 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on cdh-vm.dbaglobe.com:33341 (size: 5.3 KB, free: 530.0 MB)
17/12/01 20:35:44 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (cdh-vm.dbaglobe.com:43750) with ID 2
17/12/01 20:35:44 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, cdh-vm.dbaglobe.com, executor 2, partition 1, NODE_LOCAL, 2151 bytes)
17/12/01 20:35:44 INFO spark.ExecutorAllocationManager: New executor 2 has registered (new total is 2)
17/12/01 20:35:44 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on cdh-vm.dbaglobe.com:33341 (size: 25.8 KB, free: 530.0 MB)
17/12/01 20:35:44 INFO storage.BlockManagerMasterEndpoint: Registering block manager cdh-vm.dbaglobe.com:40141 with 530.0 MB RAM, BlockManagerId(2, cdh-vm.dbaglobe.com, 40141)
17/12/01 20:35:44 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on cdh-vm.dbaglobe.com:40141 (size: 5.3 KB, free: 530.0 MB)
17/12/01 20:35:45 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on cdh-vm.dbaglobe.com:40141 (size: 25.8 KB, free: 530.0 MB)
17/12/01 20:35:50 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 7096 ms on cdh-vm.dbaglobe.com (executor 1) (1/2)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 6762 ms on cdh-vm.dbaglobe.com (executor 2) (2/2)
17/12/01 20:35:51 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/12/01 20:35:51 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (reduceByKey at /home/donghua/speedByDay.py:17) finished in 14.066 s
17/12/01 20:35:51 INFO scheduler.DAGScheduler: looking for newly runnable stages
17/12/01 20:35:51 INFO scheduler.DAGScheduler: running: Set()
17/12/01 20:35:51 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1)
17/12/01 20:35:51 INFO scheduler.DAGScheduler: failed: Set()
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (PythonRDD[6] at sortByKey at /home/donghua/speedByDay.py:18), which has no missing parents
17/12/01 20:35:51 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 7.0 KB, free 529.7 MB)
17/12/01 20:35:51 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.2 KB, free 529.7 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.56.10:36740 (size: 4.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1004
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (PythonRDD[6] at sortByKey at /home/donghua/speedByDay.py:18) (first 15 tasks are for partitions Vector(0, 1))
17/12/01 20:35:51 INFO cluster.YarnScheduler: Adding task set 1.0 with 2 tasks
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, cdh-vm.dbaglobe.com, executor 2, partition 0, NODE_LOCAL, 1894 bytes)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, cdh-vm.dbaglobe.com, executor 1, partition 1, NODE_LOCAL, 1894 bytes)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on cdh-vm.dbaglobe.com:33341 (size: 4.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to cdh-vm.dbaglobe.com:43738
17/12/01 20:35:51 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 161 bytes
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on cdh-vm.dbaglobe.com:40141 (size: 4.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.56.10:36740 in memory (size: 5.3 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on cdh-vm.dbaglobe.com:40141 in memory (size: 5.3 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to cdh-vm.dbaglobe.com:43750
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on cdh-vm.dbaglobe.com:33341 in memory (size: 5.3 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 266 ms on cdh-vm.dbaglobe.com (executor 1) (1/2)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 288 ms on cdh-vm.dbaglobe.com (executor 2) (2/2)
17/12/01 20:35:51 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/12/01 20:35:51 INFO scheduler.DAGScheduler: ResultStage 1 (sortByKey at /home/donghua/speedByDay.py:18) finished in 0.290 s
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Job 0 finished: sortByKey at /home/donghua/speedByDay.py:18, took 14.527229 s
17/12/01 20:35:51 INFO spark.SparkContext: Starting job: sortByKey at /home/donghua/speedByDay.py:18
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Got job 1 (sortByKey at /home/donghua/speedByDay.py:18) with 2 output partitions
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (sortByKey at /home/donghua/speedByDay.py:18)
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Missing parents: List()
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (PythonRDD[7] at sortByKey at /home/donghua/speedByDay.py:18), which has no missing parents
17/12/01 20:35:51 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.9 KB, free 529.7 MB)
17/12/01 20:35:51 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 4.2 KB, free 529.7 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.56.10:36740 (size: 4.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1004
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 3 (PythonRDD[7] at sortByKey at /home/donghua/speedByDay.py:18) (first 15 tasks are for partitions Vector(0, 1))
17/12/01 20:35:51 INFO cluster.YarnScheduler: Adding task set 3.0 with 2 tasks
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 4, cdh-vm.dbaglobe.com, executor 1, partition 0, NODE_LOCAL, 1894 bytes)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 3.0 (TID 5, cdh-vm.dbaglobe.com, executor 2, partition 1, NODE_LOCAL, 1894 bytes)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on cdh-vm.dbaglobe.com:33341 (size: 4.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on cdh-vm.dbaglobe.com:40141 (size: 4.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 4) in 120 ms on cdh-vm.dbaglobe.com (executor 1) (1/2)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 5) in 130 ms on cdh-vm.dbaglobe.com (executor 2) (2/2)
17/12/01 20:35:51 INFO cluster.YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/12/01 20:35:51 INFO scheduler.DAGScheduler: ResultStage 3 (sortByKey at /home/donghua/speedByDay.py:18) finished in 0.131 s
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Job 1 finished: sortByKey at /home/donghua/speedByDay.py:18, took 0.149879 s
17/12/01 20:35:51 INFO spark.SparkContext: Starting job: collect at /home/donghua/speedByDay.py:21
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Registering RDD 9 (sortByKey at /home/donghua/speedByDay.py:18)
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Got job 2 (collect at /home/donghua/speedByDay.py:21) with 2 output partitions
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Final stage: ResultStage 6 (collect at /home/donghua/speedByDay.py:21)
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 5)
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 5 (PairwiseRDD[9] at sortByKey at /home/donghua/speedByDay.py:18), which has no missing parents
17/12/01 20:35:51 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 7.7 KB, free 529.7 MB)
17/12/01 20:35:51 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.8 KB, free 529.7 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.56.10:36740 (size: 4.8 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1004
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 5 (PairwiseRDD[9] at sortByKey at /home/donghua/speedByDay.py:18) (first 15 tasks are for partitions Vector(0, 1))
17/12/01 20:35:51 INFO cluster.YarnScheduler: Adding task set 5.0 with 2 tasks
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 5.0 (TID 6, cdh-vm.dbaglobe.com, executor 2, partition 0, NODE_LOCAL, 1883 bytes)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 5.0 (TID 7, cdh-vm.dbaglobe.com, executor 1, partition 1, NODE_LOCAL, 1883 bytes)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on cdh-vm.dbaglobe.com:33341 (size: 4.8 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on cdh-vm.dbaglobe.com:40141 (size: 4.8 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 5.0 (TID 7) in 122 ms on cdh-vm.dbaglobe.com (executor 1) (1/2)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 5.0 (TID 6) in 136 ms on cdh-vm.dbaglobe.com (executor 2) (2/2)
17/12/01 20:35:51 INFO cluster.YarnScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool
17/12/01 20:35:51 INFO scheduler.DAGScheduler: ShuffleMapStage 5 (sortByKey at /home/donghua/speedByDay.py:18) finished in 0.137 s
17/12/01 20:35:51 INFO scheduler.DAGScheduler: looking for newly runnable stages
17/12/01 20:35:51 INFO scheduler.DAGScheduler: running: Set()
17/12/01 20:35:51 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 6)
17/12/01 20:35:51 INFO scheduler.DAGScheduler: failed: Set()
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Submitting ResultStage 6 (PythonRDD[12] at collect at /home/donghua/speedByDay.py:21), which has no missing parents
17/12/01 20:35:51 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 5.2 KB, free 529.7 MB)
17/12/01 20:35:51 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 3.2 KB, free 529.7 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.56.10:36740 (size: 3.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1004
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 6 (PythonRDD[12] at collect at /home/donghua/speedByDay.py:21) (first 15 tasks are for partitions Vector(0, 1))
17/12/01 20:35:51 INFO cluster.YarnScheduler: Adding task set 6.0 with 2 tasks
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 8, cdh-vm.dbaglobe.com, executor 1, partition 0, NODE_LOCAL, 1894 bytes)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 6.0 (TID 9, cdh-vm.dbaglobe.com, executor 2, partition 1, NODE_LOCAL, 1894 bytes)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on cdh-vm.dbaglobe.com:40141 (size: 3.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on cdh-vm.dbaglobe.com:33341 (size: 3.2 KB, free: 530.0 MB)
17/12/01 20:35:51 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to cdh-vm.dbaglobe.com:43738
17/12/01 20:35:51 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 163 bytes
17/12/01 20:35:51 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to cdh-vm.dbaglobe.com:43750
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 8) in 103 ms on cdh-vm.dbaglobe.com (executor 1) (1/2)
17/12/01 20:35:51 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 6.0 (TID 9) in 117 ms on cdh-vm.dbaglobe.com (executor 2) (2/2)
17/12/01 20:35:51 INFO cluster.YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool
17/12/01 20:35:51 INFO scheduler.DAGScheduler: ResultStage 6 (collect at /home/donghua/speedByDay.py:21) finished in 0.119 s
17/12/01 20:35:51 INFO scheduler.DAGScheduler: Job 2 finished: collect at /home/donghua/speedByDay.py:21, took 0.290281 s
0: 80.4221720486
1: 80.4242077306
2: 80.5168920139
3: 80.4299767361
4: 80.6274079861
5: 80.4962171296
6: 80.5453983218

17/12/01 20:35:52 INFO hdfs.DFSClient: Created token for donghua: HDFS_DELEGATION_TOKEN owner=donghua@DBAGLOBE.COM, renewer=yarn, realUser=, issueDate=1512178551995, maxDate=1512783351995, sequenceNumber=78, masterKeyId=6 on 192.168.56.10:8020
17/12/01 20:35:52 INFO security.TokenCache: Got dt for hdfs://cdh-vm.dbaglobe.com:8020; Kind: HDFS_DELEGATION_TOKEN, Service: 192.168.56.10:8020, Ident: (token for donghua: HDFS_DELEGATION_TOKEN owner=donghua@DBAGLOBE.COM, renewer=yarn, realUser=, issueDate=1512178551995, maxDate=1512783351995, sequenceNumber=78, masterKeyId=6)
17/12/01 20:35:52 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/12/01 20:35:52 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/12/01 20:35:52 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/12/01 20:35:52 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/12/01 20:35:52 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/12/01 20:35:52 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
17/12/01 20:35:52 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
17/12/01 20:35:52 INFO spark.SparkContext: Starting job: saveAsTextFile at NativeMethodAccessorImpl.java:-2
17/12/01 20:35:52 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 161 bytes
17/12/01 20:35:52 INFO scheduler.DAGScheduler: Got job 3 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) with 2 output partitions
17/12/01 20:35:52 INFO scheduler.DAGScheduler: Final stage: ResultStage 9 (saveAsTextFile at NativeMethodAccessorImpl.java:-2)
17/12/01 20:35:52 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 8)
17/12/01 20:35:52 INFO scheduler.DAGScheduler: Missing parents: List()
17/12/01 20:35:52 INFO scheduler.DAGScheduler: Submitting ResultStage 9 (MapPartitionsRDD[15] at saveAsTextFile at NativeMethodAccessorImpl.java:-2), which has no missing parents
17/12/01 20:35:52 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 83.2 KB, free 529.6 MB)
17/12/01 20:35:52 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 30.9 KB, free 529.5 MB)
17/12/01 20:35:52 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on 192.168.56.10:36740 (size: 30.9 KB, free: 529.9 MB)
17/12/01 20:35:52 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1004
17/12/01 20:35:52 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 9 (MapPartitionsRDD[15] at saveAsTextFile at NativeMethodAccessorImpl.java:-2) (first 15 tasks are for partitions Vector(0, 1))
17/12/01 20:35:52 INFO cluster.YarnScheduler: Adding task set 9.0 with 2 tasks
17/12/01 20:35:52 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 9.0 (TID 10, cdh-vm.dbaglobe.com, executor 1, partition 0, NODE_LOCAL, 1894 bytes)
17/12/01 20:35:52 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 9.0 (TID 11, cdh-vm.dbaglobe.com, executor 2, partition 1, NODE_LOCAL, 1894 bytes)
17/12/01 20:35:52 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on cdh-vm.dbaglobe.com:33341 (size: 30.9 KB, free: 529.9 MB)
17/12/01 20:35:52 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on cdh-vm.dbaglobe.com:40141 (size: 30.9 KB, free: 529.9 MB)
17/12/01 20:35:52 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 9.0 (TID 10) in 445 ms on cdh-vm.dbaglobe.com (executor 1) (1/2)
17/12/01 20:35:52 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 9.0 (TID 11) in 487 ms on cdh-vm.dbaglobe.com (executor 2) (2/2)
17/12/01 20:35:52 INFO cluster.YarnScheduler: Removed TaskSet 9.0, whose tasks have all completed, from pool
17/12/01 20:35:52 INFO scheduler.DAGScheduler: ResultStage 9 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) finished in 0.489 s
17/12/01 20:35:52 INFO scheduler.DAGScheduler: Job 3 finished: saveAsTextFile at NativeMethodAccessorImpl.java:-2, took 0.518110 s
17/12/01 20:35:52 INFO ui.SparkUI: Stopped Spark web UI at
http://192.168.56.10:4040
17/12/01 20:35:52 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
17/12/01 20:35:52 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
17/12/01 20:35:52 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
17/12/01 20:35:52 INFO cluster.YarnClientSchedulerBackend: Stopped
17/12/01 20:35:52 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/12/01 20:35:52 INFO storage.MemoryStore: MemoryStore cleared
17/12/01 20:35:52 INFO storage.BlockManager: BlockManager stopped
17/12/01 20:35:52 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
17/12/01 20:35:52 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/12/01 20:35:52 INFO spark.SparkContext: Successfully stopped SparkContext
17/12/01 20:35:52 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
17/12/01 20:35:52 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/12/01 20:35:52 INFO Remoting: Remoting shut down
17/12/01 20:35:52 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
17/12/01 20:35:53 INFO util.ShutdownHookManager: Shutdown hook called
17/12/01 20:35:53 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-5cdca44e-93b7-4387-a128-7f6718d66e23/pyspark-f9370301-fbd9-41d1-a904-b8b3fc792a81
17/12/01 20:35:53 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-5cdca44e-93b7-4387-a128-7f6718d66e23

[donghua@cdh-vm ~]$ hdfs dfs -ls /user/donghua/speedByDay
Found 3 items
-rw-r--r--   1 donghua donghua          0 2017-12-01 20:35 /user/donghua/speedByDay/_SUCCESS
-rw-r--r--   1 donghua donghua        102 2017-12-01 20:35 /user/donghua/speedByDay/part-00000
-rw-r--r--   1 donghua donghua         77 2017-12-01 20:35 /user/donghua/speedByDay/part-00001
[donghua@cdh-vm ~]$ hdfs dfs -cat /user/donghua/speedByDay/part-*
(u'0', 80.42217204861151)
(u'1', 80.42420773058639)
(u'2', 80.516892013888)
(u'3', 80.42997673611161)
(u'4', 80.62740798611237)
(u'5', 80.49621712962933)
(u'6', 80.5453983217595)

Remark:the job will stuck if kerberos authentication not successful. Sample error as below

[donghua@cdh-vm ~]$ klist
klist: No credentials cache found (filename: /tmp/krb5cc_1000)
[donghua@cdh-vm ~]$
[donghua@cdh-vm ~]$
[donghua@cdh-vm ~]$ spark-submit --master yarn-master speedByDay.py /user/donghua/IOTDataDemo.csv
17/12/01 20:23:57 INFO spark.SparkContext: Running Spark version 1.6.0
17/12/01 20:23:58 INFO spark.SecurityManager: Changing view acls to: donghua
17/12/01 20:23:58 INFO spark.SecurityManager: Changing modify acls to: donghua
17/12/01 20:23:58 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(donghua); users with modify permissions: Set(donghua)
17/12/01 20:23:58 INFO util.Utils: Successfully started service 'sparkDriver' on port 43975.
17/12/01 20:23:58 INFO slf4j.Slf4jLogger: Slf4jLogger started
17/12/01 20:23:58 INFO Remoting: Starting remoting
17/12/01 20:23:58 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.10:45779]
17/12/01 20:23:58 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriverActorSystem@192.168.56.10:45779]
17/12/01 20:23:58 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 45779.
17/12/01 20:23:59 INFO spark.SparkEnv: Registering MapOutputTracker
17/12/01 20:23:59 INFO spark.SparkEnv: Registering BlockManagerMaster
17/12/01 20:23:59 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-42be4afd-cb9d-4f2b-ad65-cee0ffd298c3
17/12/01 20:23:59 INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
17/12/01 20:23:59 INFO spark.SparkEnv: Registering OutputCommitCoordinator
17/12/01 20:23:59 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
17/12/01 20:23:59 INFO ui.SparkUI: Started SparkUI at
http://192.168.56.10:4040
17/12/01 20:23:59 INFO client.RMProxy: Connecting to ResourceManager at cdh-vm.dbaglobe.com/192.168.56.10:8032
17/12/01 20:24:00 WARN security.UserGroupInformation: PriviledgedActionException as:donghua (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/12/01 20:24:00 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]

17/12/01 20:24:00 WARN security.UserGroupInformation: PriviledgedActionException as:donghua (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]

17/12/01 20:24:30 WARN security.UserGroupInformation: PriviledgedActionException as:donghua (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/12/01 20:24:30 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/12/01 20:24:30 WARN security.UserGroupInformation: PriviledgedActionException as:donghua (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/12/01 20:25:00 WARN security.UserGroupInformation: PriviledgedActionException as:donghua (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/12/01 20:25:00 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/12/01 20:25:00 WARN security.UserGroupInformation: PriviledgedActionException as:donghua (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/12/01 20:25:30 WARN security.UserGroupInformation: PriviledgedActionException as:donghua (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
17/12/01 20:25:30 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]