Flink Client Tips and Notes (Flink on Zeppelin)
Posted by Vics异地我就
Connecting Flink to Kafka
First, create a Hive catalog:
CREATE CATALOG flink_hive WITH (
  'type' = 'hive',
  'default-database' = 'imods',
  'hive-conf-dir' = '/home/admin/flink/conf'
);
Create the Kafka source table:
use catalog flink_hive;

-- Create the Kafka source table
CREATE TABLE IF NOT EXISTS kafka_table (
  vin string,
  age int,
  ...
) -- the WITH clause carries the connection info and connector options
WITH (
  'connector' = 'kafka',
  'topic' = 'your-topic',             -- your topic name
  'properties.group.id' = 'your-group-id',  -- your consumer group id
  'properties.bootstrap.servers' = 'host1:port1,host2:port2',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";', -- set username and password here
  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  --'scan.startup.mode' = 'latest-offset',   -- five options, explained below
  --'scan.startup.mode' = 'earliest-offset',
  'scan.startup.mode' = 'group-offsets',
  'json.fail-on-missing-field' = 'false',  -- do not fail the job on missing fields
  'json.ignore-parse-errors' = 'true',     -- skip records that fail to parse
  'format' = 'json'                        -- format of the incoming messages
);
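Before wiring up the sink, you can sanity-check the source table from a Zeppelin paragraph with a streaming select (a sketch; `%flink.ssql` is Zeppelin's streaming Flink SQL interpreter, and the column names follow the table above):

```sql
%flink.ssql(type=update)
-- preview the first rows arriving from Kafka
SELECT vin, age FROM kafka_table LIMIT 10;
```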
group-offsets
: start from committed offsets in ZK / Kafka brokers of a specific consumer group (properties.group.id must be set for this to work).
earliest-offset
: start from the earliest offset possible.
latest-offset
: start from the latest offset; only messages that arrive after the job starts are consumed.
timestamp
: start from a user-supplied timestamp for each partition.
specific-offsets
: start from user-supplied specific offsets for each partition.
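The last two modes each need one extra WITH option to supply the starting point. A sketch of both, using the option names from the Flink Kafka SQL connector (the timestamp and offset values here are made-up examples):

```sql
-- start from a given epoch-millis timestamp
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1640966400000',

-- or start from explicit per-partition offsets
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
```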
Create the corresponding Hive table:
-- Create the Hive sink table
set table.sql-dialect=hive;
create table if not exists hive_table
(
  -- column types and order must match kafka_table exactly; this is strictly required
  vin string,
  age int,
  ...
)
comment 'Hive sink table'
partitioned by (dt string) -- optional
stored as parquet          -- optional
TBLPROPERTIES (
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '30 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  -- small-file compaction options
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB',
  'sink.shuffle-by-partition.enable' = 'true'
)
;
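If the dt partition is derived from event time rather than processing time, you may also want to control when partitions are committed. A sketch using the partition-commit options of the Flink filesystem/Hive sink (adjust the timestamp pattern to match your dt format):

```sql
'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.delay' = '1 h'
```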
-- Run the INSERT with dynamic partitioning
set pipeline.name=your_job_name; -- set a job name (use English, no quotes needed)
set table.sql-dialect=default;
insert into hive_table
select
  vin,
  age,
  ...
from kafka_table;
-- A CASE WHEN snippet worth keeping around, for converting a timestamp into the dt partition value:
case when CHARACTER_LENGTH(cast(eventTime as string)) = 13
     then from_unixtime(cast(substr(cast(eventTime as string), 0, 10) as BIGINT), 'yyyyMMdd')
     else '19700101'
end as dt
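The intent of that snippet can be sketched in Python to check the logic: a 13-digit epoch-milliseconds value keeps its first 10 digits (the epoch seconds) and becomes a 'yyyyMMdd' string, anything else falls back to '19700101'. One assumption: this sketch interprets the epoch in UTC, while the SQL from_unixtime uses the session time zone.

```python
from datetime import datetime, timezone

def event_time_to_dt(event_time) -> str:
    """Mirror the SQL CASE WHEN: a 13-digit epoch-millis value becomes
    a 'yyyyMMdd' partition string; anything else falls back to '19700101'."""
    s = str(event_time)
    if len(s) == 13:
        seconds = int(s[:10])  # first 10 digits = epoch seconds (assumes UTC here)
        return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y%m%d")
    return "19700101"

print(event_time_to_dt(1609459200000))  # 2021-01-01T00:00:00Z -> "20210101"
print(event_time_to_dt(1609459200))     # 10 digits, not 13 -> "19700101"
```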