Saturday, May 26, 2018

NTP configuration to avoid the Kudu ntptime error in a lab/POC environment

[root@cdh01 ~]# tail -n 5 /etc/ntp.conf
# Undisciplined Local Clock. This is a fake driver intended for backup
# and when no outside source of synchronized time is available.
server  127.127.1.0     # local clock
fudge   127.127.1.0 stratum 10


[root@cdh01 ~]# ntptime
ntp_gettime() returns code 0 (OK)
  time deb2b920.6f031890  Sat, May 26 2018  0:32:32.433, (.433641853),
  maximum error 133377 us, estimated error 527 us, TAI offset 0
ntp_adjtime() returns code 0 (OK)
  modes 0x0 (),
  offset 0.000 us, frequency -12.739 ppm, interval 1 s,
  maximum error 133377 us, estimated error 527 us,
  status 0x2000 (NANO),
  time constant 2, precision 0.001 us, tolerance 500 ppm,
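
To confirm the fix before (re)starting Kudu, a minimal sketch like the one below can shell out to ntptime and check for the "OK" status shown in the sample output above (assumes Python 3 with ntptime on PATH; it only greps the command output rather than parsing the kernel state):

import subprocess

# Run ntptime and capture its output (assumes ntptime is installed and on PATH).
proc = subprocess.run(['ntptime'], stdout=subprocess.PIPE,
                      stderr=subprocess.STDOUT, universal_newlines=True)

# Kudu refuses to start when the clock is unsynchronized; the healthy output
# above contains "ntp_gettime() returns code 0 (OK)".
if 'OK' in proc.stdout:
    print('Clock reported as synchronized; Kudu should start without the ntptime error.')
else:
    print('Clock not synchronized yet:')
    print(proc.stdout)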

Friday, May 25, 2018

Kudu partition example

create table kudu_partition
(
  id string,
  load_ts timestamp,
  col1 string,
  col2 string,
  col3 string,
  primary key (id, load_ts)
)
partition by hash (id) partitions 20,
range (load_ts)
(
  partition '2018-05-01' <= values < '2018-06-01',
  partition '2018-06-01' <= values < '2018-07-01',
  partition '2018-07-01' <= values < '2018-08-01',
  partition '2018-08-01' <= values < '2018-09-01',
  partition '2018-09-01' <= values < '2018-10-01',
  partition '2018-10-01' <= values < '2018-11-01',
  partition '2018-11-01' <= values < '2018-12-01',
  partition '2018-12-01' <= values < '2019-01-01',
  partition '2019-01-01' <= values < '2019-02-01',
  partition '2019-02-01' <= values < '2019-03-01',
  partition '2019-03-01' <= values < '2019-04-01',
  partition '2019-04-01' <= values < '2019-05-01'
)
stored as kudu;


insert into kudu_partition values('abc1',now(),'a','b','c');
insert into kudu_partition values('abc2','2018-05-01','a','b','c');
insert into kudu_partition values('abc3','2018-05-05 21:03:05','a','b','c');

alter table kudu_partition add range partition '2019-05-01' <= values < '2019-06-01';
alter table kudu_partition drop range partition '2018-05-01' <= values < '2018-06-01';


[cdh-vm.dbaglobe.com:21000] > show range partitions kudu_partition;
Query: show range partitions kudu_partition
+---------------------------------------------------------------------+
| RANGE (load_ts)                                                     |
+---------------------------------------------------------------------+
| 2018-06-01T00:00:00.000000Z <= VALUES < 2018-07-01T00:00:00.000000Z |
| 2018-07-01T00:00:00.000000Z <= VALUES < 2018-08-01T00:00:00.000000Z |
| 2018-08-01T00:00:00.000000Z <= VALUES < 2018-09-01T00:00:00.000000Z |
| 2018-09-01T00:00:00.000000Z <= VALUES < 2018-10-01T00:00:00.000000Z |
| 2018-10-01T00:00:00.000000Z <= VALUES < 2018-11-01T00:00:00.000000Z |
| 2018-11-01T00:00:00.000000Z <= VALUES < 2018-12-01T00:00:00.000000Z |
| 2018-12-01T00:00:00.000000Z <= VALUES < 2019-01-01T00:00:00.000000Z |
| 2019-01-01T00:00:00.000000Z <= VALUES < 2019-02-01T00:00:00.000000Z |
| 2019-02-01T00:00:00.000000Z <= VALUES < 2019-03-01T00:00:00.000000Z |
| 2019-03-01T00:00:00.000000Z <= VALUES < 2019-04-01T00:00:00.000000Z |
| 2019-04-01T00:00:00.000000Z <= VALUES < 2019-05-01T00:00:00.000000Z |
| 2019-05-01T00:00:00.000000Z <= VALUES < 2019-06-01T00:00:00.000000Z |
+---------------------------------------------------------------------+

select * from kudu_partition;
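
The same DDL/DML can also be driven from Python. Below is a rough sketch using impyla against the Impala daemon on cdh-vm.dbaglobe.com; the HiveServer2 port 21050 and the unauthenticated connection are assumptions for this lab setup, not something shown above:

from impala.dbapi import connect

# Connect to the Impala daemon on the lab host (21050 is the default HiveServer2
# port used by drivers; impala-shell above uses 21000).
conn = connect(host='cdh-vm.dbaglobe.com', port=21050)
cur = conn.cursor()

# Add one more monthly range partition and insert a row that falls inside it.
cur.execute("alter table kudu_partition add range partition '2019-06-01' <= values < '2019-07-01'")
cur.execute("insert into kudu_partition values ('abc4', '2019-06-15', 'a', 'b', 'c')")

cur.execute("show range partitions kudu_partition")
for row in cur.fetchall():
    print(row)

cur.close()
conn.close()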

Monday, May 21, 2018

Kafka hands-on with Docker

Reference URL: https://github.com/Landoop/fast-data-dev

# Start the container with host networking (set ADV_HOST to your Linux host's IP if external access is needed; otherwise leave it as 127.0.0.1)
docker run --rm --net=host -e ADV_HOST=192.168.1.86 landoop/fast-data-dev

# Alternatively, start the container with explicit port mappings
docker run --rm -it -p 2181:2181 -p 3030:3030 -p 8081:8081 -p 8082:8082 -p 8083:8083 -p 9092:9092 -e ADV_HOST=192.168.1.86 landoop/fast-data-dev

# Open a shell inside the running container
docker run --rm -it --net=host landoop/fast-data-dev bash

root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "first_topic".

root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --list
__consumer_offsets
_schemas
backblaze_smart
connect-configs
connect-offsets
connect-statuses
coyote-test-avro
coyote-test-binary
coyote-test-json
first_topic
logs_broker
nyc_yellow_taxi_trip_data
reddit_posts
sea_vessel_position_reports
telecom_italia_data
telecom_italia_grid

root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic second_topic --partitions 3 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "second_topic".
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic second_topic
Topic second_topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic
Topic:first_topic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: first_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: first_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: first_topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0

root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
>first message
>second message
>^C


root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic

root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --partition 0

# a named consumer group commits its offsets to Kafka's internal __consumer_offsets topic
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --consumer-property group.id=mygroup1 --from-beginning
second message
test
first message
test
test2
^CProcessed a total of 5 messages
# running the same group again will not re-read messages this group has already consumed
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --consumer-property group.id=mygroup1 --from-beginning
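
The console-tool workflow above can be reproduced from Python as well. A rough sketch with the kafka-python package follows; the broker address and topic mirror the commands above, while the group id mygroup2 and the timeout are assumptions for illustration:

from kafka import KafkaProducer, KafkaConsumer

BROKER = '127.0.0.1:9092'

# Produce a couple of messages, like the console producer above.
producer = KafkaProducer(bootstrap_servers=BROKER)
producer.send('first_topic', b'third message')
producer.send('first_topic', b'fourth message')
producer.flush()

# Consume with a named group: offsets are committed back to Kafka
# (__consumer_offsets), so a second run with the same group.id only sees
# messages produced after the previous run.
consumer = KafkaConsumer('first_topic',
                         bootstrap_servers=BROKER,
                         group_id='mygroup2',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)
for msg in consumer:
    print(msg.partition, msg.offset, msg.value.decode())
consumer.close()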

Friday, May 11, 2018

/etc/init.d/ambari-server start REASON: Server not yet listening on http port 8080 after 50 seconds. Exiting.

The following error just indicates that your Ambari Server is taking slightly longer than the default 50 seconds to open port 8080, hence this message:

ERROR: Exiting with exit code 1.
REASON: Server not yet listening on http port 8080 after 50 seconds. Exiting.



You should try the following to fix this:

Edit "/etc/ambari-server/conf/ambari.properties" and increase the following property to 120 or 150 seconds:

server.startup.web.timeout=120 
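
If you prefer to script the edit, a minimal sketch along these lines (assuming ambari.properties is a plain key=value file and that you run it with root privileges) updates or appends the property:

# Bump server.startup.web.timeout in ambari.properties (simple key=value edit).
path = '/etc/ambari-server/conf/ambari.properties'
key = 'server.startup.web.timeout'

with open(path) as f:
    lines = f.readlines()

# Replace the property if present, otherwise append it.
found = False
for i, line in enumerate(lines):
    if line.split('=', 1)[0].strip() == key:
        lines[i] = key + '=120\n'
        found = True
if not found:
    lines.append(key + '=120\n')

with open(path, 'w') as f:
    f.writelines(lines)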

Then restart ambari-server:

# ambari-server restart

Tuesday, May 8, 2018

flatMap & flatMapValues explained by example

In [101]: rdd = sc.parallelize([2, 3, 4])

In [102]: rdd.map(lambda x: range(1, x)).collect()
Out[102]: [[1], [1, 2], [1, 2, 3]]                                             

In [103]: rdd.flatMap(lambda x: range(1, x)).collect()
Out[103]: [1, 1, 2, 1, 2, 3]


In [104]: x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])

In [106]: x.flatMapValues(lambda value:value).collect()
Out[106]: [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 
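
For contrast with flatMapValues, mapValues keeps each list value intact and only emits one pair per key; a quick check in the same session (expected result shown as a comment) would be:

# mapValues keeps each value as a whole, whereas flatMapValues (above) emits
# one (key, element) pair per element of the value.
x.mapValues(lambda value: value).collect()
# expected: [('a', ['x', 'y', 'z']), ('b', ['p', 'r'])]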

Monday, May 7, 2018

Read CSV data into Spark (RDD and DataFrame compared)


# Sample data
[donghua@cdh-vm data]$ hdfs dfs -cat /data/salesmen.csv
Date,Salesman,Revenue
1/11/16,Bob,1053
1/12/16,Bob,4362
1/13/16,Bob,6812

Method 1: Using RDD directly

from pyspark.sql.types import *
from datetime import datetime


salesmanSchema = StructType(
[
StructField("Date",DateType()),
StructField("Salesman",StringType()),
StructField("Revenue",IntegerType())
])


salesmanRDD = sc.textFile('/data/salesmen.csv')

# Skip the header row
header = salesmanRDD.first()
dataRDD = salesmanRDD.filter(lambda line: line != header)

# Split each line, convert the "M/D/YY" date into a datetime and Revenue into an int
salesmanSchemaRDD = dataRDD.map(lambda line: line.split(',')).\
    map(lambda values: [datetime(2000 + int(values[0].split('/')[2]),
                                 int(values[0].split('/')[0]),
                                 int(values[0].split('/')[1])),
                        values[1],
                        int(values[2])])

salesmanDF1=spark.createDataFrame(salesmanSchemaRDD,salesmanSchema)

salesmanDF1.show(5)


In [55]: salesmanDF1.show(5)
+----------+--------+-------+
|      Date|Salesman|Revenue|
+----------+--------+-------+
|2016-01-01|     Bob|   7172|
|2016-01-02|     Bob|   6362|
|2016-01-03|     Bob|   5982|
|2016-01-04|     Bob|   7917|
|2016-01-05|     Bob|   7837|
+----------+--------+-------+
only showing top 5 rows

Method 2: Using DataFrame with a predefined schema directly

from pyspark.sql.types import *

salesmanSchema = StructType(
[
StructField("Date",DateType()),
StructField("Salesman",StringType()),
StructField("Revenue",IntegerType())
])

In [59]: salesmanDF2 = spark.read.schema(salesmanSchema).csv('/data/salesmen.csv',header=True,dateFormat='MM/dd/yy')

In [60]: salesmanDF2.show(5)
+----------+--------+-------+
|      Date|Salesman|Revenue|
+----------+--------+-------+
|2016-01-01|     Bob|   7172|
|2016-01-02|     Bob|   6362|
|2016-01-03|     Bob|   5982|
|2016-01-04|     Bob|   7917|
|2016-01-05|     Bob|   7837|
+----------+--------+-------+
only showing top 5 rows
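
As a quick usage example on either DataFrame, the typed columns can be aggregated directly; a small sketch follows (column names are the ones defined in salesmanSchema above, the aggregate aliases are illustrative):

from pyspark.sql import functions as F

# Total and average revenue per salesman, plus the covered date range.
salesmanDF2.groupBy('Salesman') \
    .agg(F.sum('Revenue').alias('total_revenue'),
         F.avg('Revenue').alias('avg_revenue'),
         F.min('Date').alias('first_day'),
         F.max('Date').alias('last_day')) \
    .show()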


Tuesday, May 1, 2018

Elastic Search Example: Part 4

Donghuas-MacBook-Air:elasticsearch-6.2.4 donghua$ curl -XGET  -u elastic:elastic 'localhost:9200/_cat/indices/blogs?v'
health status index uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   blogs NWKFGM2GQe-fS481ZI7EuA   5   1          3            0     16.4kb         16.4kb

Donghuas-MacBook-Air:elasticsearch-6.2.4 donghua$ curl -XGET  -u elastic:elastic 'localhost:9200/blogs/_settings?pretty'
{
  "blogs" : {
    "settings" : {
      "index" : {
        "creation_date" : "1525127903277",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "NWKFGM2GQe-fS481ZI7EuA",
        "version" : {
          "created" : "6020499"
        },
        "provided_name" : "blogs"
      }
    }
  }
}

Donghuas-MacBook-Air:elasticsearch-6.2.4 donghua$ curl -XPUT  -H 'Content-Type: application/json' -u elastic:elastic 'localhost:9200/blogs/_settings?pretty' -d '{
>   "index" : {
>     "number_of_replicas" : 0
>   }
> }'
{
  "acknowledged" : true
}

Donghuas-MacBook-Air:elasticsearch-6.2.4 donghua$ curl -XGET  -u elastic:elastic 'localhost:9200/blogs/_settings?pretty'
{
  "blogs" : {
    "settings" : {
      "index" : {
        "creation_date" : "1525127903277",
        "number_of_shards" : "5",
        "number_of_replicas" : "0",
        "uuid" : "NWKFGM2GQe-fS481ZI7EuA",
        "version" : {
          "created" : "6020499"
        },
        "provided_name" : "blogs"
      }
    }
  }
}
Donghuas-MacBook-Air:elasticsearch-6.2.4 donghua$ curl -XGET  -u elastic:elastic 'localhost:9200/_cat/indices/blogs?v'
health status index uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   blogs NWKFGM2GQe-fS481ZI7EuA   5   0          3            0     16.4kb         16.4kb
Donghuas-MacBook-Air:elasticsearch-6.2.4 donghua$ 
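
The same settings change can be made from Python with the elasticsearch-py client; the sketch below is a rough equivalent of the curl calls above (the client package, the basic-auth credentials and the cat helper are assumed to match this 6.2.4 setup):

from elasticsearch import Elasticsearch

# Connect with the same basic-auth credentials used in the curl commands.
es = Elasticsearch(['http://localhost:9200'], http_auth=('elastic', 'elastic'))

# Drop the replica count for the blogs index, then read the settings back.
es.indices.put_settings(index='blogs', body={'index': {'number_of_replicas': 0}})
print(es.indices.get_settings(index='blogs'))

# Equivalent of 'GET _cat/indices/blogs?v'
print(es.cat.indices(index='blogs', v=True))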


Symptoms if "index.number_of_replicas: 0" is set in "config/elasticsearch.yml"

Found index level settings on node level configuration.

Since elasticsearch 5.x index level settings can NOT be set on the nodes
configuration like the elasticsearch.yaml, in system properties or command line
arguments. In order to upgrade all indices the settings must be updated via the
/${index}/_settings API. Unless all settings are dynamic all indices must be closed
in order to apply the upgrade. Indices created in the future should use index templates
to set default values.

Please ensure all required values are updated on all indices by executing: 

curl -XPUT 'http://localhost:9200/_all/_settings?preserve_existing=true' -d '{
  "index.number_of_replicas" : "0"
}'
*************************************************************************************

[2018-05-01T08:21:55,016][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [] uncaught exception in thread [main]
org.elasticsearch.bootstrap.StartupException: java.lang.IllegalArgumentException: node settings must not contain any index level settings
at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:125) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:112) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.2.4.jar:6.2.4]
at org.elasticsearch.cli.Command.main(Command.java:90) ~[elasticsearch-cli-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:92) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:85) ~[elasticsearch-6.2.4.jar:6.2.4]
Caused by: java.lang.IllegalArgumentException: node settings must not contain any index level settings
at org.elasticsearch.common.settings.SettingsModule.<init>(SettingsModule.java:128) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.node.Node.<init>(Node.java:331) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.node.Node.<init>(Node.java:246) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Bootstrap$5.<init>(Bootstrap.java:213) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:213) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:323) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:121) ~[elasticsearch-6.2.4.jar:6.2.4]

... 6 more