Consuming kafka-connect-jdbc messages (with schema) with PyFlink
1. Data ingestion: create and start a MySQL source connector through the Kafka Connect REST API, using the configuration below (a registration sketch follows the config).
"name": "mysql_stream_test",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "",
"incrementing.column.name": "ID",
"connection.password": "",
"validate.non.null": true,
"tasks.max": 1,
"batch.max.rows": 100,
"table.whitelist": "baseqx.test_demo",
"mode": "incrementing",
"topic.prefix": "mysql_",
"connection.user": "",
"poll.interval.ms": 5000,
"numeric.mapping": "best_fit",
"connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"
2. By default, Kafka Connect writes records to the topic in the following envelope format (schema plus payload):
"schema":"type":"struct","fields":["type":"int32","optional":false,"field":"ID","type":"string","optional":false,"field":"NAME","type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"],"optional":false,"name":"test_demo","payload":"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000
3. Consume the schema-wrapped messages with PyFlink
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, TableConfig
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.get_config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
# In the DDL, declare the schema part as a plain STRING and wrap the payload columns in a ROW type
ddlKafkaConn = """
create table sourceKafkaConn(
`scheam` STRING comment 'per-record schema from Kafka Connect',
`payload` ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING) comment 'the row data'
)comment 'schema-wrapped data from Kafka Connect'
with(
'connector' = 'kafka',
'topic' = 'mysql_test_demo',
'properties.bootstrap.servers' = '192.168.113.11:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
# 'scan.startup.mode' = 'earliest-offset' reads from the earliest offset; 'latest-offset' would read only messages arriving after the job starts
st_env.execute_sql(ddlKafkaConn)
sinkPrint = '''
CREATE TABLE sinkPrint WITH ('connector' = 'print')
LIKE sourceKafkaConn (EXCLUDING ALL)
'''
st_env.execute_sql(sinkPrint)
st_env.execute_sql("SHOW TABLES").print()
st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \
.insert_into("sinkPrint")
st_env.execute("pyflink-kafka-v4")
4. Running the job
4.1 Run locally: python pyflink-kafka-v4.py
4.2 Submit to a cluster: flink run -m xxx.xxx.xxx.xxx:8081 -py pyflink-kafka-v4.py
5. Output
+-----------------+
|   table name    |
+-----------------+
|    sinkPrint    |
| sourceKafkaConn |
+-----------------+
2 rows in set
+I(null,1,prestoEtl,1606902182000)
+I(null,2,执行的非常好,1606902562000)
+I(null,3,使用flink解析topic的schema,1607070278000)