Friday, March 22, 2019

Storage format evaluation using syslog data

# Read the data
syslogRDD = sc.textFile('/loudacre/syslog.txt').cache()

syslogRDD.take(2)

['Feb 11 21:30:57 cdh5 journal: Runtime journal is using 8.0M (max allowed 548.3M, trying to leave 822.5M free of 5.3G available → current limit 548.3M).',
 'Feb 11 21:30:57 cdh5 kernel: Initializing cgroup subsys cpuset']

# Parse the data
# Fields: timestamp (month day time), hostname, appname (keeps its trailing colon), message detail
parsedRDD = syslogRDD.map(lambda line: line.split(' ')) \
    .map(lambda t: (t[0] + ' ' + t[1] + ' ' + t[2], t[3], t[4], ' '.join(t[5:])))
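
A quick take(1) on the parsed RDD is a reasonable sanity check; based on the first sample line above, each tuple should carry 'Feb 11 21:30:57', 'cdh5', 'journal:' and the remaining message text.

parsedRDD.take(1)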


# Assign schema
from pyspark.sql.types import *

syslogSchema = StructType(
    [StructField('tstamp', StringType()),
     StructField('hostname', StringType()),
     StructField('appname', StringType()),
     StructField('detail', StringType())])

parsedDF = parsedRDD.toDF(syslogSchema)

# Use timestamp type instead of string.
# Syslog lines omit the year, so to_timestamp would otherwise assume 1970;
# prepend the year explicitly before parsing.
from pyspark.sql.functions import *
parsedDF2 = parsedDF.select(to_timestamp(concat(lit('2019 '),parsedDF.tstamp),"yyyy MMM dd HH:mm:ss").alias('tstamp'),
               "hostname","appname","detail")  

# Save the data as a managed table
parsedDF2.write.mode('overwrite').saveAsTable('syslog1')
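
The saved table can then be queried back through Spark SQL; a minimal check might be:

spark.sql("SELECT tstamp, hostname, appname FROM syslog1 LIMIT 2").show(truncate=False)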

file="file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.parquet"
parsedDF2.write.mode('overwrite').parquet(file)

file="file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.csv"
parsedDF2.write.mode('overwrite').option('header','true').csv(file)

file="file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.orc"
parsedDF2.write.mode('overwrite').orc(file)

# Size on disk of each output

6.2M  /Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.parquet
6.6M  /Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.orc
 73M  /Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.csv
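
Reading each output back is a quick way to confirm the data round-trips; Parquet and ORC preserve the timestamp type, while the CSV copy comes back as string columns unless a schema (or inferSchema) is supplied. A minimal sketch, reusing the paths above:

parquetDF = spark.read.parquet("file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.parquet")
orcDF = spark.read.orc("file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.orc")
csvDF = spark.read.option("header", "true").csv("file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.csv")
print(parquetDF.count(), orcDF.count(), csvDF.count())
# Parquet/ORC keep tstamp as a timestamp; the CSV copy reads back as strings
parquetDF.printSchema()
csvDF.printSchema()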
