使用debeziumkafka-connect将postgres数据实时同步到kafka中,表topic重新路由
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用debeziumkafka-connect将postgres数据实时同步到kafka中,表topic重新路由相关的知识,希望对你有一定的参考价值。
在很多场景下,需要将数据同步其他数据源进行计算,本文介绍通过debezium和kafka-connect将postgres数据同步到kafka中。
首先下载debezium,官网地址: https://debezium.io/
目前稳定版本是1.9.5,这是postgres对应的kafka-connect插件下载地址:
debezium-connector-postgres-1.9.5.Final-plugin.tar.gz
然后再kafka的目录下,新建一个plugins目录,将下载的包解压到这个目录:
然后我们配置kafka-coinect,我们这里以集群模式为例:
vim config/connect-distributed.properties
主要调整如下几个地方:
# kafka集群地址
bootstrap.servers=node1:9092,node2:9092,node3:9092
# kafka-connect插件位置
plugin.path=/data1/service/kafka-2.7.2/plugins
#可以适当调整下面这个值,默认是 10000
offset.flush.timeout.ms=100000
然后我们就可以启动kafka-connect:
bin/connect-distributed.sh -daemon config/connect-distributed.properties
到这里,kafka-connect,就启动了起来。
kafka-connect提供了http restfule的接口供我们取操作,默认的端口地址是8083
,常见如下:
GET /connector-plugins | 获取当前所有插件名称 |
GET /connectors | 获取当前所有connector |
POST /connector | 添加一个connector |
GET /connectors/name | 获取指定的connector的信息 |
GET /connectors/name/config | 获取指定的connector的配置信息 |
PUT /connectors/name/config | 更新connector的配置 |
GET /connectors/name/status | 获取指定connector的装填 |
GET /connectors/name/tasks/ | 获取指定connector正在运行的task |
GET /connectors/name/tasks/tasks/taskid/status | 获取connector的task状态信息 |
PUT /connectors/name/pause | 暂停connector和他运行task |
POST /connectors/name/restart | 重启connector |
POST /connectors/name/tasks/taskid/restart | 重启一个connector的task |
DELETE /connectors/.name | 删除一个connector,停止关联的task并删除配置 |
在psotgres数据库侧,我们需要调整一下参数:
wal_level = logical
max_wal_senders = 2000
max_replication_slots = 2000
#下面两个参数可以根据需要调整
wal_sender_timeout = 60s
wal_receiver_timeout = 60s
配置完之后需要重启。
另外一点,如果PG版本比较老的话,需要装
- decoderbufs(由Debezium社区维护,基于ProtoBuf)
- wal2json(由wal2json社区维护,基于JSON)
而在PG10+默认自带pgoutput
可以不用安装,我这里的是基于PG12,所以不用安装。
到这里所有准备工作就做好了,接下来就是想kafka-connect中添加connector了:
"name": "prod-material-642",
"config":
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "ad_dissectorprofile_000642",
"database.user": "postgres",
"slot.name": "prodmaterial642",
"tasks.max": "1",
"database.hostname": "localhost",
"database.password": "postgres",
"name": "prod-material-642",
"database.server.name": "prod-material-642",
"database.port": "5432",
"plugin.name": "pgoutput",
"table.whitelist": "public.ad_entity,public.campaign,public.media_entity,public.url_scheme,public.schedule_entity"
这里添加完之后,我们可以查看天剑的connector和其对应的任务,然后再kafka中,会生成如下几个topic:
prod-material-642.public.ad_entity
prod-material-642.public.campaign
prod-material-642.public.media_entity
prod-material-642.public.schedule_entity
prod-material-642.public.url_scheme
而写入到topic中的数据内容大致如下:
"before":null,
"after":
"pk":"1",
"value":"New data"
,
"source":
...
"snapshot":"incremental"
,
"op":"r",
"ts_ms":"1620393591654",
"transaction":null
如果需要详细内容,可以看下debezium官网上对于postgres同步的详细介绍:
Debezium connector for PostgreSQL
后续我们就可以消费对应的topic来进行相关的数据同步处理即可
另外,需要注意的是,默认情况下,这里debezium是针对每个库、每个schema、每个表都生成一个topic,如果表比较多,那么topic数量将会特别多,为此debezium可以对写入的topic进行重新路由:
"name": "prod-material-642",
"config":
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "ad_dissectorprofile_000642",
"database.user": "postgres",
"slot.name": "prodmaterial642",
"tasks.max": "1",
"database.hostname": "localhost",
"database.password": "postgres",
"name": "prod-material-642",
"database.server.name": "prod-material-642",
"database.port": "5432",
"plugin.name": "pgoutput",
"table.whitelist": "public.ad_entity,public.campaign,public.media_entity,public.url_scheme,public.schedule_entity",
"transforms": "Combine",
"transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Combine.topic.regex": "(.*)",
"transforms.Combine.topic.replacement": "prod-material-sync"
如上,这样会将所有的同步数据都写入到prod-material-sync
这一个topic中去。详细可以参考官网:
topic-routing
以上是关于使用debeziumkafka-connect将postgres数据实时同步到kafka中,表topic重新路由的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 stat_cor() 函数将大写字母 P 的 p 值传递给 ggplot 对象?
使用或不使用 OpenSSL 将 SSL .pem 转换为 .p12