Kafka Connect Debezium postgres
Posted
技术标签:
【中文标题】Kafka Connect Debezium postgres【英文标题】: 【发布时间】:2018-11-23 15:53:48 【问题描述】:我正在尝试使用 Debezium 将 Amazon RDS 中托管的 Postgres SQL 数据库与 Kafka 主题连接起来。
我正在学习以下教程:
http://debezium.io/docs/tutorial/
我的 kafka 和 kafka 连接服务启动正常,并且 kafka 连接服务还在 /usr/share/java 目录中获取我的 debezium postgres 连接器 jar。
但是,在尝试使用以下 curl 命令通过 kafka 连接 API 附加 postgres 配置 json 时:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @connector1.json
我最终得到以下错误:
[2018-06-13 23:45:44,749] ERROR Uncaught exception in REST call to /connectors/ (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
org.apache.kafka.connect.errors.ConnectException: Could not intialize type registry
at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:68)
at io.debezium.connector.postgresql.PostgresConnector.validate(PostgresConnector.java:95)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:534)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:531)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:267)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:216)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
at org.postgresql.Driver.makeConnection(Driver.java:431)
at org.postgresql.Driver.connect(Driver.java:247)
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:161)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:585)
at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:65)
... 11 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.postgresql.core.PGStream.<init>(PGStream.java:62)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
... 18 more
能否请一些关于异常以及如何解决它的建议?
这里是否有我可能遗漏的属性/配置?
【问题讨论】:
【参考方案1】:您是否已将 postgres 数据库端口配置为可从外部访问? IIRC RDS 数据库使用 SSL 进行连接,因此可能需要在 postgres 连接器中配置 SSL。
【讨论】:
嘿@Jiri Pechanec 感谢您的指点!这确实是问题所在。当我使用内部 IP 而不是面向外部的 IP 时,问题得到了解决。更改主机后,连接器被 Kafka Connect 服务拾取。以上是关于Kafka Connect Debezium postgres的主要内容,如果未能解决你的问题,请参考以下文章
无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器
Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题
如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?
Debezium 消息与 kafka-connect sink 连接器期望的格式兼容