Wednesday, October 12, 2016

Integrate Apache Spark with Apache Cassandra using RDDs


Reference URL: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
 
[donghua@vmxdb01 ~]$ tar zxvf Downloads/spark-2.0.1-bin-hadoop2.7.tgz
[donghua@vmxdb01 ~]$ tar zxvf Downloads/scala-2.11.8.tgz
[donghua@vmxdb01 ~]$ export SPARK_HOME=/home/donghua/spark-2.0.1-bin-hadoop2.7
[donghua@vmxdb01 ~]$ export SCALA_HOME=/home/donghua/scala-2.11.8
[donghua@vmxdb01 ~]$ export PATH=$SCALA_HOME/bin:$PATH

[donghua@vmxdb01 ~]$ cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.9 | CQL spec 3.4.2 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE IF NOT EXISTS mykeyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> CREATE TABLE mykeyspace.kv(key text PRIMARY KEY, value int);
cqlsh> INSERT INTO mykeyspace.kv(key, value) VALUES ('key1', 1);
cqlsh> INSERT INTO mykeyspace.kv(key, value) VALUES ('key2', 2);
cqlsh> select * from mykeyspace.kv;

 key  | value
------+-------
 key1 |     1
 key2 |     2



[donghua@vmxdb01 ~]$ $SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11 --conf spark.cassandra.connection.host=127.0.0.1
Ivy Default Cache set to: /home/donghua/.ivy2/cache
The jars for the packages stored in: /home/donghua/.ivy2/jars
:: loading settings :: url = jar:file:/home/donghua/spark-2.0.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
datastax#spark-cassandra-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found datastax#spark-cassandra-connector;2.0.0-M2-s_2.11 in spark-packages
    found commons-beanutils#commons-beanutils;1.8.0 in central
    found org.joda#joda-convert;1.2 in central
    found joda-time#joda-time;2.3 in central
    found io.netty#netty-all;4.0.33.Final in central
    found com.twitter#jsr166e;1.1.0 in central
    found org.scala-lang#scala-reflect;2.11.8 in central
downloading http://dl.bintray.com/spark-packages/maven/datastax/spark-cassandra-connector/2.0.0-M2-s_2.11/spark-cassandra-connector-2.0.0-M2-s_2.11.jar ...
    [SUCCESSFUL ] datastax#spark-cassandra-connector;2.0.0-M2-s_2.11!spark-cassandra-connector.jar (2275ms)
downloading https://repo1.maven.org/maven2/commons-beanutils/commons-beanutils/1.8.0/commons-beanutils-1.8.0.jar ...
    [SUCCESSFUL ] commons-beanutils#commons-beanutils;1.8.0!commons-beanutils.jar (599ms)
downloading https://repo1.maven.org/maven2/org/joda/joda-convert/1.2/joda-convert-1.2.jar ...
    [SUCCESSFUL ] org.joda#joda-convert;1.2!joda-convert.jar (204ms)
downloading https://repo1.maven.org/maven2/joda-time/joda-time/2.3/joda-time-2.3.jar ...
    [SUCCESSFUL ] joda-time#joda-time;2.3!joda-time.jar (466ms)
downloading https://repo1.maven.org/maven2/io/netty/netty-all/4.0.33.Final/netty-all-4.0.33.Final.jar ...
    [SUCCESSFUL ] io.netty#netty-all;4.0.33.Final!netty-all.jar (597ms)
downloading https://repo1.maven.org/maven2/com/twitter/jsr166e/1.1.0/jsr166e-1.1.0.jar ...
    [SUCCESSFUL ] com.twitter#jsr166e;1.1.0!jsr166e.jar (211ms)
downloading https://repo1.maven.org/maven2/org/scala-lang/scala-reflect/2.11.8/scala-reflect-2.11.8.jar ...
    [SUCCESSFUL ] org.scala-lang#scala-reflect;2.11.8!scala-reflect.jar (1151ms)
:: resolution report :: resolve 14983ms :: artifacts dl 5509ms
    :: modules in use:
    com.twitter#jsr166e;1.1.0 from central in [default]
    commons-beanutils#commons-beanutils;1.8.0 from central in [default]
    datastax#spark-cassandra-connector;2.0.0-M2-s_2.11 from spark-packages in [default]
    io.netty#netty-all;4.0.33.Final from central in [default]
    joda-time#joda-time;2.3 from central in [default]
    org.joda#joda-convert;1.2 from central in [default]
    org.scala-lang#scala-reflect;2.11.8 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   7   |   7   |   7   |   0   ||   7   |   7   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    7 artifacts copied, 0 already retrieved (13315kB/56ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/10/11 18:45:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/11 18:45:39 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://192.168.6.151:4040
Spark context available as 'sc' (master = local[*], app id = local-1476225939323).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/
        
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> val rdd = sc.cassandraTable("mykeyspace", "kv")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18

scala> println(rdd.count)
2                                                                              

scala> println(rdd.first)
CassandraRow{key: key1, value: 1}                                              

scala> println(rdd.map(_.getInt("value")).sum)
3.0                                                                            
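The getInt accessor used above has counterparts for the other column types; as a small sketch against the same kv table, both columns can be pulled out of each CassandraRow into an ordinary pair RDD:

```scala
// Sketch against the same mykeyspace.kv table: extract both columns
// from each CassandraRow into a plain (String, Int) pair RDD.
val pairs = rdd.map(row => (row.getString("key"), row.getInt("value")))
pairs.collect().foreach { case (k, v) => println(s"$k -> $v") }
```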

scala> val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:27

scala> collection.saveToCassandra("mykeyspace", "kv", SomeColumns("key", "value"))
                                                                               
scala> println(rdd.count)
4                                                                              
                                                             
scala> sc.cassandraTable("mykeyspace", "kv").select("key", "value").collect()
res13: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{key: key1, value: 1}, CassandraRow{key: key4, value: 4}, CassandraRow{key: key3, value: 3}, CassandraRow{key: key2, value: 2})

scala> sc.cassandraTable("mykeyspace", "kv").select("key", "value").collect().foreach(println)
CassandraRow{key: key1, value: 1}
CassandraRow{key: key4, value: 4}
CassandraRow{key: key3, value: 3}
CassandraRow{key: key2, value: 2}

scala> sc.cassandraTable("mykeyspace", "kv").select("key", "value").where("value>?",2).collect().foreach(println)
CassandraRow{key: key4, value: 4}
CassandraRow{key: key3, value: 3}
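Beyond generic CassandraRow objects, the connector can also map rows straight onto a case class whose fields match the table's columns. A sketch, assuming the same mykeyspace.kv schema (the KV class here is illustrative, not part of the transcript above):

```scala
import com.datastax.spark.connector._

// Hypothetical case class whose fields mirror mykeyspace.kv's columns.
case class KV(key: String, value: Int)

// Read rows as typed KV objects instead of generic CassandraRow.
val typedRdd = sc.cassandraTable[KV]("mykeyspace", "kv")
typedRdd.collect().foreach(println)

// Writing works symmetrically: the column mapping is inferred from field names.
sc.parallelize(Seq(KV("key5", 5))).saveToCassandra("mykeyspace", "kv")
```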

scala> :quit

cqlsh> select * from mykeyspace.kv;

 key  | value
------+-------
 key1 |     1
 key4 |     4
 key3 |     3
 key2 |     2



The error below is caused by a Scala version mismatch between Spark and the Cassandra connector (for example, datastax:spark-cassandra-connector:2.0.0-M2-s_2.10 is built against Scala 2.10 instead of 2.11).
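A quick way to confirm the match is to compare the Scala version the shell reports against the `_2.xx` suffix in the connector coordinate; a sketch run inside spark-shell:

```scala
// Inside spark-shell: print the Scala version Spark itself was built with.
println(scala.util.Properties.versionString)   // in this session: "version 2.11.8"

// The connector coordinate must carry the matching suffix:
//   datastax:spark-cassandra-connector:2.0.0-M2-s_2.11  -> built for Scala 2.11 (works here)
//   datastax:spark-cassandra-connector:2.0.0-M2-s_2.10  -> built for Scala 2.10 (triggers
//   the NoClassDefFoundError shown in this section)
```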

scala> println(rdd.count)
[Stage 0:>                                                          (0 + 0) / 6]16/10/11 10:48:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at com.datastax.spark.connector.util.CountingIterator.<init>(CountingIterator.scala:4)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:336)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[Stage 0:>                                                          (0 + 2) / 6]16/10/11 10:48:03 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class