Consuming kafka-connect-jdbc messages (with schema) in PyFlink


1. Data ingestion

Create and start a connector for MySQL through the Kafka Connect REST API, using the configuration below (a registration sketch follows the JSON).



    "name": "mysql_stream_test",

    "config":

        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

        "timestamp.column.name": "",

        "incrementing.column.name": "ID",

        "connection.password": "",

        "validate.non.null": true,

        "tasks.max": 1,

        "batch.max.rows": 100,

        "table.whitelist": "baseqx.test_demo",

        "mode": "incrementing",

        "topic.prefix": "mysql_",

        "connection.user": "",

        "poll.interval.ms": 5000,

        "numeric.mapping": "best_fit",

        "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"

   


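As a sketch of how this configuration might be registered: Kafka Connect exposes a REST endpoint (by default on port 8083) that accepts a POST to /connectors. The host below and the use of the `requests` library are illustrative assumptions, not part of the original post.

import json
import requests  # assumed third-party dependency: pip install requests

# Hypothetical Connect host/port; substitute your own.
CONNECT_URL = "http://xxx.xxx.xxx.xxx:8083/connectors"

connector = {
    "name": "mysql_stream_test",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "incrementing",
        "incrementing.column.name": "ID",
        "table.whitelist": "baseqx.test_demo",
        "topic.prefix": "mysql_",
        # ...plus the remaining keys from the JSON above...
    },
}

resp = requests.post(
    CONNECT_URL,
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
print(resp.status_code, resp.text)  # Connect returns 201 Created on success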

2. The default data format in the topic created by Kafka Connect:

"schema":"type":"struct","fields":["type":"int32","optional":false,"field":"ID","type":"string","optional":false,"field":"NAME","type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"],"optional":false,"name":"test_demo","payload":"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000

3. Consuming the schema-bearing messages with PyFlink

#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, TableConfig

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.get_config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

# The schema part of each message is declared as a plain STRING in the DDL,
# and the payload is wrapped with the ROW type.
# Note: the column name `scheam` is misspelled and does not match the JSON
# key `schema`, so it resolves to NULL at runtime (see the output in step 5).
ddlKafkaConn = """
create table sourceKafkaConn(
    `scheam`   STRING comment 'kafka-connect schema of each row',
    `payload`  ROW(ID BIGINT, NAME STRING, CREATE_TIME STRING) comment 'row data'
) comment 'schema-bearing data from kafka-connect'
with (
    'connector' = 'kafka',
    'topic' = 'mysql_test_demo',
    'properties.bootstrap.servers' = '192.168.113.11:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
"""
# 'scan.startup.mode': 'earliest-offset' reads the topic from the beginning;
# 'latest-offset' reads only messages arriving after the job starts.
st_env.execute_sql(ddlKafkaConn)

sinkPrint = '''
    CREATE TABLE sinkPrint WITH ('connector' = 'print')
    LIKE sourceKafkaConn (EXCLUDING ALL)
'''
st_env.execute_sql(sinkPrint)
st_env.execute_sql("SHOW TABLES").print()

st_env.sql_query("select scheam, ROW(ID, NAME, CREATE_TIME) as payload from sourceKafkaConn") \
    .insert_into("sinkPrint")
st_env.execute("pyflink-kafka-v4")

4. Running the job

4.1 Locally: python pyflink-kafka-v4.py

4.2 On a cluster: flink run -m xxx.xxx.xxx.xxx:8081 -py pyflink-kafka-v4.py

5. Results

+-----------------+
|   table name    |
+-----------------+
|       sinkPrint |
| sourceKafkaConn |
+-----------------+
2 rows in set

+I(null,1,prestoEtl,1606902182000)

+I(null,2,执行的非常好,1606902562000)

+I(null,3,使用flink解析topic的schema,1607070278000)

The first column prints NULL because the DDL column `scheam` does not match the JSON key `schema` in the message envelope.
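If only the row data matters, the nested ROW fields can also be projected to top-level columns with dotted access; a hypothetical variation on the query in step 3:

st_env.sql_query(
    "select payload.ID, payload.NAME, payload.CREATE_TIME from sourceKafkaConn"
).execute().print()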
