Python3.6.9 Flink 1.15.2消费Kafaka Topic
Posted ShenLiang2025
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python3.6.9 Flink 1.15.2消费Kafaka Topic相关的知识,希望对你有一定的参考价值。
查看Linux发行版和内核版本
root@shenliang-VirtualBox:~# cat /proc/version
Linux version 5.4.0-135-generic (buildd@lcy02-amd64-053) (gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)) #152~18.04.2-Ubuntu SMP Tue Nov 29 08:23:49 UTC 2022
注:pyflink 1.15.2对python3的版本有要求,当前验证环境为python 3.6.9。
安装Pyflink 1.15.2
pip3 install apache-flink==1.15.2 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
下载Kafka
启动Zookeeper
进入kafka主目录,在该目录内启动zookeeper。
bin/zookeeper-server-start.sh config/zookeeper.properties
注:可以通过nohup命令使其在后台运行,以下执行的命令类似。
启动kafka
bin/kafka-server-start.sh config/server.properties
创建Topic主题
通过virtualenv命令创建虚拟目录
bin/kafka-topics.sh --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic flink_kafakasource
查看当前创建的Topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
获取kafka的版本
find ./libs/ -name \\*kafka_\\* | head -1 | grep -o '\\kafka[^\\n]*
从结果可以看出scala的版本是2.12,kafka的版本是2.8.2
配置Flink Kafka连接
在https://mvnrepository.com/里输入 flink kafka寻找对应版本的连接器。
选择Flink对应的版本1.15.2,点击jar,分别下载flink-connector-base和kafka-clients对应的jar包。
将该jar包放置在python的lib目录下。
/usr/local/lib/python3.6/dist-packages/pyflink/lib
注:
flink-connector-kafka-1.15.2.jar
kafka-clients-2.8.1.jar
flink-connector-base-1.15.2.jar
编写并运行Flink程序
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env)#, TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
sourceKafkaDdl = """
create table sourceKafka(
id int,name varchar
)
with(
'connector'='kafka',
'topic'='flink_kafakasource',
'properties.bootstrap.servers'='localhost:9092',
'scan.startup.mode'='latest-offset',
'format'='json'
)
"""
st_env.execute_sql(sourceKafkaDdl)
fieldNames = ["id", "name"]
fieldTypes = [DataTypes.INT()
, DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames
, fieldTypes, "/root/tiamaes/result.csv", ",", 1, WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)
st_env.execute_sql("""
INSERT INTO csvTableSink
select * from sourceKafka
""").wait()
#执行程序
python flinkdemo.py
打开kafka生产者
打开kafka生产者,通过客户端生产数据。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink_kafakasource
查看Flink侧结果
以上是关于Python3.6.9 Flink 1.15.2消费Kafaka Topic的主要内容,如果未能解决你的问题,请参考以下文章
Python3.6.9 Flink 1.15.2消费Kafaka Topic
centos7.6下的python3.6.9虚拟环境安装elastalert
Apache Superset从入门到放弃(基于python3.6.9)
Apache Superset从入门到放弃(基于python3.6.9)