Saturday, March 9, 2019

Query Kafka topic directly using hive-kafka-storagehandler

1. Create the Kafka topic

[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper hdp:2181 --replication-factor 1 --partitions 1 --topic kafka_hive_topic
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "kafka_hive_topic".

2. Create the Hive table

[donghua@hdp ~]$ beeline -u "jdbc:hive2://hdp.dbaglobe.com:2181/demodb;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n donghua -p x
Connecting to jdbc:hive2://hdp.dbaglobe.com:2181/demodb;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
19/03/09 16:44:13 [main]: INFO jdbc.HiveConnection: Connected to hdp:10000
Connected to: Apache Hive (version 3.1.0.3.1.0.0-78)
Driver: Hive JDBC (version 3.1.0.3.1.0.0-78)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.0.3.1.0.0-78 by Apache Hive
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> 

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> CREATE EXTERNAL TABLE kafka_hive_table
. . . . . . . . . . . . . . . . . . . . . . >   (`Country Name` string , `Language` string,  `_id` struct<`$oid`:string>)
. . . . . . . . . . . . . . . . . . . . . . >   STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
. . . . . . . . . . . . . . . . . . . . . . >   TBLPROPERTIES
. . . . . . . . . . . . . . . . . . . . . . >   ("kafka.topic" = "kafka_hive_topic", "kafka.bootstrap.servers"="hdp:6667");
No rows affected (4.747 seconds)

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> desc kafka_hive_table;
+---------------+----------------------+--------------------+
|   col_name    |      data_type       |      comment       |
+---------------+----------------------+--------------------+
| country name  | string               | from deserializer  |
| language      | string               | from deserializer  |
| _id           | struct<$oid:string>  | from deserializer  |
| __key         | binary               | from deserializer  |
| __partition   | int                  | from deserializer  |
| __offset      | bigint               | from deserializer  |
| __timestamp   | bigint               | from deserializer  |
+---------------+----------------------+--------------------+
7 rows selected (0.359 seconds)

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> !outputformat tsv2

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> !brief
verbose: off

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> show create table kafka_hive_table;
createtab_stmt
CREATE EXTERNAL TABLE `kafka_hive_table`(
  `country name` string COMMENT 'from deserializer', 
  `language` string COMMENT 'from deserializer', 
  `_id` struct<$oid:string> COMMENT 'from deserializer', 
  `__key` binary COMMENT 'from deserializer', 
  `__partition` int COMMENT 'from deserializer', 
  `__offset` bigint COMMENT 'from deserializer', 
  `__timestamp` bigint COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.kafka.KafkaSerDe' 
STORED BY 
  'org.apache.hadoop.hive.kafka.KafkaStorageHandler' 
WITH SERDEPROPERTIES ( 
  'serialization.format'='1')
LOCATION
  'hdfs://hdp.dbaglobe.com:8020/warehouse/tablespace/external/hive/demodb.db/kafka_hive_table'
TBLPROPERTIES (
  'bucketing_version'='2', 
  'hive.kafka.max.retries'='6', 
  'hive.kafka.metadata.poll.timeout.ms'='30000', 
  'hive.kafka.optimistic.commit'='false', 
  'hive.kafka.poll.timeout.ms'='5000', 
  'kafka.bootstrap.servers'='hdp:6667', 
  'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe', 
  'kafka.topic'='kafka_hive_topic', 
  'kafka.write.semantic'='AT_LEAST_ONCE', 
  'transient_lastDdlTime'='1552121132')
27 rows selected (0.109 seconds)
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> 

3. Ingest some data into the Kafka topic

[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list hdp:6667 --topic kafka_hive_topic
>{"Country Name":"Afrika","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd6"},"ISO":0}
>{"Country Name":"Oseanië","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd7"},"ISO":0}
>^C
[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --topic kafka_hive_topic --bootstrap-server hdp:6667 --from-beginning
{"Country Name":"Afrika","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd6"},"ISO":0}
{"Country Name":"Oseanië","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd7"},"ISO":0}
^C
Processed a total of 2 messages

4. Query the Hive table

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t;
INFO  : Compiling command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c): select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:name, type:string, comment:null), FieldSchema(name:lang, type:string, comment:null), FieldSchema(name:t.__offset, type:bigint, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c); Time taken: 0.289 seconds
INFO  : Executing command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c): select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t
INFO  : Completed executing command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c); Time taken: 0.006 seconds
INFO  : OK
+----------+-------+-------------+
|   name   | lang  | t.__offset  |
+----------+-------+-------------+
| Afrika   | af    | 12          |
| Oseanië  | af    | 13          |
+----------+-------+-------------+
2 rows selected (0.379 seconds)
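
The metadata columns listed by desc (`__partition`, `__offset`, `__timestamp`) can also be used in a WHERE clause, and the storage handler is documented to push such filters down to the Kafka consumer. A minimal sketch, assuming the same table and JDBC URL as above; `build_offset_query` and `run_offset_query` are hypothetical helper names, not part of Hive or Kafka:

```shell
#!/bin/sh
# Hypothetical helper: print a HiveQL query that returns only rows at or
# after a given Kafka offset in partition 0 of the topic.
build_offset_query() {
  min_offset="$1"
  # Single quotes keep the backticks literal instead of running them as
  # command substitution.
  printf 'SELECT t.`Country Name` AS name, t.`Language` AS lang, t.`__offset` FROM kafka_hive_table t WHERE t.`__partition` = 0 AND t.`__offset` >= %d;\n' "$min_offset"
}

# Hypothetical helper: run the query through beeline (needs a live HiveServer2).
run_offset_query() {
  beeline -u "jdbc:hive2://hdp.dbaglobe.com:2181/demodb;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" \
    -n donghua -p x -e "$(build_offset_query "$1")"
}

# Usage (not executed here): run_offset_query 13
```

With the two messages ingested above (offsets 12 and 13), `run_offset_query 13` should return only the Oseanië row.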

Additional Finding

If the Kafka topic contains empty messages, the Hive result can be wrong. In one test, the messages at offsets 3 and 4 were empty strings, and rows 3 and 4 of the query result were incorrect: they repeated the data from row 2.
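
One way to sidestep the problem is to keep empty payloads out of the topic in the first place. A minimal sketch, assuming the messages come from a newline-delimited JSON file; `drop_blank_lines`, `produce_file` and `messages.json` are hypothetical names:

```shell
#!/bin/sh
# Hypothetical helper: drop empty and whitespace-only lines from stdin so they
# are never sent to Kafka as empty messages.
drop_blank_lines() {
  grep -v '^[[:space:]]*$'
}

# Hypothetical helper: send the non-blank lines of a file to the topic
# (needs a running broker).
produce_file() {
  drop_blank_lines < "$1" |
    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
      --broker-list hdp:6667 --topic kafka_hive_topic
}

# Usage (not executed here): produce_file messages.json
```

This only prevents new empty messages from being produced. It does not change how Hive reads empty messages that are already stored in the topic.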




