使用debeziumkafka-connect将postgres数据实时同步到kafka中

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用debeziumkafka-connect将postgres数据实时同步到kafka中相关的知识,希望对你有一定的参考价值。

在很多场景下,需要将数据同步其他数据源进行计算,本文介绍通过debezium和kafka-connect将postgres数据同步到kafka中。

首先下载debezium,官网地址: https://debezium.io/
目前稳定版本是1.9.5,这是postgres对应的kafka-connect插件下载地址:

debezium-connector-postgres-1.9.5.Final-plugin.tar.gz

然后再kafka的目录下,新建一个plugins目录,将下载的包解压到这个目录:

然后我们配置kafka-coinect,我们这里以集群模式为例:

 vim config/connect-distributed.properties
 

主要调整如下几个地方:

# kafka集群地址
bootstrap.servers=node1:9092,node2:9092,node3:9092
# kafka-connect插件位置
plugin.path=/data1/service/kafka-2.7.2/plugins
#可以适当调整下面这个值,默认是 10000
offset.flush.timeout.ms=100000

然后我们就可以启动kafka-connect:

bin/connect-distributed.sh -daemon config/connect-distributed.properties

到这里,kafka-connect,就启动了起来。
kafka-connect提供了http restfule的接口供我们取操作,默认的端口地址是8083,常见如下:

GET /connector-plugins获取当前所有插件名称
GET /connectors获取当前所有connector
POST /connector添加一个connector
GET /connectors/name获取指定的connector的信息
GET /connectors/name/config获取指定的connector的配置信息
PUT /connectors/name/config更新connector的配置
GET /connectors/name/status获取指定connector的装填
GET /connectors/name/tasks/获取指定connector正在运行的task
GET /connectors/name/tasks/tasks/taskid/status获取connector的task状态信息
PUT /connectors/name/pause暂停connector和他运行task
POST /connectors/name/restart重启connector
POST /connectors/name/tasks/taskid/restart重启一个connector的task
DELETE /connectors/.name删除一个connector,停止关联的task并删除配置

在psotgres数据库侧,我们需要调整一下参数:

wal_level = logical
max_wal_senders = 2000
max_replication_slots = 2000
#下面两个参数可以根据需要调整
wal_sender_timeout = 60s
wal_receiver_timeout = 60s

配置完之后需要重启。
另外一点,如果PG版本比较老的话,需要装

  • decoderbufs(由Debezium社区维护,基于ProtoBuf)
  • wal2json(由wal2json社区维护,基于JSON)

而在PG10+默认自带pgoutput可以不用安装,我这里的是基于PG12,所以不用安装。

到这里所有准备工作就做好了,接下来就是想kafka-connect中添加connector了:


    "name": "prod-material-642",
    "config": 
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "ad_dissectorprofile_000642",
        "database.user": "postgres",
        "slot.name": "prodmaterial642",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.password": "postgres",
        "name": "prod-material-642",
        "database.server.name": "prod-material-642",
        "database.port": "5432",
        "plugin.name": "pgoutput",
        "table.whitelist": "public.ad_entity,public.campaign,public.media_entity,public.url_scheme,public.schedule_entity"
    

这里添加完之后,我们可以查看天剑的connector和其对应的任务,然后再kafka中,会生成如下几个topic:

prod-material-642.public.ad_entity
prod-material-642.public.campaign
prod-material-642.public.media_entity
prod-material-642.public.schedule_entity
prod-material-642.public.url_scheme

而写入到topic中的数据内容大致如下:


    "before":null,
    "after": 
        "pk":"1",
        "value":"New data"
    ,
    "source": 
        ...
        "snapshot":"incremental" 
    ,
    "op":"r", 
    "ts_ms":"1620393591654",
    "transaction":null

如果需要详细内容,可以看下debezium官网上对于postgres同步的详细介绍:

Debezium connector for PostgreSQL

后续我们就可以消费对应的topic来进行相关的数据同步处理即可

以上是关于使用debeziumkafka-connect将postgres数据实时同步到kafka中的主要内容,如果未能解决你的问题,请参考以下文章

复制槽已经存在

如何使用 stat_cor() 函数将大写字母 P 的 p 值传递给 ggplot 对象?

无法使用 openSSL 将 .p12 转换为 .pem

使用或不使用 OpenSSL 将 SSL .pem 转换为 .p12

使用 ubuntu 将 .password 文件添加到 .p12 文件

如何使用终端将 .p12 文件转换为 .pem 文件?