kafka flink es hive streaming

Posted 秉寒-CHO

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka flink es hive streaming相关的知识,希望对你有一定的参考价值。

1.kafka create topic  
# Create the test topic with one partition and a replication factor of 1.
kafka-topics.sh \
  --bootstrap-server 172.19.68.12:9092 \
  --create \
  --topic dev-yuqing-topic-test \
  --partitions 1 \
  --replication-factor 1
2.kafka producer 
# Console producer for the test topic.
# The broker address must be host:port (the notes had "172.19.68.12.9092").
kafka-console-producer.sh --broker-list 172.19.68.12:9092 --topic dev-yuqing-topic-test

# Load generator: 500000 records of 3000 bytes each.
# --throughput is a required argument of this tool; -1 disables throttling.
# NOTE(review): the notes also carried a stand-alone option line,
# "--num-records 5 --records-size 128000", presumably an alternative run with
# a few large records. The tool's flag is --record-size (singular), so that
# variant would be:
#   --num-records 5 --record-size 128000
./kafka-producer-perf-test.sh \
  --topic test-rep-lww \
  --num-records 500000 \
  --record-size 3000 \
  --throughput -1 \
  --producer-props bootstrap.servers=kafka-01:19022,kafka-02:19023,kafka-03:19024

3.kafka consumer
# Read the test topic from the console.
kafka-console-consumer.sh \
  --bootstrap-server 172.19.68.12:9092 \
  --topic dev-yuqing-topic-test
4.flink source 
-- Synthetic source: the datagen connector emits i as a sequence from 1 to
-- 500000 at 5 rows per second.
CREATE TABLE datagen_50 (
    i  INT,
    -- computed column; also serves as the event-time attribute
    ts AS LOCALTIMESTAMP,
    -- watermark equals ts itself (no out-of-orderness allowance)
    WATERMARK FOR ts AS ts
) WITH (
    'connector'       = 'datagen',
    'rows-per-second' = '5',
    'fields.i.kind'   = 'sequence',
    'fields.i.start'  = '1',
    'fields.i.end'    = '500000'
);
5.hive table 
-- Hive-dialect DDL for the streaming sink table. The table properties
-- configure how Flink commits partitions and rolls/compacts files.
SET table.sql-dialect=hive;
CREATE TABLE tmp.shalem (
    i INT
)
PARTITIONED BY (ts STRING)
TBLPROPERTIES (
    -- on commit: add the partition to the metastore and write a marker file
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    -- name of the marker file written by the success-file policy
    'sink.partition-commit.success-file.name' = '_SUCCESS',
    -- NOTE(review): with day-granularity partitions and the partition-time
    -- trigger, a 15 s delay commits a partition long before the day is over;
    -- confirm this is intended for the test
    'sink.partition-commit.delay' = '15 s',
    -- the alternative trigger is process-time
    'sink.partition-commit.trigger' = 'partition-time',
    -- compact the files written within a checkpoint into larger ones
    'auto-compaction' = 'true',
    'compaction.file-size' = '128M',
    -- maximum time a part file may stay open before it is rolled
    'sink.rolling-policy.rollover-interval' = '10 min',
    -- how often the time-based rolling policy is checked
    'sink.rolling-policy.check-interval' = '2 min'
);
-- Switch back so that following Flink DDL (CREATE TABLE ... WITH (...)) is
-- not parsed with the Hive dialect.
SET table.sql-dialect=default;
6. datagen write into hive
-- Write the generated rows into the Hive table; the second column
-- (ts formatted as yyyy-MM-dd) fills the partition column.
INSERT INTO tmp.shalem
SELECT
    i,
    DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM datagen_50;
