Saturday, October 13, 2018

Tweet Read Example with PySpark streaming analytics

===================TweetsRead.py===================

import tweepy
from tweepy import Stream
from tweepy.auth import OAuthHandler
from tweepy.streaming import StreamListener
import socket
import json

consumer_key = ''consumer_secret = ''access_token = ''access_secret = ''

class TweetsListener(StreamListener):

    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, data):
        try:
            msg = json.loads(data)
            print(msg['text'].encode('utf-8'))
            self.client_socket.send(msg['text'].encode('utf-8'))
            return True        except BaseException as e:
            print("Error on data: %s" % str(e))
        return True
    def on_error(self, status):
        print(status)
        return True

def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    twitter_stream = Stream(auth, TweetsListener(c_socket))
    twitter_stream.filter(track=['china'])


if __name__ == "__main__":
    s = socket.socket()  # create socket object    host = "127.0.0.1"    port = 5555    s.bind((host, port))
    print("Listening on port: %s" % str(port))

    s.listen(5)
    c, addr = s.accept()
    print("Received request from: " + str(addr))

    sendData(c)


===================TwitterAnalytics.py===================

import findspark

findspark.init('/Users/donghua/spark-2.3.2-bin-hadoop2.7')

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc

# Not required if running in Pyspark integrated notebooksc = SparkContext()

ssc = StreamingContext(sc, 10)
sqlContext = SQLContext(sc)

socket_stream = ssc.socketTextStream("127.0.0.1",5555)


lines = socket_stream.window(20)

from collections import namedtuple
fields = ("tag","count")
Tweet = namedtuple('Tweet', fields)

# use () for multiple lines(lines.flatMap(lambda text: text.split(" "))
.filter(lambda word: word.startswith("#"))
.map(lambda word: (word.lower(), 1))
.reduceByKey(lambda a, b : a + b)
.map(lambda rec: Tweet(rec[0],rec[1]))
.foreachRDD(lambda rdd: rdd.toDF().sort(desc("count"))
.limit(10).registerTempTable("tweets")))


import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas
# get_ipython().run_line_magic('matplotlib', 'inline')

ssc.start()

count = 0while count < 10:
    time.sleep(10)
    top_10_tweets = sqlContext.sql("select tag,count from tweets")
    top_10_df = top_10_tweets.toPandas()
    display.clear_output(wait = True)
    plt.figure(figsize= (10, 8))
    sns.barplot(x="count", y="tag",data=top_10_df)
    plt.show()
    count = count+1
ssc.stop()