Monday, November 7, 2016

Spark RDD MongoDB Example



donghua@ubuntu:~$ ./spark-1.6.1/bin/spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.10:0.1 --conf "spark.mongodb.input.uri=mongodb://127.0.01/nasa.eva" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/nasa.astronautTotals"
Ivy Default Cache set to: /home/donghua/.ivy2/cache
The jars for the packages stored in: /home/donghua/.ivy2/jars
:: loading settings :: url = jar:file:/home/donghua/spark-1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found org.mongodb.spark#mongo-spark-connector_2.10;0.1 in central
        found org.mongodb#mongo-java-driver;3.2.2 in central
:: resolution report :: resolve 343ms :: artifacts dl 9ms
        :: modules in use:
        org.mongodb#mongo-java-driver;3.2.2 from central in [default]
        org.mongodb.spark#mongo-spark-connector_2.10;0.1 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/9ms)
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
16/11/07 21:21:53 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.6.152 instead (on interface ens33)
16/11/07 21:21:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
16/11/07 21:21:58 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 21:21:58 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 21:22:04 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/11/07 21:22:05 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/11/07 21:22:07 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 21:22:08 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.

scala> import com.mongodb.spark._
import com.mongodb.spark._

scala> import com.mongodb.spark.rdd.MongoRDD
import com.mongodb.spark.rdd.MongoRDD

scala> import org.bson.Document
import org.bson.Document

scala> val rdd = sc.loadFromMongoDB()
rdd: com.mongodb.spark.rdd.MongoRDD[org.bson.Document] = MongoRDD[0] at RDD at MongoRDD.scala:160

scala> print(rdd.count())
375
scala> print(rdd.first())
Document{{_id=58207883126541824d4bcf58, EVA #=1, Country=USA, Crew=Ed White, Vehicle=Gemini IV, Date=06/03/1965, Duration=0:36, Purpose=First U.S. EVA. Used HHMU and took  photos.  Gas flow cooling of 25ft umbilical overwhelmed by vehicle ingress work and helmet fogged.  Lost overglove.  Jettisoned thermal gloves and helmet sun visor}}
scala> exit;
warning: there were 1 deprecation warning(s); re-run with -deprecation for details


# Part 2: inside spark-shell
## Example: setting a new namespace without command-line parameters

scala> import com.mongodb.spark._
import com.mongodb.spark._

scala> import com.mongodb.spark.rdd.MongoRDD
import com.mongodb.spark.rdd.MongoRDD

scala> import org.bson.Document
import org.bson.Document

scala> import com.mongodb.spark.config._
import com.mongodb.spark.config._

scala> val readConf = ReadConfig(sc)
readConf: com.mongodb.spark.config.ReadConfig.Self = ReadConfig(nasa,eva,Some(mongodb://127.0.01/nasa.eva),1000,64,_id,15,ReadPreferenceConfig(primary,None),ReadConcernConfig(None))

scala> val readConf2 = ReadConfig(Map("collection" -> "eva"), Some(readConf))
readConf2: com.mongodb.spark.config.ReadConfig = ReadConfig(nasa,eva,Some(mongodb://127.0.01/nasa.eva),1000,64,_id,15,ReadPreferenceConfig(primary,None),ReadConcernConfig(None))

scala> val newRDD = sc.loadFromMongoDB(readConfig=readConf2)
newRDD: com.mongodb.spark.rdd.MongoRDD[org.bson.Document] = MongoRDD[0] at RDD at MongoRDD.scala:160


scala> newRDD.first()
res1: org.bson.Document = Document{{_id=58207883126541824d4bcf58, EVA #=1, Country=USA, Crew=Ed White, Vehicle=Gemini IV, Date=06/03/1965, Duration=0:36, Purpose=First U.S. EVA. Used HHMU and took  photos.  Gas flow cooling of 25ft umbilical overwhelmed by vehicle ingress work and helmet fogged.  Lost overglove.  Jettisoned thermal gloves and helmet sun visor}}
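
In the session above, readConf2 prints the same as readConf because the override map names the collection ("eva") that the command-line URI already pointed at. The sketch below illustrates the layering idea with plain Scala maps: keys in the override map win, everything else falls back to the default. It is not the connector's implementation. The object name `ReadOptionsSketch` and the helper `withOverrides` are hypothetical, and "astronautTotals" is simply the collection name taken from the output URI on the command line.

```scala
// Hypothetical stand-in for option layering; uses plain Maps only,
// with no Spark or MongoDB connector dependency.
object ReadOptionsSketch {
  // Defaults mirroring the database and collection of the session's input URI.
  val defaults: Map[String, String] = Map(
    "database"   -> "nasa",
    "collection" -> "eva"
  )

  // Keys present in `overrides` replace the defaults; all other keys are kept.
  def withOverrides(overrides: Map[String, String]): Map[String, String] =
    defaults ++ overrides

  def main(args: Array[String]): Unit = {
    // Overriding with the value already in place changes nothing.
    val same = withOverrides(Map("collection" -> "eva"))
    // Overriding with a different collection changes only that key.
    val other = withOverrides(Map("collection" -> "astronautTotals"))
    println(same == defaults)
    println(other("database") + "." + other("collection"))
  }
}
```

To read from a different namespace in the real shell, the same pattern would apply: pass a different "collection" value in the map given to ReadConfig and keep Some(readConf) as the fallback.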