Sunday, January 14, 2018

Sample configuration using nginx LB to access Cloudera Manager

[root@localhost ~]# cat /etc/nginx/conf.d/cloudera.conf
server {
    listen 7180;
    location / {
    proxy_pass http://clouderacm;
  # May not need or want to set Host. Should default to the above hostname.
  proxy_set_header          Host            $http_host;
  proxy_set_header          X-Forwarded-For  $remote_addr;
    }
}

upstream clouderacm {
    hash $remote_addr consistent;

    server cdh-vm.dbaglobe.com:7180;
}
}

Saturday, January 6, 2018

R: Work with SQL Server using RODBC

> library(RODBC)
> # Connect SQL Server using integration Security
> connStr <- paste("Server=WIN2016\\SQL2017",
+                 "Driver=SQL Server",
+                  "Database=AdventureWorks2017",
+                  sep=";")
> conn <-odbcDriverConnect(connStr)


> tab <- sqlTables(conn)


> head(tab)
           TABLE_CAT    TABLE_SCHEM                TABLE_NAME TABLE_TYPE REMARKS
1 AdventureWorks2017            dbo            AWBuildVersion      TABLE    <NA>
2 AdventureWorks2017            dbo               DatabaseLog      TABLE    <NA>
3 AdventureWorks2017            dbo                  ErrorLog      TABLE    <NA>
4 AdventureWorks2017 HumanResources                Department      TABLE    <NA>
5 AdventureWorks2017 HumanResources                  Employee      TABLE    <NA>
6 AdventureWorks2017 HumanResources EmployeeDepartmentHistory      TABLE    <NA>

> emp <- sqlFetch(conn, "HumanResources.Employee")


> head(emp)
  BusinessEntityID NationalIDNumber                   LoginID OrganizationNode OrganizationLevel
1                1        295847284     adventure-works\\ken0                                 NA
2                2        245797967   adventure-works\\terri0               58                 1
3                3        509647174 adventure-works\\roberto0           5a, c0                 2
4                4        112457891     adventure-works\\rob0           5a, d6                 3
5                5        695256908    adventure-works\\gail0           5a, da                 3
6                6        998320692  adventure-works\\jossef0           5a, de                 3
                       JobTitle  BirthDate MaritalStatus Gender   HireDate SalariedFlag VacationHours SickLeaveHours
1       Chief Executive Officer 1969-01-29             S      M 2009-01-14            1            99             69
2 Vice President of Engineering 1971-08-01             S      F 2008-01-31            1             1             20
3           Engineering Manager 1974-11-12             M      M 2007-11-11            1             2             21
4          Senior Tool Designer 1974-12-23             S      M 2007-12-05            0            48             80
5               Design Engineer 1952-09-27             M      F 2008-01-06            1             5             22
6               Design Engineer 1959-03-11             M      M 2008-01-24            1             6             23
  CurrentFlag                              rowguid ModifiedDate
1           1 F01251E5-96A3-448D-981E-0F99D789110D   2014-06-30
2           1 45E8F437-670D-4409-93CB-F9424A40D6EE   2014-06-30
3           1 9BBBFB2C-EFBB-4217-9AB7-F97689328841   2014-06-30
4           1 59747955-87B8-443F-8ED4-F8AD3AFDF3A9   2014-06-30
5           1 EC84AE09-F9B8-4A15-B4A9-6CCBAB919B08   2014-06-30
6           1 E39056F1-9CD5-478D-8945-14ACA7FBDCDD   2014-06-30

> query <- "select top 10 LoginID, JobTitle from HumanResources.Employee where HireDate > '2010-01-01'"
> sqlQuery(conn,query)
                      LoginID                     JobTitle
1   adventure-works\\ovidiu0         Senior Tool Designer
2   adventure-works\\janice0                Tool Designer
3  adventure-works\\michael8       Senior Design Engineer
4   adventure-works\\sharon0              Design Engineer
5     adventure-works\\john5         Marketing Specialist
6     adventure-works\\mary2          Marketing Assistant
7   adventure-works\\wanida0          Marketing Assistant
8      adventure-works\\kim1 Production Technician - WC60
9       adventure-works\\ed0 Production Technician - WC60
10  adventure-works\\maciej0 Production Technician - WC60

