Thursday, March 28, 2019

PySpark - Spark SQL

Script:

import findspark
# point Python at the local Spark installation before importing pyspark
findspark.init('/Users/donghua/spark-2.4.0-bin-hadoop2.7')

from pyspark import SparkContext
# local[2]: run Spark locally with 2 worker threads
sc = SparkContext('local[2]', 'Handson PySpark Chapter 6')

from pyspark.sql import Row, SQLContext, SparkSession
# SQLContext was replaced by SparkSession in Spark 2.0; a legacy SQLContext
# can still be created with: sql_context = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()  # reuses the existing SparkContext
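# A minimal alternative (sketch, assuming no separate SparkContext is needed):
# spark = SparkSession.builder \
#     .master('local[2]') \
#     .appName('Handson PySpark Chapter 6') \
#     .getOrCreate()
# sc = spark.sparkContext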

# KDD Cup 1999 10% sample, read as a gzipped text file
raw_data = sc.textFile('file:///tmp/kddcup.data_10_percent.gz')
csv = raw_data.map(lambda x: x.split(','))

print(csv.take(1))

# keep only the first three fields of each record
rows = csv.map(lambda p: Row(duration=int(p[0]), protocol=p[1], service=p[2]))

rows.take(1)

df = spark.createDataFrame(rows)

df.printSchema()

df.show(5)

# registerTempTable() is deprecated since Spark 2.0; use createOrReplaceTempView()
df.createOrReplaceTempView('rdd')

# spark.sql() returns a DataFrame; call .show() on it to display rows
spark.sql("""SELECT duration FROM rdd WHERE protocol='tcp' AND duration > 2000""")

df.select("duration").filter("protocol='tcp'").filter("duration>2000").show(5)

Output (Jupyter):

import findspark
findspark.init('/Users/donghua/spark-2.4.0-bin-hadoop2.7')

from pyspark import SparkContext
sc = SparkContext('local[2]', 'Handson PySpark Chapter 6')

from pyspark.sql import Row, SQLContext, SparkSession
# SQLContext was replaced by SparkSession in Spark 2.0; a legacy SQLContext
# can still be created with: sql_context = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()  # reuses the existing SparkContext
raw_data = sc.textFile('file:///tmp/kddcup.data_10_percent.gz')
csv = raw_data.map(lambda x: x.split(','))
print(csv.take(1))
[['0', 'tcp', 'http', 'SF', '181', '5450', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '9', '9', '1.00', '0.00', '0.11', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']]
rows = csv.map(lambda p: Row(duration=int(p[0]), protocol=p[1], service=p[2]))
rows.take(1)
[Row(duration=0, protocol='tcp', service='http')]
df = spark.createDataFrame(rows)
df.printSchema()
root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
df.show(5)
+--------+--------+-------+
|duration|protocol|service|
+--------+--------+-------+
|       0|     tcp|   http|
|       0|     tcp|   http|
|       0|     tcp|   http|
|       0|     tcp|   http|
|       0|     tcp|   http|
+--------+--------+-------+
only showing top 5 rows
df.createOrReplaceTempView('rdd')
spark.sql("""SELECT duration FROM rdd WHERE protocol='tcp' AND duration > 2000""")
df.select("duration").filter("protocol='tcp'").filter("duration>2000").show(5)
+--------+
|duration|
+--------+
|   12454|
|   10774|
|   13368|
|   10350|
|   10409|
+--------+
only showing top 5 rows
Some of the code is adapted from hands-pyspark-big-data-analysis-video.
