kafka flink es hive streaming
Posted 秉寒-CHO
1.kafka create topic
kafka-topics.sh --create --bootstrap-server 172.19.68.12:9092 --replication-factor 1 --partitions 1 --topic dev-yuqing-topic-test
2.kafka producer
kafka-console-producer.sh --broker-list 172.19.68.12:9092 --topic dev-yuqing-topic-test
kafka-producer-perf-test.sh --num-records 5 --record-size 128000 --throughput -1 --topic dev-yuqing-topic-test --producer-props bootstrap.servers=172.19.68.12:9092
./kafka-producer-perf-test.sh --num-records 500000 --record-size 3000 --throughput -1 --topic test-rep-lww --producer-props bootstrap.servers=kafka-01:19022,kafka-02:19023,kafka-03:19024
3.kafka consumer
kafka-console-consumer.sh --bootstrap-server 172.19.68.12:9092 --topic dev-yuqing-topic-test
4.flink source
create table datagen_50 (
i int ,
ts as localtimestamp ,
watermark for ts as ts
)with (
'connector'='datagen',
'rows-per-second'='5',
'fields.i.kind'='sequence',
'fields.i.start'='1',
'fields.i.end'='500000'
)
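A quick sanity check before wiring the source to Hive is to preview a few generated rows together with the partition string that step 6 derives from ts (a throwaway query, assuming the table above is registered in the current session):
select i, ts, date_format(ts,'yyyy-MM-dd') as dt from datagen_50;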
5.hive table
set table.sql-dialect=hive ;
create table tmp.shalem(
i int
)partitioned by(ts string)
tblproperties(
'sink.partition-commit.policy.kind'='metastore,success-file',
'sink.partition-commit.success-file.name'='_SUCCESS', -- name of the marker file written when a partition is committed
'sink.partition-commit.delay'='15 s',
'sink.partition-commit.trigger'='partition-time', -- or 'process-time'
'auto-compaction'='true', -- compact the small files written within each checkpoint before committing
'compaction.file-size'='128M',
'sink.rolling-policy.rollover-interval'='10 min', -- maximum time an open part file may stay open before being rolled
'sink.rolling-policy.check-interval'='2 min' -- how often to check whether part files should be rolled
)
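With 'sink.partition-commit.trigger'='partition-time', the sink also needs to know how to turn a partition value into a timestamp before comparing it with the watermark; a sketch of the extra table property, assuming the partition keeps the 'yyyy-MM-dd' format used in step 6:
'partition.time-extractor.timestamp-pattern'='$ts 00:00:00' -- maps a partition value like '2021-06-01' to the timestamp 2021-06-01 00:00:00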
6. streaming write into hive (here the datagen source stands in for kafka)
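The CREATE TABLE in step 5 ran under the Hive dialect; switch back to the default dialect before submitting the streaming INSERT below (a small reminder, assuming the same SQL client session):
set table.sql-dialect=default ;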
insert into tmp.shalem select i, date_format(ts,'yyyy-MM-dd') from datagen_50;
7.kafka source
create table kafka_shalem(
i int ,
ts timestamp(3),
proctime as PROCTIME()
)with(
'connector'='kafka',
'topic'='dev-yuqing-topic-test',
'properties.bootstrap.servers'='172.19.68.12:9092',
'properties.group.id'='perf-consumer-12',
'scan.startup.mode'='earliest-offset',
'format'='csv' -- alternatives: 'json', 'raw'
)
8.kafka data join hive dim table
select * from
kafka_shalem t1
left join
tmp.shalem
/*+ OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'='latest','streaming-source.monitor-interval'='1 min','streaming-source.partition-order'='partition-name')*/
for system_time as of t1.proctime as t2
on t1.i = t2.i
9.sink es
create table person_dup_es_sink (
id VARCHAR,
data VARCHAR
) with (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts.0.hostname' = 'remote host',
'connector.hosts.0.port' = '9200',
'connector.hosts.0.protocol' = 'http',
'connector.index' = 'test_index',
'connector.document-type' = 'person',
'update-mode' = 'append',
'format.type' = 'json',
'format.json-schema' = '{"type":"object", "properties": {"id": {"type": "string"}, "data":{"type":"string"}}}'
)
;
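The DDL above uses the legacy descriptor-style options ('connector.type', 'connector.hosts.*'). On newer Flink versions the same sink can be declared with the current option keys of the elasticsearch-7 SQL connector; a rough equivalent, assuming Elasticsearch 7 and the same placeholder host and index:
create table person_dup_es_sink_v2 (
id VARCHAR,
data VARCHAR
) with (
'connector' = 'elasticsearch-7',
'hosts' = 'http://remote-host:9200',
'index' = 'test_index'
);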
10.kafka-sql
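This section has no content in the original notes; presumably it was meant to show writing data into Kafka with Flink SQL so that the kafka_shalem source in step 7 has something to read. A minimal sketch under that assumption (the sink table name is made up; topic and schema follow steps 4 and 7):
create table kafka_sink_shalem(
i int ,
ts timestamp(3)
)with(
'connector'='kafka',
'topic'='dev-yuqing-topic-test',
'properties.bootstrap.servers'='172.19.68.12:9092',
'format'='csv'
);
insert into kafka_sink_shalem select i, ts from datagen_50;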
11.es-sql (requires the x-pack plugin)
cat es-sql
#!/bin/sh
/home/es/elasticsearch-7.4.2/bin/elasticsearch-sql-cli url=http://user:passwd@ip:9200
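Once the CLI connects, Elasticsearch SQL can query the index written by the sink in step 9 directly; a small illustrative query against the index name from that example:
SELECT id, data FROM test_index LIMIT 10;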
CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as
SELECT * FROM
(select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,
ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn
from hive_catalog.flink_db.kfk_fact_bill_master_12 t1
JOIN hive_catalog.flink_db.dim_extend_shop_info
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '1 h','streaming-source.partition-order' = 'partition-name') */ FOR SYSTEM_TIME AS OF t1.proctime AS t2 -- temporal (versioned) dim table
ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id
where groupID in (202042)) t where t.rn = 1
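The ROW_NUMBER() filter keeps only the newest row per (groupID, shopID, orderKey), so the view behaves like an already-deduplicated stream; a trivial usage sketch, just previewing a few of the columns that come from t1.*:
select groupID, shopID, orderKey, actionTime from hive_catalog.flink_db.view_fact_bill_master;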