> query <- "select top 10 * from HumanResources.Employee where HireDate > '2010-01-01'"
> df <- sqlQuery(conn,query)[c("LoginID", "JobTitle")]
> df
                     LoginID                     JobTitle
1   adventure-works\\ovidiu0         Senior Tool Designer
2   adventure-works\\janice0                Tool Designer
3  adventure-works\\michael8       Senior Design Engineer
4   adventure-works\\sharon0              Design Engineer
5     adventure-works\\john5         Marketing Specialist
6     adventure-works\\mary2          Marketing Assistant
7   adventure-works\\wanida0          Marketing Assistant
8      adventure-works\\kim1 Production Technician - WC60
9       adventure-works\\ed0 Production Technician - WC60
10  adventure-works\\maciej0 Production Technician - WC60

> dim(df)
[1] 10  2
> > sapply(df,class)
  LoginID JobTitle
"factor" "factor"


> sqlColumns(conn, "HumanResources.Employee")[c("COLUMN_NAME","TYPE_NAME")]
         COLUMN_NAME        TYPE_NAME
1   BusinessEntityID              int
2   NationalIDNumber         nvarchar
3            LoginID         nvarchar
4   OrganizationNode      hierarchyid
5  OrganizationLevel         smallint
6           JobTitle         nvarchar
7          BirthDate             date
8      MaritalStatus            nchar
9             Gender            nchar
10          HireDate             date
11      SalariedFlag             Flag
12     VacationHours         smallint
13    SickLeaveHours         smallint
14       CurrentFlag             Flag
15           rowguid uniqueidentifier
16      ModifiedDate         datetime


> df <- sqlQuery(conn, "select ProductID, avg(UnitPrice),stdev(UnitPrice) from [Sales].[SalesOrderDetail] group by ProductID")
> colnames(df) <- c("ProductID", "Avg(UnitPrice)", "STDEV(UnitPrice)")
> colnames(df)
[1] "ProductID"       "Avg(UnitPrice)"  "STDEV(UnitPrice)"

> names(df)
[1] "ProductID"       "Avg(UnitPrice)"  "STDEV(UnitPrice)"

> head(df)
  ProductID Avg(UnitPrice) STDEV(UnitPrice)
1       925       149.8519    3.315829e-01
2       902       200.0520    0.000000e+00
3       710         5.7000    2.299513e-07
4       879       159.0000    0.000000e+00
5       733       356.8980    1.677983e-05
6       856        53.9073    8.234393e-01

> head(df[1:2],3)
  ProductID Avg(UnitPrice)
1       925       149.8519
2       902       200.0520
3       710         5.7000

> dim(df);ncol(df);nrow(df)
[1] 266   3
[1] 3
[1] 266

# str –> structure, not string

> str(df)
'data.frame':    266 obs. of  3 variables:
  $ ProductID      : int  925 902 710 879 733 856 756 779 802 971 ...
  $ Avg(UnitPrice) : num  149.9 200.1 5.7 159 356.9 ...
  $ STDEV(UnitPrice): num  3.32e-01 0.00 2.30e-07 0.00 1.68e-05 ...


> df[df$`Avg(UnitPrice)`>3000,c("ProductID","Avg(UnitPrice)")]
    ProductID Avg(UnitPrice)
23        750       3270.419
47        753       3035.880
130       751       3326.304
158       752       3290.494
188       749       3170.195


> df[df$`Avg(UnitPrice)`>3000,]
    ProductID Avg(UnitPrice) STDEV(UnitPrice)
23        750       3270.419        588.9196
47        753       3035.880        695.0954
130       751       3326.304        545.7862
158       752       3290.494        574.4163
188       749       3170.195        646.8741

> subset(df,ProductID>750 & `Avg(UnitPrice)`>3000,select=-`STDEV(UnitPrice)`)
    ProductID Avg(UnitPrice)
47        753       3035.880
130       751       3326.304
158       752       3290.494

> df2 <- sqlQuery(conn,"select ProductID,UnitPrice from Sales.SalesOrderDetail")
> summary(df2)
   ProductID       UnitPrice      
  Min.   :707.0   Min.   :   1.328 
  1st Qu.:768.0   1st Qu.:  21.490 
  Median :863.0   Median :  49.990 
  Mean   :841.7   Mean   : 465.093 
  3rd Qu.:921.0   3rd Qu.: 602.346 
  Max.   :999.0   Max.   :3578.270 

