Sunday, March 24, 2019

Security Master Page

TLS, SSL, HTTPS
  • Diagnosing TLS, SSL, and HTTPS
Kerberos
  • Hadoop and Kerberos: The Madness beyond the Gate
  • Configuring Ambari and Hadoop for Kerberos using AD as the KDC

Encountered an error on one cluster complaining that the number of available Kafka brokers is 0, although one broker is up and running.

ERROR admin.TopicCommand$: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
[donghua@cdh5 bin]$ ./kafka-topics --create --zookeeper cdh5:2181 --topic weblogs --replication-factor 1 --partitions 2

19/03/24 15:39:42 INFO zookeeper.ZooKeeper: Client environment:user.dir=/opt/cloudera/parcels/KAFKA-3.1.1-1.3.1.1.p0.2/bin
19/03/24 15:39:42 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=cdh5:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@498d318c
19/03/24 15:39:42 INFO zkclient.ZkClient: Waiting for keeper state SyncConnected
19/03/24 15:39:42 INFO zookeeper.ClientCnxn: Opening socket connection to server cdh5.dbaglobe.com/192.168.31.25:2181. Will not attempt to authenticate using SASL (unknown error)
19/03/24 15:39:42 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /192.168.31.25:43992, server: cdh5.dbaglobe.com/192.168.31.25:2181
19/03/24 15:39:42 INFO zookeeper.ClientCnxn: Session establishment complete on server cdh5.dbaglobe.com/192.168.31.25:2181, sessionid = 0x169ae9e3aff0021, negotiated timeout = 30000
19/03/24 15:39:42 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
19/03/24 15:39:42 ERROR admin.TopicCommand$: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
19/03/24 15:39:42 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
19/03/24 15:39:42 INFO zookeeper.ZooKeeper: Session: 0x169ae9e3aff0021 closed
19/03/24 15:39:42 INFO zookeeper.ClientCnxn: EventThread shut down

Troubleshooting:

Check the broker's registration path in ZooKeeper, which is under /brokers/:
[root@cdh5 log]# tail -f /var/log/kafka/kafka-broker-cdh5.dbaglobe.com.log 
2019-03-24 15:35:46,490 INFO kafka.utils.ZKCheckedEphemeral: Creating /brokers/ids/44 (is it secure? false)
2019-03-24 15:35:46,498 INFO kafka.utils.ZKCheckedEphemeral: Result of znode creation is: OK
2019-03-24 15:35:46,499 INFO kafka.utils.ZkUtils: Registered broker 44 at path /brokers/ids/44 with addresses: EndPoint(cdh5.dbaglobe.com,9092,ListenerName(PLAINTEXT),PLAINTEXT)
Check the zookeeper.chroot configuration, which is "/kafka". The kafka-topics command above connected to the ZooKeeper root (cdh5:2181), so it could not see brokers registered under the /kafka chroot. Change the chroot to "/" to fix the problem.
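To confirm which chroot the brokers actually registered under, one option is to list both candidate paths in ZooKeeper. A minimal sketch, assuming the kazoo Python client is installed and cdh5:2181 is reachable:

from kazoo.client import KazooClient

# Connect to the same ZooKeeper ensemble the Kafka broker uses
zk = KazooClient(hosts='cdh5:2181')
zk.start()

# Compare brokers registered at the root vs. under the /kafka chroot
for path in ['/brokers/ids', '/kafka/brokers/ids']:
    if zk.exists(path):
        print(path, '->', zk.get_children(path))
    else:
        print(path, '-> (missing)')

zk.stop()

After the change, the same command succeeds: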

[donghua@cdh5 bin]$ ./kafka-topics --create --zookeeper cdh5:2181 --topic weblogs --replication-factor 1 --partitions 2
19/03/24 15:49:53 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
19/03/24 15:49:53 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-cdh5.14.2--1, built on 03/27/2018 20:39 GMT
19/03/24 15:49:53 INFO zookeeper.ZooKeeper: Client environment:host.name=cdh5.dbaglobe.com

19/03/24 15:49:53 INFO zookeeper.ZooKeeper: Client environment:os.version=3.10.0-957.5.1.el7.x86_64
19/03/24 15:49:53 INFO zookeeper.ZooKeeper: Client environment:user.name=donghua
19/03/24 15:49:53 INFO zookeeper.ZooKeeper: Client environment:user.home=/home/donghua
19/03/24 15:49:53 INFO zookeeper.ZooKeeper: Client environment:user.dir=/opt/cloudera/parcels/KAFKA-3.1.1-1.3.1.1.p0.2/bin
19/03/24 15:49:53 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=cdh5:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@498d318c
19/03/24 15:49:53 INFO zkclient.ZkClient: Waiting for keeper state SyncConnected
19/03/24 15:49:53 INFO zookeeper.ClientCnxn: Opening socket connection to server cdh5.dbaglobe.com/192.168.31.25:2181. Will not attempt to authenticate using SASL (unknown error)
19/03/24 15:49:53 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /192.168.31.25:45328, server: cdh5.dbaglobe.com/192.168.31.25:2181
19/03/24 15:49:53 INFO zookeeper.ClientCnxn: Session establishment complete on server cdh5.dbaglobe.com/192.168.31.25:2181, sessionid = 0x169ae9e3aff003a, negotiated timeout = 30000
19/03/24 15:49:53 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
19/03/24 15:49:54 INFO admin.AdminUtils$: Topic creation {"version":1,"partitions":{"1":[44],"0":[44]}}
Created topic "weblogs".
19/03/24 15:49:54 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
19/03/24 15:49:54 INFO zookeeper.ZooKeeper: Session: 0x169ae9e3aff003a closed
19/03/24 15:49:54 INFO zookeeper.ClientCnxn: EventThread shut down

Saturday, March 23, 2019

A warning appears when testing on a single-node cluster with the following Spark Streaming DStream code:


lines = ssc.socketTextStream(hostname,port)
19/03/23 22:55:01 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s. 
19/03/23 22:55:01 WARN storage.BlockManager: Block input-0-1553352900800 replicated to only 0 peer(s) instead of 1 peers
The default replication level is 2, based on the documentation:
socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))
Create an input from TCP source hostname:port. Data is received using a TCP socket and receive byte is interpreted as UTF8 encoded \n delimited lines.
Parameters:
hostname – Hostname to connect to for receiving data
port – Port to connect to for receiving data
storageLevel – Storage level to use for storing the received objects

How to fix (a complete sketch follows the list):

  • Method 1:
#Create a DStream by reading data from the host and port provided as input parameters.
#socketTextStream's default storage level uses replication=2; MEMORY_AND_DISK uses replication=1
from pyspark import StorageLevel
lines = ssc.socketTextStream(hostname,port,StorageLevel.MEMORY_AND_DISK)
  • Method 2:
from pyspark import StorageLevel
lines = ssc.socketTextStream(hostname,port,storageLevel=StorageLevel(True, True, False, False, 1))
  • Method 3:
# Set log level to ERROR to hide the warning and other distracting output (replication itself is unchanged)
sc.setLogLevel("ERROR")
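
For context, a minimal end-to-end sketch combining Methods 2 and 3 on a single-node cluster; the hostname, port, and batch interval are placeholders:

from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="SingleNodeDStream")
sc.setLogLevel("ERROR")                      # Method 3: suppress the replication warnings
ssc = StreamingContext(sc, batchDuration=5)  # 5-second batches (placeholder)

# Method 2: replication=1 avoids "replicated to only 0 peer(s)" on a single node
lines = ssc.socketTextStream("localhost", 9999,
                             storageLevel=StorageLevel(True, True, False, False, 1))
lines.pprint()

ssc.start()
ssc.awaitTermination()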

Reference on StorageLevel

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)[source]
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory in a JAVA-specific serialized format, and whether to replicate the RDD partitions on multiple nodes. Also contains static constants for some commonly used storage levels, MEMORY_ONLY. Since the data is always serialized on the Python side, all the constants use the serialized formats.
DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)
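
For reference, a custom StorageLevel can be passed to persist() in the same way; a small sketch (the RDD here is just an example):

from pyspark import StorageLevel

# memory + disk, not off-heap, serialized (always the case in Python), replication=1
one_replica = StorageLevel(True, True, False, False, 1)

rdd = sc.parallelize(range(100))
rdd.persist(one_replica)
print(rdd.getStorageLevel())  # e.g. Disk Memory Serialized 1x Replicated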

Configure Anaconda, Jupyter Notebook and Spark 2.4 on macOS

Configuration Files

  • Spark configuration file: /Users/donghua/spark-2.4.0-bin-hadoop2.7/sbin/spark-config.sh
# symlink and absolute path should rely on SPARK_HOME to resolve
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
  export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
  export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.7-src.zip:${PYTHONPATH}"
  export PYSPARK_PYTHONPATH_SET=1
fi


# added by Anaconda3 5.0.1 installer
export PATH="/Users/donghua/anaconda3/bin:$PATH"

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=1"
  • Anaconda Jupyter Configuration File: /Users/donghua/anaconda3/share/jupyter/kernels/pyspark2/kernel.json
    {
      "argv": [
        "python3.6",
        "-m",
        "ipykernel_launcher",
        "-f",
        "{connection_file}"
      ],
      "display_name": "Python3.6+ Pyspark(Spark 2.4.0)",
      "language": "python",
      "env": {
        "PYSPARK_PYTHON": "python",
        "SPARK_HOME": "/Users/donghua/spark-2.4.0-bin-hadoop2.7",
        "SPARK_CONF_DIR": "/Users/donghua/spark-2.4.0-bin-hadoop2.7/conf",
        "PYTHONPATH": "/Users/donghua/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip:/Users/donghua/spark-2.4.0-bin-hadoop2.7/python/:",
        "PYTHONSTARTUP": "/Users/donghua/spark-2.4.0-bin-hadoop2.7/python/pyspark/shell.py",
        "PYSPARK_SUBMIT_ARGS": "--master spark://Donghuas-MacBook-Air.local:7077 --name PySparkShell pyspark-shell"
      }
  } 

Stop/start Spark local cluster

cd /Users/donghua/spark-2.4.0-bin-hadoop2.7;./sbin/stop-all.sh
cd /Users/donghua/spark-2.4.0-bin-hadoop2.7;./sbin/start-all.sh

Commands:

  • Jupyter:
cd /Users/donghua/spark-2.4.0-bin-hadoop2.7;jupyter-notebook --ip=Donghuas-MacBook-Air.local --port 9999
  • Pyspark:
cd /Users/donghua/spark-2.4.0-bin-hadoop2.7;/Users/donghua/spark-2.4.0-bin-hadoop2.7/bin/pyspark --master spark://Donghuas-MacBook-Air.local:7077
  • Spark-submit:
/Users/donghua/spark-2.4.0-bin-hadoop2.7/bin/spark-submit --master spark://Donghuas-MacBook-Air.local:7077 NameList.py file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/people.json file:///tmp/nameList
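
Once the cluster and kernel are up, a quick sanity check from a notebook cell (or the pyspark shell) confirms the session is attached to the standalone master; the expected values below simply reflect the configuration above:

# Run in a notebook using the "Python3.6+ Pyspark(Spark 2.4.0)" kernel
print(sc.version)   # expected: 2.4.0
print(sc.master)    # expected: spark://Donghuas-MacBook-Air.local:7077
print(spark.conf.get("spark.app.name"))  # expected: PySparkShell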


Friday, March 22, 2019

Storage format evaluation using syslog data

# Read the data
syslogRDD = sc.textFile('/loudacre/syslog.txt').cache()

syslogRDD.take(2)

['Feb 11 21:30:57 cdh5 journal: Runtime journal is using 8.0M (max allowed 548.3M, trying to leave 822.5M free of 5.3G available → current limit 548.3M).',
 'Feb 11 21:30:57 cdh5 kernel: Initializing cgroup subsys cpuset']

# Parse the data
parsedRDD = syslogRDD.map(lambda line: (line.split(' '))). \
  map(lambda T: (T[0]+' '+T[1]+" "+T[2],T[3],T[4],' '.join(T[5:])))


# Assign schema
from pyspark.sql.types import *

syslogSchema = StructType(
    [StructField('tstamp', StringType()),
     StructField('hostname', StringType()),
     StructField('appname', StringType()),
     StructField('detail', StringType())])

parsedDF = parsedRDD.toDF(syslogSchema)

# Use timestamp type instead of string
# syslog timestamps have no year; prepend one, otherwise to_timestamp would default to 1970
from pyspark.sql.functions import *
parsedDF2 = parsedDF.select(to_timestamp(concat(lit('2019 '),parsedDF.tstamp),"yyyy MMM dd HH:mm:ss").alias('tstamp'),
               "hostname","appname","detail")  

#Save the data 
parsedDF2.write.mode('overwrite').saveAsTable('syslog1')

file="file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.parquet"
parsedDF2.write.mode('overwrite').parquet(file)

file="file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.csv"
parsedDF2.write.mode('overwrite').option('header','true').csv(file)

file="file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.orc"
parsedDF2.write.mode('overwrite').orc(file)

# Size

6.2M  /Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.parquet
6.6M  /Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.orc
 73M  /Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/syslog.csv
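
A quick way to confirm the three copies hold the same data is to read each one back and compare row counts (a small sketch reusing the paths above):

base = "file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data"

parquet_count = spark.read.parquet(base + "/syslog.parquet").count()
orc_count     = spark.read.orc(base + "/syslog.orc").count()
csv_count     = spark.read.option("header", "true").csv(base + "/syslog.csv").count()

print(parquet_count, orc_count, csv_count)  # all three counts should match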

Monday, March 18, 2019

RDD lambda function with tuple unpacking raises a syntax error in Spark 2.4 (Python 3)

Spark 1.6

[donghua@cdh5 ~]$ pyspark
Python 2.7.5 (default, Oct 30 2018, 23:45:53) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 2.7.5 (default, Oct 30 2018 23:45:53)
SparkContext available as sc, HiveContext available as sqlContext.
>>> rdd1 = sc.textFile('file:///tmp/postal.txt')
>>> rdd1.keyBy(lambda line: line.split('\t')[0]).map(lambda (k,v): (k, (v.split('\t')[1],v.split('\t')[2]))).take(2)
[(u'00210', (u'43.00589', u'-71.01320')), (u'01014', (u'42.17073', u'-72.60484'))]
>>> 
[donghua@cdh5 ~]$ 

Spark 2.3
[donghua@cdh5 ~]$ pyspark2
Python 2.7.5 (default, Oct 30 2018, 23:45:53) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/18 09:55:51 WARN lineage.LineageWriter: Lineage directory /var/log/spark2/lineage doesn't exist or is not writable. Lineage for this application will be disabled.
19/03/18 09:55:52 WARN lineage.LineageWriter: Lineage directory /var/log/spark2/lineage doesn't exist or is not writable. Lineage for this application will be disabled.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera4
      /_/

Using Python version 2.7.5 (default, Oct 30 2018 23:45:53)
SparkSession available as 'spark'.
>>> rdd1 = sc.textFile('file:///tmp/postal.txt')
>>> rdd1.keyBy(lambda line: line.split('\t')[0]).map(lambda (k,v): (k, (v.split('\t')[1],v.split('\t')[2]))).take(2)
[(u'00210', (u'43.00589', u'-71.01320')), (u'01014', (u'42.17073', u'-72.60484'))]
>>> 

Spark 2.4

Donghuas-MacBook-Air:data donghua$ cd /Users/donghua/spark-2.4.0-bin-hadoop2.7;/Users/donghua/spark-2.4.0-bin-hadoop2.7/bin/pyspark --master spark://Donghuas-MacBook-Air.local:7077
Python 3.6.8 |Anaconda, Inc.| (default, Dec 29 2018, 19:04:46) 
[GCC 4.2.1 Compatible Clang 4.0.1 (tags/RELEASE_401/final)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2019-03-18 09:57:59 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.6.8 (default, Dec 29 2018 19:04:46)
SparkSession available as 'spark'.
>>> rdd1 = sc.textFile('file:///Users/donghua/spark-2.4.0-bin-hadoop2.7/data/data/postal.txt')
>>> rdd1.keyBy(lambda line: line.split('\t')[0]).map(lambda (k,v): (k, (v.split('\t')[1],v.split('\t')[2]))).take(2)
  File "", line 1
    rdd1.keyBy(lambda line: line.split('\t')[0]).map(lambda (k,v): (k, (v.split('\t')[1],v.split('\t')[2]))).take(2)
                                                            ^
SyntaxError: invalid syntax
>>> rdd1.keyBy(lambda line: line.split('\t')[0]).map(lambda v: (v[0], (v[1].split('\t')[1],v[1].split('\t')[2]))).take(2)
2019-03-18 09:59:23 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[('00210', ('43.00589', '-71.01320')), ('01014', ('42.17073', '-72.60484'))]    
>>> 

Reference: 
PEP 3113 -- Removal of Tuple Parameter Unpacking
https://www.python.org/dev/peps/pep-3113/
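
Besides indexing into the tuple as above, the usual Python 3 rewrite is to unpack inside the function body, which also avoids splitting each line twice; a sketch against the same tab-delimited postal file:

def to_coords(kv):
    # kv is the (key, line) pair produced by keyBy; unpack explicitly (PEP 3113)
    k, v = kv
    fields = v.split('\t')
    return (k, (fields[1], fields[2]))

rdd1 = sc.textFile('file:///tmp/postal.txt')
rdd1.keyBy(lambda line: line.split('\t')[0]).map(to_coords).take(2)
# [('00210', ('43.00589', '-71.01320')), ('01014', ('42.17073', '-72.60484'))]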


Friday, March 15, 2019

Working Jupyter notebook and Spark 2 configuration on macOS

File: /Users/donghua/anaconda3/share/jupyter/kernels/pyspark2/kernel.json
    {
      "argv": [
        "python3.6",
        "-m",
        "ipykernel_launcher",
        "-f",
        "{connection_file}"
      ],
      "display_name": "Python3.6 + Pyspark(Spark 2.4.0)",
      "language": "python",
      "env": {
        "PYSPARK_PYTHON": "python",
        "SPARK_HOME": "/Users/donghua/spark-2.4.0-bin-hadoop2.7",
        "SPARK_CONF_DIR": "/Users/donghua/spark-2.4.0-bin-hadoop2.7/conf",
        "PYTHONPATH": "/Users/donghua/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip:/Users/donghua/spark-2.4.0-bin-hadoop2.7/python/:",
        "PYTHONSTARTUP": "/Users/donghua/spark-2.4.0-bin-hadoop2.7/python/pyspark/shell.py",
        "PYSPARK_SUBMIT_ARGS": "--master spark://Donghuas-MacBook-Air.local:7077 --name PySparkShell pyspark-shell"
      }
  }


File: /Users/donghua/spark-2.4.0-bin-hadoop2.7/sbin/spark-config.sh

export PATH="/Users/donghua/anaconda3/bin:$PATH"

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=1"
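
To confirm that both the driver and the executors picked up Python 3 from these settings, one small check is to compare interpreter versions across the cluster (the exact version strings depend on the local Anaconda install):

import sys

def worker_python_version(_):
    import sys as worker_sys
    return worker_sys.version

print("driver: ", sys.version)
print("workers:", sc.parallelize([0], 1).map(worker_python_version).collect())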


Enable the Spark cluster to connect to HDFS/Hive in a non-secure CDH cluster

Copy the following 4 files into the $SPARK_HOME/conf folder:
- core-site.xml
- hadoop-env.sh
- hive-site.xml
- hive-env.sh

-rwxr-xr-x@ 1 donghua  staff  3860 Mar 16 11:54 /Users/donghua/spark-2.4.0-bin-hadoop2.7/conf/core-site.xml
-rwxr-xr-x@ 1 donghua  staff   557 Mar 16 11:54 /Users/donghua/spark-2.4.0-bin-hadoop2.7/conf/hadoop-env.sh
-rwxr-xr-x@ 1 donghua  staff  1132 Mar 16 11:54 /Users/donghua/spark-2.4.0-bin-hadoop2.7/conf/hive-env.sh
-rwxr-xr-x@ 1 donghua  staff  5399 Mar 16 11:54 /Users/donghua/spark-2.4.0-bin-hadoop2.7/conf/hive-site.xml
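
With those files in place, connectivity can be verified from pyspark; the HDFS path below is only a placeholder for whatever exists on the CDH cluster:

# Hive metastore visibility (picked up from hive-site.xml)
spark.sql("SHOW DATABASES").show()

# HDFS access (picked up from core-site.xml); replace the path with a real file
spark.read.text("hdfs:///user/donghua/somefile.txt").show(3, truncate=False)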