Saturday, March 9, 2019

Query Kafka topic directly using hive-kafka-storagehandler

1. Create Kafka topic

[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper hdp:2181 --replication-factor 1 --partitions 1 --topic kafka_hive_topic
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "kafka_hive_topic".

2. Create Hive table

[donghua@hdp ~]$ beeline -u "jdbc:hive2://hdp.dbaglobe.com:2181/demodb;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n donghua -p x
Connecting to jdbc:hive2://hdp.dbaglobe.com:2181/demodb;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
19/03/09 16:44:13 [main]: INFO jdbc.HiveConnection: Connected to hdp:10000
Connected to: Apache Hive (version 3.1.0.3.1.0.0-78)
Driver: Hive JDBC (version 3.1.0.3.1.0.0-78)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.0.3.1.0.0-78 by Apache Hive
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> 

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> CREATE EXTERNAL TABLE kafka_hive_table
. . . . . . . . . . . . . . . . . . . . . . >   (`Country Name` string , `Language` string,  `_id` struct<`$oid`:string>)
. . . . . . . . . . . . . . . . . . . . . . >   STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
. . . . . . . . . . . . . . . . . . . . . . >   TBLPROPERTIES
. . . . . . . . . . . . . . . . . . . . . . >   ("kafka.topic" = "kafka_hive_topic", "kafka.bootstrap.servers"="hdp:6667");
No rows affected (4.747 seconds)

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> desc kafka_hive_table;
+---------------+----------------------+--------------------+
|   col_name    |      data_type       |      comment       |
+---------------+----------------------+--------------------+
| country name  | string               | from deserializer  |
| language      | string               | from deserializer  |
| _id           | struct<$oid:string>  | from deserializer  |
| __key         | binary               | from deserializer  |
| __partition   | int                  | from deserializer  |
| __offset      | bigint               | from deserializer  |
| __timestamp   | bigint               | from deserializer  |
+---------------+----------------------+--------------------+
7 rows selected (0.359 seconds)

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> !outputformat tsv2

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> !brief
verbose: off

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> show create table kafka_hive_table;

createtab_stmt
CREATE EXTERNAL TABLE `kafka_hive_table`(
  `country name` string COMMENT 'from deserializer', 
  `language` string COMMENT 'from deserializer', 
  `_id` struct<$oid:string> COMMENT 'from deserializer', 
  `__key` binary COMMENT 'from deserializer', 
  `__partition` int COMMENT 'from deserializer', 
  `__offset` bigint COMMENT 'from deserializer', 
  `__timestamp` bigint COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.kafka.KafkaSerDe' 
STORED BY 
  'org.apache.hadoop.hive.kafka.KafkaStorageHandler' 
WITH SERDEPROPERTIES ( 
  'serialization.format'='1')
LOCATION
  'hdfs://hdp.dbaglobe.com:8020/warehouse/tablespace/external/hive/demodb.db/kafka_hive_table'
TBLPROPERTIES (
  'bucketing_version'='2', 
  'hive.kafka.max.retries'='6', 
  'hive.kafka.metadata.poll.timeout.ms'='30000', 
  'hive.kafka.optimistic.commit'='false', 
  'hive.kafka.poll.timeout.ms'='5000', 
  'kafka.bootstrap.servers'='hdp:6667', 
  'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe', 
  'kafka.topic'='kafka_hive_topic', 
  'kafka.write.semantic'='AT_LEAST_ONCE', 
  'transient_lastDdlTime'='1552121132')
27 rows selected (0.109 seconds)
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> 

3. Ingest some data into Kafka topic

[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list hdp:6667 --topic kafka_hive_topic
>{"Country Name":"Afrika","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd6"},"ISO":0}
>{"Country Name":"Oseanië","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd7"},"ISO":0}
>^C
[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --topic kafka_hive_topic --bootstrap-server hdp:6667 --from-beginning
{"Country Name":"Afrika","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd6"},"ISO":0}
{"Country Name":"Oseanië","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd7"},"ISO":0}
^C
Processed a total of 2 messages

4. Query Hive table

0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t;
INFO  : Compiling command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c): select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:name, type:string, comment:null), FieldSchema(name:lang, type:string, comment:null), FieldSchema(name:t.__offset, type:bigint, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c); Time taken: 0.289 seconds
INFO  : Executing command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c): select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t
INFO  : Completed executing command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c); Time taken: 0.006 seconds
INFO  : OK
+----------+-------+-------------+
|   name   | lang  | t.__offset  |
+----------+-------+-------------+
| Afrika   | af    | 12          |
| Oseanië  | af    | 13          |
+----------+-------+-------------+
2 rows selected (0.379 seconds)
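
The same query can also be run non-interactively and restricted to newer messages by filtering on the `__offset` metadata column. The snippet below is only a sketch: the helper name `query_from_offset` is hypothetical, it reuses the JDBC URL, user and table from this post, and it was not part of the original test.

```shell
# Hypothetical helper: run the step 4 query through beeline -e and
# return only rows whose Kafka offset is >= the given value.
# Assumes the JDBC URL, user, and table name used earlier in this post.
JDBC_URL='jdbc:hive2://hdp.dbaglobe.com:2181/demodb;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2'

query_from_offset() {
  # Accept only a non-negative integer offset.
  case "$1" in
    ''|*[!0-9]*) echo "usage: query_from_offset <min_offset>" >&2; return 1 ;;
  esac
  min_offset="$1"
  # Backticks are escaped so the shell passes them to Hive literally.
  beeline -u "$JDBC_URL" -n donghua -p x --outputformat=tsv2 -e \
    "select t.\`Country Name\` as name, t.\`Language\` as lang, t.\`__offset\` from kafka_hive_table t where t.\`__offset\` >= ${min_offset}"
}

# Example: query_from_offset 12
```

With the two messages ingested above, `query_from_offset 13` would be expected to return only the second row, though that depends on the offsets in your own topic.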

Additional Finding

If the Kafka topic contains empty messages, the Hive result can be wrong. In this test, the messages at offsets 3 and 4 are empty strings.

Rows 3 and 4 of the query result are incorrect: they repeat the data from row 2.
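
One possible workaround is to keep empty lines out of the topic in the first place. The following is a minimal sketch, not a fix for the Hive behaviour itself: `drop_empty_lines` and `messages.json` are hypothetical names, and it assumes messages are produced from a file with one JSON document per line.

```shell
# Hypothetical filter: remove empty or whitespace-only lines from stdin
# so that they are never sent to the topic as empty messages.
drop_empty_lines() {
  grep -v '^[[:space:]]*$'
}

# Assumed usage with the console producer from step 3
# (messages.json is a hypothetical input file):
# drop_empty_lines < messages.json | \
#   /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
#     --broker-list hdp:6667 --topic kafka_hive_topic
```

This only helps for data you produce yourself; empty messages already in the topic are not affected.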