> summary(df2$UnitPrice)
    Min.  1st Qu.   Median     Mean  3rd Qu.     Max.
   1.328   21.490   49.990  465.100  602.300 3578.000
  
> df_emp <- sqlQuery(conn,"select JobTitle,BirthDate from HumanResources.Employee")  

> sqlSave(conn,df_emp,tablename="r_temp1",rownames=FALSE,fast=TRUE)

> sqlDrop(conn,"r_temp1")

> version
               _                           
platform       x86_64-w64-mingw32          
arch           x86_64                      
os             mingw32                     
system         x86_64, mingw32             
status                                     
major          3                           
minor          3.3                         
year           2017                        
month          03                          
day            06                          
svn rev        72310                       
language       R                           
version.string R version 3.3.3 (2017-03-06)
nickname       Another Canoe               

> library(help="RODBC")

        Information on package ‘RODBC’

Description:

Package:              RODBC
Version:              1.3-15
Revision:             $Rev: 3476 $
Date:                 2017-04-13
Authors@R:            c(person("Brian", "Ripley", role = c("aut", "cre"), email = "ripley@stats.ox.ac.uk"),
                      person("Michael", "Lapsley", role = "aut", comment = "1999 to Oct 2002"))
Title:                ODBC Database Access
Description:          An ODBC database interface.
SystemRequirements:   An ODBC3 driver manager and drivers.
Depends:              R (>= 3.0.0)
Imports:              stats
LazyLoad:             yes
Biarch:               yes
License:              GPL-2 | GPL-3
NeedsCompilation:     yes
Packaged:             2017-04-13 07:00:50 UTC; ripley
Author:               Brian Ripley [aut, cre], Michael Lapsley [aut] (1999 to Oct 2002)
Maintainer:           Brian Ripley <ripley@stats.ox.ac.uk>
Repository:           CRAN
Date/Publication:     2017-04-13 07:04:28 UTC
Built:                R 3.3.2; x86_64-w64-mingw32; 2017-04-28 16:33:43 UTC; windows

Index:

RODBC                   ODBC Database Connectivity
odbcClose               ODBC Close Connections
odbcConnect             ODBC Open Connections
odbcDataSources         List ODBC Data Sources
odbcGetInfo             Request Information on an ODBC Connection
odbcQuery               Low-level ODBC functions
odbcSetAutoCommit       ODBC Set Auto-Commit Mode
setSqlTypeInfo          Specify or Query a Mapping of R Types to DBMS
                        Types
sqlColumns              Query Column Structure in ODBC Tables
sqlCopy                 ODBC Copy
sqlDrop                 Deletion Operations on Tables in ODBC databases
sqlFetch                Reading Tables from ODBC Databases
sqlQuery                Query an ODBC Database
sqlSave                 Write a Data Frame to a Table in an ODBC
                        Database
sqlTables               List Tables on an ODBC Connection
sqlTypeInfo             Request Information about Data Types in an ODBC
                         Database

Further information is available in the following vignettes in directory ‘C:/Program Files/Microsoft SQL
Server/140/R_SERVER/library/RODBC/doc’:

RODBC: ODBC Connectivity (source, pdf)

Wednesday, January 3, 2018

Using Open Source R-Studio Server connecting to Kerberos-enabled Hadoop

Step 1: Add line "SPARK_HOME=${SPARK_HOME-'/opt/cloudera/parcels/CDH/lib/spark/'}" to end of file "/usr/lib64/R/etc/Renviron"

Step 2: Connect to Spark using sparklyr inside R-Studio Server

> install.packages("sparklyr")
> library(sparklyr)
> readRenviron("/usr/lib64/R/etc/Renviron")
> sc <- spark_connect(master = "yarn-client",version = "1.6.0", config = list
(default = list(spark.yarn.keytab = "/home/donghua/donghua.keytab", spark.yarn.principal = "donghua@DBAGLOBE.COM")))
> sc
$master
[1] "yarn-client"

$method
[1] "shell"

$app_name
[1] "sparklyr"

$config
$config$default
$config$default$spark.yarn.keytab
[1] "/home/donghua/donghua.keytab"

