Monday, May 7, 2018

Read CSV data into Spark (RDD and DataFrame compared)


# Sample data
[donghua@cdh-vm data]$ hdfs dfs -cat /data/salesmen.csv
Date,Salesman,Revenue
1/11/16,Bob,1053
1/12/16,Bob,4362
1/13/16,Bob,6812

Method 1: Using RDD directly

from pyspark.sql.types import *
from datetime import datetime


salesmanSchema = StructType([
    StructField("Date", DateType()),
    StructField("Salesman", StringType()),
    StructField("Revenue", IntegerType())
])


salesmanRDD = sc.textFile('/data/salesmen.csv')

header = salesmanRDD.first()

dataRDD = salesmanRDD.filter(lambda line: line != header)  # drop the header line

def parse_line(line):
    values = line.split(',')
    month, day, year = values[0].split('/')
    # two-digit year: "16" -> 2016
    return [datetime(2000 + int(year), int(month), int(day)), values[1], int(values[2])]

salesmanSchemaRDD = dataRDD.map(parse_line)

salesmanDF1 = spark.createDataFrame(salesmanSchemaRDD, salesmanSchema)



In [55]: salesmanDF1.show(5)
+----------+--------+-------+
|      Date|Salesman|Revenue|
+----------+--------+-------+
|2016-01-01|     Bob|   7172|
|2016-01-02|     Bob|   6362|
|2016-01-03|     Bob|   5982|
|2016-01-04|     Bob|   7917|
|2016-01-05|     Bob|   7837|
+----------+--------+-------+
only showing top 5 rows
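The manual split-and-reassemble date logic above can also be written with `datetime.strptime`, whose `%y` directive maps a two-digit year like "16" to 2016. A minimal pure-Python sketch of just the parsing step (the `parse_row` helper name is illustrative; no Spark needed to try it):

```python
from datetime import datetime

def parse_row(line):
    """Parse one CSV line like '1/11/16,Bob,1053' into [date, str, int]."""
    date_str, salesman, revenue = line.split(',')
    # %m/%d/%y matches the sample data's month/day/two-digit-year format
    return [datetime.strptime(date_str, '%m/%d/%y').date(), salesman, int(revenue)]

print(parse_row('1/11/16,Bob,1053'))
```

Either approach produces values compatible with `DateType()` in the schema; `strptime` just keeps the format in one place.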

Method 2: Using DataFrame with a predefined schema directly

from pyspark.sql.types import *

salesmanSchema = StructType([
    StructField("Date", DateType()),
    StructField("Salesman", StringType()),
    StructField("Revenue", IntegerType())
])

In [59]: salesmanDF2 = spark.read.schema(salesmanSchema).csv('/data/salesmen.csv',header=True,dateFormat='MM/dd/yy')

In [60]: salesmanDF2.show(5)
+----------+--------+-------+
|      Date|Salesman|Revenue|
+----------+--------+-------+
|2016-01-01|     Bob|   7172|
|2016-01-02|     Bob|   6362|
|2016-01-03|     Bob|   5982|
|2016-01-04|     Bob|   7917|
|2016-01-05|     Bob|   7837|
+----------+--------+-------+
only showing top 5 rows
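For comparison, the stdlib `csv` module can replicate locally what `header=True`, `dateFormat='MM/dd/yy'`, and the schema do in the Spark reader. A small self-contained sketch using the sample rows from the top of the post (no Spark required):

```python
import csv
import io
from datetime import datetime

sample = """Date,Salesman,Revenue
1/11/16,Bob,1053
1/12/16,Bob,4362
1/13/16,Bob,6812
"""

# DictReader consumes the first line as column names, like header=True
reader = csv.DictReader(io.StringIO(sample))
rows = []
for rec in reader:
    rows.append({
        'Date': datetime.strptime(rec['Date'], '%m/%d/%y').date(),  # like dateFormat='MM/dd/yy'
        'Salesman': rec['Salesman'],                                # like StringType()
        'Revenue': int(rec['Revenue']),                             # like IntegerType()
    })

print(rows[0])
```

The DataFrame reader does the same work in one call, distributed across the cluster, which is why Method 2 is the shorter and preferred path.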