Monday, December 31, 2018

Demo code to showcase Solr ingestion

# Written for Solr 7 (shipped with CDH6) and Python3

import tika
import json
import urllib3
import traceback
import os

from tika import parser
url = ''
filelist = ['D:\\Temp\\Building Positive Relationships young children.pdf',
            'D:\\Temp\\Building Positive Relationships spouse n in laws.pdf']

http = urllib3.PoolManager()

for file in filelist:
    # Extract text and metadata from the file with Tika
    parsed = parser.from_file(file)
    # Add content to "combined" dict object
    combined = {}
    combined['id'] = os.path.basename(file)  # use file name as Doc ID
    combined.update(parsed["metadata"])
    combined_json = json.loads(json.dumps(combined))

    # To clean up, run a Solr delete with the query *:*
    # Set immutable=false to avoid the error "This ConfigSet is immutable."; use the URL below to create the config set before creating the collection
    # http://node02:8983/solr/admin/configs?action=CREATE&name=myConfigSet&baseConfigSet=schemalessTemplate&configSetProp.immutable=false&wt=xml
    # To search: content:"Psychologist"
    response = http.request('POST', url, body=json.dumps(combined_json), headers={'Content-Type': 'application/json'})
    # Assumption: report the HTTP status of the update request
    print(response.status)
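The document-building step in the loop above can be sketched on its own, without Tika or Solr. This is a minimal illustration, not the post's exact code: `build_solr_doc` is a hypothetical helper, the sample metadata is made up, and `ntpath.basename` is swapped in for `os.path.basename` so that Windows-style paths are split the same way on any platform.

```python
import json
import ntpath


def build_solr_doc(path, parsed):
    """Merge Tika-style metadata into one dict keyed by the file name."""
    doc = {"id": ntpath.basename(path)}  # file name as the document ID
    doc.update(parsed.get("metadata") or {})  # metadata may be missing or None
    return doc


# Made-up stand-in for what tika's parser.from_file() returns
sample = {"metadata": {"Content-Type": "application/pdf"}, "content": "..."}
doc = build_solr_doc("D:\\Temp\\example.pdf", sample)
body = json.dumps(doc)  # JSON body for the POST request
print(body)
```

Using the file name as the ID means that re-ingesting a file with the same name overwrites the earlier document instead of adding a duplicate.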

Sunday, December 16, 2018

Sentry and Hive permission explained

When using Sentry, the impersonation feature of HiveServer2 is disabled and each query runs in the cluster as the configured Hive principal. Thus, each HDFS location associated with a Hive table should be readable and writable by the Hive user or group.
If you are using the HDFS ACL synchronization feature, the required HDFS permissions (r-x for SELECT, -wx for INSERT, and rwx for ALL) on files are enforced automatically and maintained dynamically in response to changes in privilege grants on databases and tables. In our example, the alice user would be given r-x permission to files in tables in the sales database. Note that a grant on a URI object does not result in corresponding permissions on the location in HDFS.
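The privilege-to-permission mapping described above can be written down as a small lookup. This is only an illustration of the mapping: Sentry applies it internally, `acl_entry` is a hypothetical helper, and the `user:name:perms` layout is the entry format used by HDFS ACLs.

```python
# Privilege-to-permission mapping as stated in the paragraph above
PRIVILEGE_TO_PERMS = {"SELECT": "r-x", "INSERT": "-wx", "ALL": "rwx"}


def acl_entry(kind, name, privilege):
    """Return an HDFS-style ACL entry such as 'user:alice:r-x'."""
    perms = PRIVILEGE_TO_PERMS[privilege.upper()]
    return f"{kind}:{name}:{perms}"


print(acl_entry("user", "alice", "select"))  # prints user:alice:r-x
```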

Saturday, October 13, 2018

Tweet Read Example with PySpark streaming analytics

import tweepy
from tweepy import Stream
from tweepy.auth import OAuthHandler
from tweepy.streaming import StreamListener
import socket
import json

consumer_key = ''
consumer_secret = ''
access_token = ''
access_secret = ''

class TweetsListener(StreamListener):

    def __init__(self, csocket):
        self.client_socket = csocket

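    def on_data(self, data):
        try:
            msg = json.loads(data)
            # Assumption: forward the tweet text to the connected socket client
            self.client_socket.send(msg['text'].encode('utf-8'))
            return True
        except BaseException as e:
            print("Error on data: %s" % str(e))
        return True
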
    def on_error(self, status):
        return True

def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    twitter_stream = Stream(auth, TweetsListener(c_socket))

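if __name__ == "__main__":
    s = socket.socket()  # create socket object
    host = ""
    port = 5555
    s.bind((host, port))
    print("Listening on port: %s" % str(port))

    s.listen(5)  # accept() needs a listening socket; the backlog of 5 is an assumption
    c, addr = s.accept()
    print("Received request from: " + str(addr))

    sendData(c)  # assumption: hand the client connection to the Twitter stream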


import findspark


from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc

# Not required if running in a PySpark-integrated notebook
sc = SparkContext()

ssc = StreamingContext(sc, 10)
sqlContext = SQLContext(sc)

socket_stream = ssc.socketTextStream("",5555)

lines = socket_stream.window(20)

from collections import namedtuple
fields = ("tag","count")
Tweet = namedtuple('Tweet', fields)

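# use () for multiple lines
(lines.flatMap(lambda text: text.split(" "))
 .filter(lambda word: word.startswith("#"))
 .map(lambda word: (word.lower(), 1))
 .reduceByKey(lambda a, b: a + b)
 .map(lambda rec: Tweet(rec[0], rec[1]))
 # Assumption: keep the top 10 tags and register them as the "tweets" table queried further down
 .foreachRDD(lambda rdd: rdd.toDF().sort(desc("count"))
             .limit(10).registerTempTable("tweets")))

ssc.start()  # assumption: start the streaming job so the table gets populated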
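To see what the streaming pipeline above computes for one window, the same steps can be run on a plain Python list. This is a sketch that swaps Spark for `collections.Counter`; `top_hashtags` and the sample lines are made up for illustration.

```python
from collections import Counter


def top_hashtags(lines, n=10):
    """Count hashtags case-insensitively and return the n most common."""
    counts = Counter(
        word.lower()
        for text in lines              # flatMap: split each line into words
        for word in text.split(" ")
        if word.startswith("#")        # filter: keep hashtags only
    )
    return counts.most_common(n)       # sort by count, descending


sample = ["Loving #Spark and #python", "#spark streaming demo", "no tags here"]
print(top_hashtags(sample))  # prints [('#spark', 2), ('#python', 1)]
```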

import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas
# get_ipython().run_line_magic('matplotlib', 'inline')


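count = 0
while count < 10:
    time.sleep(3)  # assumption: pause between refreshes so new batches can arrive
    top_10_tweets = sqlContext.sql("select tag,count from tweets")
    top_10_df = top_10_tweets.toPandas()
    display.clear_output(wait=True)
    plt.figure(figsize=(10, 8))
    sns.barplot(x="count", y="tag", data=top_10_df)
    plt.show()  # assumption: render the refreshed chart on each pass
    count = count + 1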

Wednesday, October 10, 2018

Use Jupyter Notebook with Spark2 on Apache Spark on MacOS

Reference url:

Below are the changes specific to Apache Spark and Anaconda3 on macOS.

mkdir /Users/donghua/anaconda3/share/jupyter/kernels/pyspark2/

[root@cdh-vm bin]# cat  /Users/donghua/anaconda3/share/jupyter/kernels/pyspark2/kernel.json
      "argv": [
      "display_name": "Python3.6 + Pyspark(Spark 2.3.2)",
      "language": "python",
      "env": {
        "PYSPARK_PYTHON": "python",
        "SPARK_HOME": "/Users/donghua/spark-2.3.2-bin-hadoop2.7",
        "SPARK_CONF_DIR": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/conf",
        "PYTHONPATH": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/python/lib/",
        "PYTHONSTARTUP": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/python/pyspark/",
        "PYSPARK_SUBMIT_ARGS": "--master spark://Donghuas-MacBook-Air.local:7077 --name PySparkShell pyspark-shell"

/Users/donghua/anaconda3//bin/jupyter-notebook --ip=Donghuas-MacBook-Air.local --port 9999
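A kernel spec like the kernel.json above can also be generated from Python, which guarantees well-formed JSON. This is a sketch under assumptions: `build_kernel_spec` is a hypothetical helper, the `argv` list is the usual ipykernel launch command rather than the one from this post, and the PYTHONPATH and PYTHONSTARTUP entries are left out because their full values are not shown here.

```python
import json


def build_kernel_spec(spark_home, master_url, display_name):
    """Return a Jupyter kernel spec dict that points the kernel at Spark."""
    return {
        # Assumption: the usual ipykernel launch command, not taken from the post
        "argv": ["python", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
        "display_name": display_name,
        "language": "python",
        "env": {
            "PYSPARK_PYTHON": "python",
            "SPARK_HOME": spark_home,
            "SPARK_CONF_DIR": f"{spark_home}/conf",
            "PYSPARK_SUBMIT_ARGS": f"--master {master_url} --name PySparkShell pyspark-shell",
        },
    }


spec = build_kernel_spec(
    "/Users/donghua/spark-2.3.2-bin-hadoop2.7",
    "spark://Donghuas-MacBook-Air.local:7077",
    "Python3.6 + Pyspark(Spark 2.3.2)",
)
print(json.dumps(spec, indent=2))  # text to save as kernel.json
```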

#export SPARK_HOME=/Users/donghua/spark-2.3.2-bin-hadoop2.7

Tuesday, October 9, 2018

Modify default to start apache-spark on MacOS

By default, the Apache Spark scripts under sbin/ start each worker by using "ssh" to reach the slave node, even when we are only testing on a laptop. The modification below starts the worker with Bash instead of SSH.

Donghuas-MacBook-Air:sbin donghua$ diff
<       cmd="${@// /\\ } 2>&1"
<       echo $cmd
<       bash -c "$cmd"
>     ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
>       2>&1 | sed "s/^/$slave: /"
<       cmd="${@// /\\ } 2>&1"
<       echo $cmd
<       bash -c "$cmd"
>     ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
>       2>&1 | sed "s/^/$slave: /" &

The revised code can be found here:

A modified version of the script that starts multiple workers can be found here:
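Separately from the linked scripts, the idea behind the change (run the command through a local Bash instead of SSH when no remote host is involved) can be sketched in Python. `worker_launch_argv`, the host name, and the sample command are all hypothetical; this is not how the Spark scripts themselves are written.

```python
import shlex


def worker_launch_argv(command, host=None, ssh_opts=()):
    """Build the argv used to start a worker locally or on a remote host."""
    if host is None:
        return ["bash", "-c", command]        # local run, no SSH needed
    return ["ssh", *ssh_opts, host, command]  # remote run over SSH


# Hypothetical command line, quoted safely for the shell
cmd = " ".join(shlex.quote(part) for part in ["echo", "starting worker"])
print(worker_launch_argv(cmd))
print(worker_launch_argv(cmd, host="worker1", ssh_opts=["-o", "StrictHostKeyChecking=no"]))
```

The returned list can be passed to `subprocess.run` as-is, which avoids a second round of shell quoting.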

Sunday, September 30, 2018

Cloudera CDH "Host Clock Offset" explained

This is a host health test that checks if the host's system clock appears to be out-of-sync with its NTP server(s). 
The test uses the 'ntpdc -np' (if ntpd is running) or 'chronyc sources' (if chronyd is running) command to check that the host is synchronized to an NTP peer and that the absolute value of the host's clock offset from that peer is not too large. 
If the command fails, NTP is not synchronized to a server, or the host's NTP daemon is not running or cannot be contacted, the test will return "Bad" health. The 'ntpdc -np' or 'chronyc sources' output contains a row for each of the host's NTP servers. The row starting with a '*' (if ntpdc) or '^*' (if chronyc) contains the peer to which the host is currently synchronized. No row starting with a '*' or '^*' indicates that the host is not currently synchronized. 
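The row-marker rule described above can be sketched as a small parser. This is an illustration only, not how Cloudera Manager implements the test: `find_sync_peer` is a hypothetical helper, and the sample rows are simplified, made-up output rather than captures from a real host.

```python
def find_sync_peer(output, tool):
    """Return the address of the synchronized peer, or None if there is none."""
    marker = "^*" if tool == "chronyc" else "*"
    for line in output.splitlines():
        line = line.strip()
        if line.startswith(marker):
            fields = line[len(marker):].split()
            return fields[0] if fields else None
    return None


# Illustrative rows only, not captured from a real host
chrony_out = "^- 192.0.2.2  2 6 377 40 +1ms\n^* 192.0.2.1  2 6 377 23 +120us"
ntpdc_out = "=192.0.2.2 10.0.0.5 2 64 377\n*192.0.2.1 10.0.0.5 2 64 377"

print(find_sync_peer(chrony_out, "chronyc"))  # prints 192.0.2.1
print(find_sync_peer(ntpdc_out, "ntpdc"))     # prints 192.0.2.1
print(find_sync_peer("^? 192.0.2.3  0 6 0 - +0ns", "chronyc"))  # prints None
```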
Communication errors and too large an offset between the peer and the host time are examples of conditions that can lead to a host being unsynchronized. Make sure that UDP port 123 is open in any firewall that is in use. Check the system log for ntpd or chronyd messages related to configuration errors. 
If running ntpd, use 'ntpdc -c iostat' to verify that packets are sent and received between the different peers. More information about the conditions of each peer can be found by running the command 'ntpq -c as'. The output of this command includes the association ID that can be used in combination with 'ntpq -c "rv <association ID>"' to get more information about the status of each such peer. The command 'ntpq -c pe' can also be used to return a summary of all peers and the reason why they are not in use.
If running chronyd, use 'chronyc activity' to check how many NTP sources are online/offline. More information about the conditions of each peer can be found by running the command 'chronyc sourcestats'. To check chrony tracking, issue the command 'chronyc tracking'. 
If NTP is not in use on the host, this check should be disabled for the host using the configuration options shown below. Cloudera recommends using NTP for time synchronization of Hadoop clusters. A failure of this health test can indicate a problem with the host's NTP service or configuration. This test can be configured using the Host Clock Offset Thresholds host configuration setting.