$config$default$spark.yarn.principal
[1] "donghua@DBAGLOBE.COM"



$spark_home
[1] "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark"

$backend
A connection with                               
description "->localhost:46015"
class       "sockconn"         
mode        "wb"               
text        "binary"           
opened      "opened"           
can read    "yes"              
can write   "yes"              

$monitor
A connection with                              
description "->localhost:8880"
class       "sockconn"        
mode        "rb"              
text        "binary"          
opened      "opened"          
can read    "yes"             
can write   "yes"             

$output_file
[1] "/tmp/RtmpXWaXfE/file7af1ca61a03_spark.log"

$spark_context
<jobj[6]>
  class org.apache.spark.SparkContext
  org.apache.spark.SparkContext@355d7d99

$java_context
<jobj[7]>
  class org.apache.spark.api.java.JavaSparkContext
  org.apache.spark.api.java.JavaSparkContext@ef616c5

attr(,"class")
[1] "spark_connection"       "spark_shell_connection" "DBIConnection"   


image

> library(DBI)
> iotdatademo <- dbGetQuery(sc, 'Select * from default.iotdatademo limit 10')
> iotdatademo

image

Reference URL: https://medium.com/@bkvarda/sparklyr-r-interface-for-spark-and-kerberos-on-cloudera-80abf5f6b4ad

Common errors and fixes with Spark 1.6 running with Python3 (Anaconda Version)

1. Error caused by Python 3.6 (version too new)

[donghua@cdh-vm spark]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.6.3 |Anaconda, Inc.| (default, Oct 13 2017, 12:02:49)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
[TerminalIPythonApp] WARNING | Unknown error in handling PYTHONSTARTUP file /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/shell.py:


TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', 'rename', and 'module'

How to fix:

[donghua@cdh-vm spark]$ conda create -n py35 python=3.5 anaconda

[donghua@cdh-vm spark]$ source activate py35

2. Error caused by worker using different version comparing to pyspark

(py35) [donghua@cdh-vm ~]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.5.4 |Anaconda, Inc.| (default, Oct 13 2017, 11:22:58)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 3.5.4 (default, Oct 13 2017 11:22:58)
SparkContext available as sc, HiveContext available as sqlContext.


In [9]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByK
   ...: ey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey()
[Stage 0:>                                                          (0 + 2) / 2]

18/01/03 08:22:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cdh-vm.dbaglobe.com, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 64, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions

How to fix:

Add line “PYSPARK_PYTHON=/opt/anaconda3/envs/py35/bin/python3” to file “/opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh “

3. Error “Randomness of hash of string should be disabled via PYTHONHASHSEED”

In [1]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByK
   ...: ey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey()
[Stage 0:>                                                          (0 + 2) / 2]18/01/03 09:17:09 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, cdh-vm.dbaglobe.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 111, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/rdd.py", line 1703, in add_shuffle_key
    buckets[partitionFunc(k) % numPartitions].append((k, v))
  File "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/pyspark/rdd.py", line 74, in portable_hash
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

How to fix:

Add line “SPARK_YARN_USER_ENV=PYTHONHASHSEED=0” to file “/opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh “

[root@cdh-vm conf]# diff /opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh /opt/cloudera/parcels/CDH/lib/spark/conf/spark-env.sh.orig
63,66d62
<
< PYSPARK_PYTHON=/opt/anaconda3/envs/py35/bin/python3
< SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
<
79d74
<

(py35) [donghua@cdh-vm ~]$ pyspark
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark).
WARNING: Running pyspark from user-defined location.
Python 3.5.4 |Anaconda, Inc.| (default, Oct 13 2017, 11:22:58)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 3.5.4 (default, Oct 13 2017 11:22:58)
SparkContext available as sc, HiveContext available as sqlContext.


In [3]: sc.textFile('/user/donghua/IOTDataDemo.csv').filter(lambda line: line[0:9] != "StationID").map(lambda line: (line.split(",")[3],(float(line.split(",")[4]),1))).reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey().collect()

Out[3]:
[('0', 80.42217204861151),
  ('1', 80.42420773058639),
  ('2', 80.516892013888),
  ('3', 80.42997673611161),
  ('4', 80.62740798611237),

  ('5', 80.49621712962933),
  ('6', 80.5453983217595)]