Saturday, October 13, 2018

Tweet Read Example with PySpark streaming analytics

import tweepy
from tweepy import Stream
from tweepy.auth import OAuthHandler
from tweepy.streaming import StreamListener
import socket
import json

consumer_key = ''
consumer_secret = ''
access_token = ''
access_secret = ''

class TweetsListener(StreamListener):

    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, data):
        try:
            msg = json.loads(data)
            # Forward the tweet text to the connected socket client
            self.client_socket.send(msg['text'].encode('utf-8'))
            return True
        except BaseException as e:
            print("Error on data: %s" % str(e))
        return True

    def on_error(self, status):
        return True

def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    twitter_stream = Stream(auth, TweetsListener(c_socket))
    twitter_stream.filter(track=['spark'])  # start streaming; change the track keyword(s) as needed

if __name__ == "__main__":
    s = socket.socket()  # create socket object
    host = ""
    port = 5555
    s.bind((host, port))
    print("Listening on port: %s" % str(port))

    c, addr = s.accept()
    print("Received request from: " + str(addr))

    sendData(c)


import findspark
findspark.init()


from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc

# Not required if running in Pyspark integrated notebooks
sc = SparkContext()

ssc = StreamingContext(sc, 10)
sqlContext = SQLContext(sc)

socket_stream = ssc.socketTextStream("",5555)

lines = socket_stream.window(20)

from collections import namedtuple
fields = ("tag","count")
Tweet = namedtuple('Tweet', fields)

# use () for multiple lines
(lines.flatMap(lambda text: text.split(" "))
 .filter(lambda word: word.startswith("#"))
 .map(lambda word: (word.lower(), 1))
 .reduceByKey(lambda a, b: a + b)
 .map(lambda rec: Tweet(rec[0], rec[1]))
 .foreachRDD(lambda rdd: rdd.toDF().sort(desc("count"))
             .limit(10).registerTempTable("tweets")))

import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas
# get_ipython().run_line_magic('matplotlib', 'inline')


ssc.start()

count = 0
while count < 10:
    time.sleep(10)
    top_10_tweets = sqlContext.sql("select tag, count from tweets")
    top_10_df = top_10_tweets.toPandas()
    display.clear_output(wait=True)
    plt.figure(figsize=(10, 8))
    sns.barplot(x="count", y="tag", data=top_10_df)
    plt.show()
    count = count + 1
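To see what the Spark transformation chain computes, here is a minimal pure-Python sketch of one windowed batch, with `Counter` standing in for `reduceByKey` and the sort; the sample tweets are made up for illustration:

```python
from collections import Counter, namedtuple

Tweet = namedtuple('Tweet', ('tag', 'count'))

def top_hashtags(batch):
    """Mimic flatMap/filter/map/reduceByKey/sort-desc on one batch of tweet texts."""
    words = (w for text in batch for w in text.split(" "))   # flatMap
    tags = (w.lower() for w in words if w.startswith("#"))   # filter + map to lowercase
    counts = Counter(tags)                                   # reduceByKey(a + b)
    return [Tweet(t, c) for t, c in counts.most_common()]    # sort by count, descending

sample = ["Loving #Spark today", "#spark streaming with #Python", "#python rocks"]
print(top_hashtags(sample))
```

The real job does the same thing per 20-second window, then registers the top rows as a temp table for the plotting loop to query.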

Wednesday, October 10, 2018

Use Jupyter Notebook with Spark2 on Apache Spark on MacOS

Reference url:

Below are the changes specific to Apache Spark and Anaconda3 on MacOS.

mkdir /Users/donghua/anaconda3/share/jupyter/kernels/pyspark2/

[root@cdh-vm bin]# cat /Users/donghua/anaconda3/share/jupyter/kernels/pyspark2/kernel.json
{
  "argv": [
    "/Users/donghua/anaconda3/bin/python",
    "-m",
    "ipykernel_launcher",
    "-f",
    "{connection_file}"
  ],
  "display_name": "Python3.6 + Pyspark(Spark 2.3.2)",
  "language": "python",
  "env": {
    "PYSPARK_PYTHON": "python",
    "SPARK_HOME": "/Users/donghua/spark-2.3.2-bin-hadoop2.7",
    "SPARK_CONF_DIR": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/conf",
    "PYTHONPATH": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/python/lib/",
    "PYTHONSTARTUP": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/python/pyspark/",
    "PYSPARK_SUBMIT_ARGS": "--master spark://Donghuas-MacBook-Air.local:7077 --name PySparkShell pyspark-shell"
  }
}

/Users/donghua/anaconda3/bin/jupyter-notebook --ip=Donghuas-MacBook-Air.local --port 9999

#export SPARK_HOME=/Users/donghua/spark-2.3.2-bin-hadoop2.7
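A quick way to sanity-check the kernel spec before restarting Jupyter is to parse it with Python. The snippet below validates an inline copy of the file (only a subset of the fields from this post, embedded as a string so it runs anywhere):

```python
import json

# Trimmed copy of the kernel.json above, embedded for a standalone check
kernel_spec = """
{
  "display_name": "Python3.6 + Pyspark(Spark 2.3.2)",
  "language": "python",
  "env": {
    "SPARK_HOME": "/Users/donghua/spark-2.3.2-bin-hadoop2.7",
    "PYSPARK_SUBMIT_ARGS": "--master spark://Donghuas-MacBook-Air.local:7077 --name PySparkShell pyspark-shell"
  }
}
"""

spec = json.loads(kernel_spec)  # raises ValueError if the JSON is malformed
assert spec["language"] == "python"
assert "SPARK_HOME" in spec["env"]
print("kernel spec OK:", spec["display_name"])
```

Running the same `json.loads` over the real file catches the stray-comma and missing-brace errors that otherwise make the kernel silently fail to appear in the notebook launcher.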

Tuesday, October 9, 2018

Modify default to start apache-spark on MacOS

By default, the Apache Spark sbin/ scripts start workers by running "ssh" to each slave node, even when we are just testing on a single laptop. The modification below starts the workers with Bash instead of SSH.

Donghuas-MacBook-Air:sbin donghua$ diff
<       cmd="${@// /\\ } 2>&1"
<       echo $cmd
<       bash -c "$cmd"
>     ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
>       2>&1 | sed "s/^/$slave: /"
<       cmd="${@// /\\ } 2>&1"
<       echo $cmd
<       bash -c "$cmd"
>     ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
>       2>&1 | sed "s/^/$slave: /" &

The revised code can be found here:

And a modified version of the script to start multiple workers can be found here:
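The key bit in the diff is Bash's `"${@// /\\ }"`, which puts a backslash before every space in each argument before the command line is re-assembled. A hypothetical Python equivalent of that substitution, just to make the quoting behaviour concrete:

```python
def escape_args(args):
    # Mimic Bash's "${@// /\\ }": escape each space with a backslash
    return " ".join(arg.replace(" ", "\\ ") for arg in args)

# Example: an argument containing a space survives re-parsing as one token
cmd = escape_args(["spark-daemon.sh", "start", "org.apache.spark.deploy.worker.Worker", "1"])
print(cmd + " 2>&1")
```

Without that escaping, an argument containing a space would be split into two words when the rebuilt string is handed to `bash -c`.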

Sunday, September 30, 2018

Cloudera CDH "Host Clock Offset" explained

This is a host health test that checks if the host's system clock appears to be out-of-sync with its NTP server(s). 
The test uses the 'ntpdc -np' (if ntpd is running) or 'chronyc sources' (if chronyd is running) command to check that the host is synchronized to an NTP peer and that the absolute value of the host's clock offset from that peer is not too large. 
If the command fails, NTP is not synchronized to a server, or the host's NTP daemon is not running or cannot be contacted, the test will return "Bad" health. The 'ntpdc -np' or 'chronyc sources' output contains a row for each of the host's NTP servers. The row starting with a '*' (if ntpdc) or '^*' (if chronyc) contains the peer to which the host is currently synchronized. No row starting with a '*' or '^*' indicates that the host is not currently synchronized. 
Communication errors and too large an offset between the peer and the host time are examples of conditions that can lead to a host being unsynchronized. Make sure that UDP port 123 is open in any firewall that is in use. Check the system log for ntpd or chronyd messages related to configuration errors. 
If running ntpd, use 'ntpdc -c iostat' to verify that packets are sent and received between the different peers. More information about the conditions of each peer can be found by running the command 'ntpq -c as'. The output of this command includes the association ID that can be used in combination with 'ntpq -c "rv "' to get more information about the status of each such peer. The command 'ntpq -c pe' can also be used to return a summary of all peers and the reason why they are not in use. 
If running chronyd, use 'chronyc activity' to check how many NTP sources are online/offline. More information about the conditions of each peer can be found by running the command 'chronyc sourcestats'. To check chrony tracking, issue the command 'chronyc tracking'. 
If NTP is not in use on the host, this check should be disabled for the host using the configuration options shown below. Cloudera recommends using NTP for time synchronization of Hadoop clusters. A failure of this health test can indicate a problem with the host's NTP service or configuration. This test can be configured using the Host Clock Offset Thresholds host configuration setting.
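The '^*' rule described above is easy to check programmatically. Here is a small sketch that scans `chronyc sources`-style output for the currently synchronized peer (the sample output below is made up for illustration):

```python
def synced_peer(chronyc_sources_output):
    """Return the NTP source that chronyc marks as the current sync peer
    (the row beginning with '^*'), or None if the host is unsynchronized."""
    for line in chronyc_sources_output.splitlines():
        if line.startswith("^*"):
            return line.split()[1]
    return None

sample = """210 Number of sources = 2
MS Name/IP address         Stratum Poll Reach LastRx Last sample
^* 192.168.1.10                 2    6   377    43    -12us[-15us] +/- 24ms
^- 192.168.1.11                 3    6   377    45    +30us[+33us] +/- 40ms"""
print(synced_peer(sample))  # 192.168.1.10
```

If no row starts with '^*' (or '*' in ntpdc output), the host is not synchronized and this health test reports "Bad".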

Friday, August 31, 2018

Clean up hanging/stuck ambari background operations manually

Log in to the Ambari database and find the task IDs that need to be cleaned up:

select task_id,role,role_command from host_role_command where status='IN_PROGRESS';
select task_id,role,role_command from host_role_command where status='QUEUED';
select task_id,role,role_command from host_role_command where status='PENDING';

Update the identified task IDs to ABORTED status:

update host_role_command set status='ABORTED' where task_id in (264,266);
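The select-then-update flow can be rehearsed safely against a stand-in SQLite copy of the `host_role_command` table before touching the real Ambari database (the rows below are made up; the real backend is typically Postgres or MySQL):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE host_role_command "
             "(task_id INTEGER, role TEXT, role_command TEXT, status TEXT)")
conn.executemany("INSERT INTO host_role_command VALUES (?, ?, ?, ?)", [
    (264, "DATANODE", "START", "IN_PROGRESS"),
    (266, "NAMENODE", "START", "QUEUED"),
    (270, "HDFS_CLIENT", "INSTALL", "COMPLETED"),
])

# Find the hanging operations, exactly as in the SELECT queries above
stuck = [r[0] for r in conn.execute(
    "SELECT task_id FROM host_role_command "
    "WHERE status IN ('IN_PROGRESS', 'QUEUED', 'PENDING')")]
print("stuck task_ids:", stuck)  # [264, 266]

# Mark them aborted, leaving completed tasks untouched
conn.executemany("UPDATE host_role_command SET status='ABORTED' WHERE task_id=?",
                 [(t,) for t in stuck])
conn.commit()
```

The same pattern (select first, review the IDs, then update only that explicit list) is what keeps the manual clean-up from aborting legitimate running operations.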

Wednesday, August 22, 2018

How to enable support for TLS 1.2 in Windows 7 and Windows 8.1

Starting July 1, 2018, support for TLS 1.0 and 1.1 is being removed from Azure VPN Gateway; VPN Gateway will support only TLS 1.2. To maintain connectivity, apply the updates below to enable support for TLS 1.2.

How to enable support for TLS 1.2 in Windows 7 and Windows 8.1:

Open a command prompt with elevated privileges by right-clicking on Command Prompt and selecting Run as administrator.

Run the following commands in the command prompt:

reg add HKLM\SYSTEM\CurrentControlSet\Services\RasMan\PPP\EAP\13 /v TlsVersion /t REG_DWORD /d 0xfc0
reg add "HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Internet Settings\WinHttp" /v DefaultSecureProtocols /t REG_DWORD /d 0xaa0
if %PROCESSOR_ARCHITECTURE% EQU AMD64 reg add "HKLM\SOFTWARE\Wow6432Node\Microsoft\Windows\CurrentVersion\Internet Settings\WinHttp" /v DefaultSecureProtocols /t REG_DWORD /d 0xaa0
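The `0xaa0` value for `DefaultSecureProtocols` is a bitmask of enabled protocols. A small sketch decoding it, assuming the per-protocol flag values Microsoft documents for this registry key (SSL 2.0 = 0x8, SSL 3.0 = 0x20, TLS 1.0 = 0x80, TLS 1.1 = 0x200, TLS 1.2 = 0x800):

```python
# Assumed flag values for the WinHttp DefaultSecureProtocols registry key
FLAGS = {
    0x8:   "SSL 2.0",
    0x20:  "SSL 3.0",
    0x80:  "TLS 1.0",
    0x200: "TLS 1.1",
    0x800: "TLS 1.2",
}

def decode_protocols(mask):
    """List the protocols enabled by a DefaultSecureProtocols bitmask."""
    return [name for bit, name in sorted(FLAGS.items()) if mask & bit]

print(decode_protocols(0xAA0))  # ['SSL 3.0', 'TLS 1.0', 'TLS 1.1', 'TLS 1.2']
```

So `0xaa0` enables TLS 1.2 while keeping the older protocols on for compatibility; a stricter deployment could write `0x800` to allow TLS 1.2 only.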

Install the following updates:

Reboot the computer.