7.kafka source 
-- Kafka source over the test topic. proctime is a processing-time attribute,
-- which the FOR SYSTEM_TIME AS OF lookup join on this table relies on.
CREATE TABLE kafka_shalem (
    i        INT,
    ts       TIMESTAMP(3),
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic'     = 'dev-yuqing-topic-test',
    -- Kafka client settings are passed through under the 'properties.' prefix
    'properties.bootstrap.servers' = '172.19.68.12:9092',
    'properties.group.id'          = 'perf-consumer-12',
    -- start reading from the earliest available offset
    'scan.startup.mode' = 'earliest-offset',
    -- other formats noted by the author: 'json', 'raw'
    'format' = 'csv'
);
8.kafka data join hive dim table 
-- Lookup join: each Kafka row is joined, as of its processing time, against
-- the Hive table read as a streaming source. The hint restricts the Hive side
-- to the latest partition (ordered by partition name) and re-checks for new
-- partitions every minute.
SELECT *
FROM kafka_shalem AS t1
LEFT JOIN tmp.shalem
    /*+ OPTIONS(
        'streaming-source.enable' = 'true',
        'streaming-source.partition.include' = 'latest',
        'streaming-source.monitor-interval' = '1 min',
        'streaming-source.partition-order' = 'partition-name'
    ) */
    FOR SYSTEM_TIME AS OF t1.proctime AS t2
    ON t1.i = t2.i;
9.sink es 
-- Append-only Elasticsearch 6 sink declared with the legacy
-- 'connector.type' property style.
CREATE TABLE person_dup_es_sink (
    id   VARCHAR,
    data VARCHAR
) WITH (
    'connector.type'    = 'elasticsearch',
    'connector.version' = '6',
    -- placeholder: replace 'remote host' with the real Elasticsearch host name
    'connector.hosts.0.hostname' = 'remote host',
    'connector.hosts.0.port'     = '9200',
    'connector.hosts.0.protocol' = 'http',
    'connector.index'         = 'test_index',
    'connector.document-type' = 'person',
    'update-mode' = 'append',
    'format.type' = 'json',
    -- the schema must be a valid JSON document (the braces were missing)
    'format.json-schema' = '{"type":"object", "properties": {"id": {"type": "string"}, "data": {"type": "string"}}}'
);

10.kafka-sql 

11.es-sql 需要xpack 插件
cat es-sql 
#!/bin/sh
# Start the Elasticsearch SQL CLI from the 7.4.2 install.
# user, passwd and ip in the connection argument are placeholders.
ES_BIN=/home/es/elasticsearch-7.4.2/bin
"$ES_BIN"/elasticsearch-sql-cli "url=http://user:passwd@ip:9200"





-- For group 202042: joins the bill stream to the shop dimension table (read
-- as a temporal table) and keeps, per (groupID, shopID, orderKey), only the
-- row with the greatest actionTime.
CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master AS
SELECT *
FROM (
    SELECT
        t1.*,
        t2.group_id,
        t2.shop_id,
        t2.group_name,
        t2.shop_name,
        t2.brand_id,
        -- rn = 1 marks the newest row within each key
        -- NOTE(review): orderKey and actionTime are presumably columns of t1;
        -- qualify them once the source schema is confirmed
        ROW_NUMBER() OVER (
            PARTITION BY t1.groupID, t1.shopID, orderKey
            ORDER BY actionTime DESC
        ) AS rn
    FROM hive_catalog.flink_db.kfk_fact_bill_master_12 AS t1
    INNER JOIN hive_catalog.flink_db.dim_extend_shop_info
        /*+ OPTIONS(
            'streaming-source.enable' = 'true',
            'streaming-source.partition.include' = 'latest',
            'streaming-source.monitor-interval' = '1 h',
            'streaming-source.partition-order' = 'partition-name'
        ) */
        FOR SYSTEM_TIME AS OF t1.proctime AS t2  -- temporal table
        ON t1.groupID = t2.group_id
        AND t1.shopID = t2.shop_id
    WHERE t1.groupID IN (202042)
) AS t
WHERE t.rn = 1;

以上是关于kafka flink es hive streaming的主要内容,如果未能解决你的问题,请参考以下文章

kafka flink es hive streaming

kafka flink es hive streaming

Flink实战系列Flink 读取 Hive 数据同步到 Kafka

Flink实战之Kafka To Hive

Flink 实战系列Flink SQL 实时同步 Kafka 数据到 Hudi(parquet + snappy)并且自动同步数据到 Hive

基于Flink的实时计算平台的构建