Saturday, March 30, 2019

Script to analyze personal trip data exported from Grab

Script:


import pandas as pd
import numpy as np

# Trip history exported from the Grab app; skip the report header and footer lines
df = pd.read_csv('/Users/donghua/Documents/Grab-history-20180-201903.csv',skiprows=3,skipfooter=3,engine='python')
# df.head(1)

# Parse the 'Date' column (12-hour clock format) and derive a 'YYYY-MM' month bucket
df['Date']=pd.to_datetime(df['Date'], format='%a %d %b %Y, %I:%M:%S %p')
df['Month']=df['Date'].dt.strftime('%Y-%m')
# df.head(1)

# Total and average fare per month (displayed as tables when run in a notebook)
df[['Month','Fare']].groupby('Month').sum().sort_index()
df[['Month','Fare']].groupby('Month').mean().sort_index()

import matplotlib.pyplot as plt
get_ipython().run_line_magic('matplotlib', 'inline')
df[['Month','Fare']].groupby('Month').sum().plot.bar();
df[['Month','Fare']].groupby('Month').mean().plot.bar();
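
The two group-bys can also be combined into one agg() call; a minimal sketch, assuming the df with the 'Month' and 'Fare' columns prepared above:

# Sum, mean and trip count of the fare per month, in a single pass
monthly = df.groupby('Month')['Fare'].agg(['sum', 'mean', 'count']).sort_index()
print(monthly)

# Plot the monthly totals and averages side by side
monthly[['sum', 'mean']].plot.bar(subplots=True, layout=(1, 2), figsize=(10, 4), legend=False);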

Python date and time string format codes reference

Directive | Meaning | Example | Notes
%a | Weekday as locale’s abbreviated name. | Sun, Mon, …, Sat (en_US); So, Mo, …, Sa (de_DE) | (1)
%A | Weekday as locale’s full name. | Sunday, Monday, …, Saturday (en_US); Sonntag, Montag, …, Samstag (de_DE) | (1)
%w | Weekday as a decimal number, where 0 is Sunday and 6 is Saturday. | 0, 1, …, 6 |
%d | Day of the month as a zero-padded decimal number. | 01, 02, …, 31 |
%b | Month as locale’s abbreviated name. | Jan, Feb, …, Dec (en_US); Jan, Feb, …, Dez (de_DE) | (1)
%B | Month as locale’s full name. | January, February, …, December (en_US); Januar, Februar, …, Dezember (de_DE) | (1)
%m | Month as a zero-padded decimal number. | 01, 02, …, 12 |
%y | Year without century as a zero-padded decimal number. | 00, 01, …, 99 |
%Y | Year with century as a decimal number. | 0001, 0002, …, 2013, 2014, …, 9998, 9999 | (2)
%H | Hour (24-hour clock) as a zero-padded decimal number. | 00, 01, …, 23 |
%I | Hour (12-hour clock) as a zero-padded decimal number. | 01, 02, …, 12 |
%p | Locale’s equivalent of either AM or PM. | AM, PM (en_US); am, pm (de_DE) | (1), (3)
%M | Minute as a zero-padded decimal number. | 00, 01, …, 59 |
%S | Second as a zero-padded decimal number. | 00, 01, …, 59 | (4)
%f | Microsecond as a decimal number, zero-padded on the left. | 000000, 000001, …, 999999 | (5)
%z | UTC offset in the form ±HHMM[SS[.ffffff]] (empty string if the object is naive). | (empty), +0000, -0400, +1030, +063415, -030712.345216 | (6)
%Z | Time zone name (empty string if the object is naive). | (empty), UTC, EST, CST |
%j | Day of the year as a zero-padded decimal number. | 001, 002, …, 366 |
%U | Week number of the year (Sunday as the first day of the week) as a zero-padded decimal number. All days in a new year preceding the first Sunday are considered to be in week 0. | 00, 01, …, 53 | (7)
%W | Week number of the year (Monday as the first day of the week) as a decimal number. All days in a new year preceding the first Monday are considered to be in week 0. | 00, 01, …, 53 | (7)
%c | Locale’s appropriate date and time representation. | Tue Aug 16 21:30:00 1988 (en_US); Di 16 Aug 21:30:00 1988 (de_DE) | (1)
%x | Locale’s appropriate date representation. | 08/16/88 (None); 08/16/1988 (en_US); 16.08.1988 (de_DE) | (1)
%X | Locale’s appropriate time representation. | 21:30:00 (en_US); 21:30:00 (de_DE) | (1)
%% | A literal '%' character. | % |
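
A quick strptime()/strftime() round trip that exercises several of the directives above, using the same format string as the Grab script at the top of this post (the timestamp value is made up for illustration):

from datetime import datetime

s = 'Sat 30 Mar 2019, 09:15:42 PM'                      # illustrative value only
dt = datetime.strptime(s, '%a %d %b %Y, %I:%M:%S %p')
print(dt)                                                # 2019-03-30 21:15:42
print(dt.strftime('%Y-%m'))                              # 2019-03, the 'Month' bucket used for grouping
print(dt.strftime('%j'))                                 # 089, day of the year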

Several additional directives not required by the C89 standard are included for convenience. These parameters all correspond to ISO 8601 date values. These may not be available on all platforms when used with the strftime() method. The ISO 8601 year and ISO 8601 week directives are not interchangeable with the year and week number directives above. Calling strptime() with incomplete or ambiguous ISO 8601 directives will raise a ValueError.
Directive | Meaning | Example | Notes
%G | ISO 8601 year with century representing the year that contains the greater part of the ISO week (%V). | 0001, 0002, …, 2013, 2014, …, 9998, 9999 | (8)
%u | ISO 8601 weekday as a decimal number where 1 is Monday. | 1, 2, …, 7 |
%V | ISO 8601 week as a decimal number with Monday as the first day of the week. Week 01 is the week containing Jan 4. | 01, 02, …, 53 | (8)
New in version 3.6: %G, %u and %V were added.
Notes:
  1. Because the format depends on the current locale, care should be taken when making assumptions about the output value. Field orderings will vary (for example, “month/day/year” versus “day/month/year”), and the output may contain Unicode characters encoded using the locale’s default encoding (for example, if the current locale is ja_JP, the default encoding could be any one of eucJP, SJIS, or utf-8; use locale.getlocale() to determine the current locale’s encoding).
  2. The strptime() method can parse years in the full [1, 9999] range, but years < 1000 must be zero-filled to 4-digit width.
    Changed in version 3.2: In previous versions, strftime() method was restricted to years >= 1900.
    Changed in version 3.3: In version 3.2, strftime() method was restricted to years >= 1000.
  3. When used with the strptime() method, the %p directive only affects the output hour field if the %I directive is used to parse the hour.
  4. Unlike the time module, the datetime module does not support leap seconds.
  5. When used with the strptime() method, the %f directive accepts from one to six digits and zero pads on the right.  %f is an extension to the set of format characters in the C standard (but implemented separately in datetime objects, and therefore always available).
  6. For a naive object, the %z and %Z format codes are replaced by empty strings.
    For an aware object:
    %z
    utcoffset() is transformed into a string of the form ±HHMM[SS[.ffffff]], where HH is a 2-digit string giving the number of UTC offset hours, MM is a 2-digit string giving the number of UTC offset minutes, SS is a 2-digit string giving the number of UTC offset seconds and ffffff is a 6-digit string giving the number of UTC offset microseconds. The ffffff part is omitted when the offset is a whole number of seconds and both the ffffff and the SS part is omitted when the offset is a whole number of minutes. For example, if utcoffset() returns timedelta(hours=-3, minutes=-30), %z is replaced with the string '-0330'.
    Changed in version 3.7: The UTC offset is not restricted to a whole number of minutes.
    Changed in version 3.7: When the %z directive is provided to the  strptime() method, the UTC offsets can have a colon as a separator between hours, minutes and seconds. For example, '+01:00:00' will be parsed as an offset of one hour. In addition, providing 'Z' is identical to '+00:00'.
    %Z
    If tzname() returns None, %Z is replaced by an empty string. Otherwise %Z is replaced by the returned value, which must be a string.
    Changed in version 3.2: When the %z directive is provided to the strptime() method, an aware datetime object will be produced. The tzinfo of the result will be set to a timezone instance.
  7. When used with the strptime() method, %U and %W are only used in calculations when the day of the week and the calendar year (%Y) are specified.
  8. Similar to %U and %W, %V is only used in calculations when the day of the week and the ISO year (%G) are specified in a strptime() format string. Also note that %G and %Y are not interchangeable.
https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
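
A small example of note (8): near a year boundary the ISO year (%G) and week (%V) can differ from the calendar year (%Y) and week (%W). The date below is chosen for illustration; since %G, %V and %u in strftime() depend on the platform's C library, isocalendar() is shown as well.

from datetime import date

d = date(2019, 12, 30)                        # a Monday in the last week of December 2019
print(d.strftime('%Y week %W, weekday %w'))   # 2019 week 52, weekday 1 (calendar year and week)
print(tuple(d.isocalendar()))                 # (2020, 1, 1): ISO year 2020, week 01, Monday
print(d.strftime('%G-%V-%u'))                 # 2020-01-1, where the platform supports these directives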

Thursday, March 28, 2019

PySpark - Spark SQL

Script:

import findspark
findspark.init('/Users/donghua/spark-2.4.0-bin-hadoop2.7')

from pyspark import SparkContext
sc = SparkContext('local[2]', 'Handson PySpark Chapter 6')

from pyspark.sql import Row, SQLContext, SparkSession
# SQLContext has been replaced by SparkSession since Spark 2.0; if still needed, it can be created via
# sql_context = SQLContext(sc)
spark = SparkSession(sc).builder.getOrCreate()

# KDD Cup 1999 10% sample dataset (downloaded in the MLlib post below)
raw_data = sc.textFile('file:///tmp/kddcup.data_10_percent.gz')
csv = raw_data.map(lambda x: x.split(','))

print(csv.take(1))

# Build Row objects from the first three fields: duration, protocol and service
rows = csv.map(lambda p: Row(duration=int(p[0]), protocol=p[1],service=p[2]))

rows.take(1)

df=spark.createDataFrame(rows)

df.printSchema()

df.show(5)

# registerTempTable() is deprecated since Spark 2.0; see the createOrReplaceTempView() sketch at the end of this post
df.registerTempTable('rdd')

# spark.sql() is lazy and returns a DataFrame; without an action such as show(), nothing is printed
spark.sql("""SELECT duration from rdd WHERE protocol='tcp' and duration > 2000""")

# Equivalent query using the DataFrame API
df.select("duration").filter("protocol='tcp'").filter("duration>2000").show(5)

Output (Jupyter):

import findspark
findspark.init('/Users/donghua/spark-2.4.0-bin-hadoop2.7')

from pyspark import SparkContext
sc = SparkContext('local[2]', 'Handson PySpark Chapter 6')

from pyspark.sql import Row, SQLContext, SparkSession
# SQLContext has been replaced by SparkSession since Spark 2.0; if still needed, it can be created via
# sql_context = SQLContext(sc)
spark = SparkSession(sc).builder.getOrCreate()
raw_data = sc.textFile('file:///tmp/kddcup.data_10_percent.gz')
csv = raw_data.map(lambda x: x.split(','))
print(csv.take(1))
[['0', 'tcp', 'http', 'SF', '181', '5450', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '9', '9', '1.00', '0.00', '0.11', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']]
rows = csv.map(lambda p: Row(duration=int(p[0]), protocol=p[1],service=p[2]))
rows.take(1)
[Row(duration=0, protocol='tcp', service='http')]
df=spark.createDataFrame(rows)
df.printSchema()
root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
df.show(5)
+--------+--------+-------+
|duration|protocol|service|
+--------+--------+-------+
|       0|     tcp|   http|
|       0|     tcp|   http|
|       0|     tcp|   http|
|       0|     tcp|   http|
|       0|     tcp|   http|
+--------+--------+-------+
only showing top 5 rows
df.registerTempTable('rdd')
spark.sql("""SELECT duration from rdd WHERE protocol='tcp' and duration > 2000""")
df.select("duration").filter("protocol='tcp'").filter("duration>2000").show(5)
+--------+
|duration|
+--------+
|   12454|
|   10774|
|   13368|
|   10350|
|   10409|
+--------+
only showing top 5 rows
Some of the code is adapted from hands-pyspark-big-data-analysis-video.
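
registerTempTable() has been deprecated since Spark 2.0 in favour of createOrReplaceTempView(). A minimal sketch of the replacement, reusing the spark and df objects built in the script above; note that spark.sql() only returns a DataFrame, so an action such as show() is needed before any rows are printed:

# Non-deprecated replacement for df.registerTempTable('rdd')
df.createOrReplaceTempView('rdd')

# show() triggers the query and prints the first rows
spark.sql("SELECT duration FROM rdd WHERE protocol = 'tcp' AND duration > 2000").show(5)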

PySpark - MLLib

Script:

import findspark
findspark.init('/Users/donghua/spark-2.4.0-bin-hadoop2.7')

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Create a SparkContext and obtain a SparkSession from it
spark = SparkSession(SparkContext()).builder.master('local[2]').appName('Handson PySpark Chapter 5').getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('debug')
# Show the effective Spark configuration
sc.getConf().getAll()


# Download the KDD Cup 1999 10% sample dataset
import urllib.request
url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
localfile = '/tmp/kddcup.data_10_percent.gz'
f = urllib.request.urlretrieve(url,localfile)

raw_data = sc.textFile('file:///tmp/kddcup.data_10_percent.gz')
csv = raw_data.map(lambda x: x.split(','))
# Note: x here is the raw text line, so int(x[0]) reads only its first character;
# to use the whole duration field, map over the split rows instead: csv.map(lambda x: [int(x[0])])
duration = raw_data.map(lambda x: [int(x[0])])

from pyspark.mllib.stat import Statistics
# Column statistics (mean, variance, count, ...) over the single-column RDD
summary = Statistics.colStats(duration)
summary.mean()[0]
summary.count()

# duration (field 0), src_bytes (field 4) and dst_bytes (field 5), still as strings
metrics = csv.map(lambda x: [x[0],x[4],x[5]])
metrics.take(2)

# 3x3 correlation matrices between the three columns
Statistics.corr(metrics, method="spearman")

Statistics.corr(metrics, method="pearson")

from pyspark.mllib.linalg import Vectors

# Pearson goodness-of-fit test of the observed vector against a uniform expected distribution
visitors_freq = Vectors.dense(0.13, 0.61, 0.8, 0.5, 0.3)
print(Statistics.chiSqTest(visitors_freq))

# A single much larger observation produces a very strong presumption against the null hypothesis
visitors_freq = Vectors.dense(0.13, 0.61, 0.8, 0.5, 8)
print(Statistics.chiSqTest(visitors_freq))

print(Statistics.chiSqTest(duration.collect()))

spark.stop()

Output (Jupyter):

import findspark
findspark.init('/Users/donghua/spark-2.4.0-bin-hadoop2.7')

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark=SparkSession(SparkContext()).builder.master('local[2]').appName('Handson PySpark Chapter 5').getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('debug')
sc.getConf().getAll()
[('spark.sql.warehouse.dir', '/user/hive/warehouse'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1553755489097'),
 ('spark.driver.port', '51208'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'Handson PySpark Chapter 5'),
 ('spark.driver.host', '192.168.31.177'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[2]')]
import urllib.request
url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
localfile = '/tmp/kddcup.data_10_percent.gz'
f = urllib.request.urlretrieve(url,localfile)

raw_data = sc.textFile('file:///tmp/kddcup.data_10_percent.gz')
csv = raw_data.map(lambda x: x.split(','))
duration = raw_data.map(lambda x: [int(x[0])])
from pyspark.mllib.stat import Statistics
summary = Statistics.colStats(duration)
summary.mean()[0]
0.06611054995637812
summary.count()
494021
metrics = csv.map(lambda x: [x[0],x[4],x[5]])
metrics.take(2)
[['0', '181', '5450'], ['0', '239', '486']]
Statistics.corr(metrics, method="spearman")
array([[ 1.        ,  0.01419628,  0.29918926],
       [ 0.01419628,  1.        , -0.16793059],
       [ 0.29918926, -0.16793059,  1.        ]])
Statistics.corr(metrics, method="pearson")
array([[ 1.00000000e+00,  4.25823027e-03,  5.43953448e-03],
       [ 4.25823027e-03,  1.00000000e+00, -1.59677215e-06],
       [ 5.43953448e-03, -1.59677215e-06,  1.00000000e+00]])
from pyspark.mllib.linalg import Vectors
visitors_freq = Vectors.dense(0.13, 0.61, 0.8, 0.5, 0.3)
print(Statistics.chiSqTest(visitors_freq))
Chi squared test summary:
method: pearson
degrees of freedom = 4 
statistic = 0.5852136752136753 
pValue = 0.9646925263439344 
No presumption against null hypothesis: observed follows the same distribution as expected..
visitors_freq = Vectors.dense(0.13, 0.61, 0.8, 0.5, 8)
print(Statistics.chiSqTest(visitors_freq))
Chi squared test summary:
method: pearson
degrees of freedom = 4 
statistic = 22.469462151394424 
pValue = 1.6158934330234853E-4 
Very strong presumption against null hypothesis: observed follows the same distribution as expected..
print(Statistics.chiSqTest(duration.collect()))
Chi squared test summary:
method: pearson
degrees of freedom = 494020 
statistic = 2041502.1434188513 
pValue = 0.0 
Very strong presumption against null hypothesis: observed follows the same distribution as expected..
spark.stop()
Some of the code is adapted from hands-pyspark-big-data-analysis-video.
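
The RDD-based pyspark.mllib API has been in maintenance mode since Spark 2.0; the DataFrame-based pyspark.ml.stat module can produce the same correlation matrices. A minimal sketch, assuming it runs before spark.stop() while the csv RDD from the script is still defined; the duration/src_bytes/dst_bytes column names follow the KDD Cup 1999 field order and are only for readability:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Same three numeric fields used for Statistics.corr above, cast to float
num_df = csv.map(lambda x: (float(x[0]), float(x[4]), float(x[5]))).toDF(['duration', 'src_bytes', 'dst_bytes'])

# The DataFrame-based Correlation expects a single vector column
assembler = VectorAssembler(inputCols=['duration', 'src_bytes', 'dst_bytes'], outputCol='features')
features_df = assembler.transform(num_df).select('features')

print(Correlation.corr(features_df, 'features', 'pearson').head()[0])
print(Correlation.corr(features_df, 'features', 'spearman').head()[0])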