Unable to load data from Kafka topic to Postgres using JDBCSinkConnector

【Posted】: 2021-06-26 09:22:38 【Question】:

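I have dockerized Kafka and Postgres, and I am using the JDBC Sink connector to load data from a Kafka topic into a Postgres table. First, I create a topic and a stream on top of it with the 'AVRO' value format.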

CREATE STREAM TEST01 (ROWKEY VARCHAR KEY, COL1 INT, COL2 VARCHAR)
WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

Here is the code that creates the sink connector:

curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgre-01/config \
     -H "Content-Type: application/json" -d '{
    "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url"                     : "jdbc:postgresql://postgres:5432/",
    "topics"                             : "test01",
    "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "connection.user"                    : "postgres",
    "connection.password"                : "********",
    "auto.create"                        : true,
    "auto.evolve"                        : true,
    "insert.mode"                        : "insert",
    "pk.mode"                            : "record_key",
    "pk.fields"                          : "MESSAGE_KEY"
}'

Then I ran the \dt command to check whether Postgres had received any data from Kafka, and it returned: Did not find any relations.

Then I checked the kafka-connect log, which showed the following:

[2021-03-30 10:05:07,546] INFO Attempting to open connection #2 to PostgreSql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
connect            | [2021-03-30 10:05:07,577] INFO Unable to connect to database on attempt 2/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider)
connect            | org.postgresql.util.PSQLException: The connection attempt failed.
connect            |    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:296)
connect            |    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
connect            |    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:211)
connect            |    at org.postgresql.Driver.makeConnection(Driver.java:459)
connect            |    at org.postgresql.Driver.connect(Driver.java:261)
connect            |    at java.sql.DriverManager.getConnection(DriverManager.java:664)
connect            |    at java.sql.DriverManager.getConnection(DriverManager.java:208)
connect            |    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:224)
connect            |    at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:93)
connect            |    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:62)
connect            |    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:56)
connect            |    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)
connect            | Caused by: java.net.UnknownHostException: postgres
connect            |    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
connect            |    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
connect            |    at java.net.Socket.connect(Socket.java:589)
connect            |    at org.postgresql.core.PGStream.<init>(PGStream.java:81)
connect            |    at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:92)
connect            |    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:196)
connect            |    ... 22 more
connect            | [2021-03-30 10:05:17,578] INFO Attempting to open connection #3 to PostgreSql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
connect            | [2021-03-30 10:05:17,732] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: org.postgresql.util.PSQLException: The connection attempt failed. (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: The connection attempt failed.
connect            |    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:69)
connect            |    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:56)
connect            |    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)
connect            | Caused by: org.postgresql.util.PSQLException: The connection attempt failed.
connect            |    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:296)
connect            |    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
connect            |    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:211)
connect            |    at org.postgresql.Driver.makeConnection(Driver.java:459)
connect            |    at org.postgresql.Driver.connect(Driver.java:261)
connect            |    at java.sql.DriverManager.getConnection(DriverManager.java:664)
connect            |    at java.sql.DriverManager.getConnection(DriverManager.java:208)
connect            |    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:224)
connect            |    at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:93)
connect            |    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:62)
connect            |    ... 13 more
connect            | Caused by: java.net.UnknownHostException: postgres
connect            |    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
connect            |    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
connect            |    at java.net.Socket.connect(Socket.java:589)
connect            |    at org.postgresql.core.PGStream.<init>(PGStream.java:81)
connect            |    at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:92)
connect            |    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:196)
connect            |    ... 22 more
connect            | [2021-03-30 10:05:17,734] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)

I thought the problem might be a missing PostgreSQL connector .jar file in /usr/share/java/kafka-connect-jdbc, but it is there:

root@connect:/usr/share/java/kafka-connect-jdbc# ls -l
total 8412
-rw-r--r-- 1 root root   17555 Apr 18  2020 common-utils-5.5.0.jar
-rw-r--r-- 1 root root  317816 Apr 18  2020 jtds-1.3.1.jar
-rw-r--r-- 1 root root  230113 Apr 18  2020 kafka-connect-jdbc-5.5.0.jar
-rw-r--r-- 1 root root  927447 Apr 18  2020 postgresql-42.2.10.jar
-rw-r--r-- 1 root root   41139 Apr 18  2020 slf4j-api-1.7.26.jar
-rw-r--r-- 1 root root 7064881 Apr 18  2020 sqlite-jdbc-3.25.2.jar

What is the solution to this problem?

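【Comments】:

Have you dockerized Connect as well? Is it running on the same Docker network as Postgres? Are you using Compose, and can you show your file?

@OneCricketeer Thanks for the hint! It reminded me that the containers should be on the same network! Shame on me :)

【Answer 1】: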

This is the error in your stack trace:

java.net.UnknownHostException: postgres

This means your Kafka Connect worker cannot resolve the hostname postgres.
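One way to reproduce that lookup outside the connector, as a rough sketch: the snippet below pulls the host out of the connection.url from the question and asks the local resolver for it. It assumes a Python 3 interpreter is available in a container on the same Docker network as the Connect worker (run on the Docker host, it would test the host's resolver instead). jdbc_host and can_resolve are hypothetical helper names, not part of Kafka Connect or the JDBC driver.

```python
import socket
from urllib.parse import urlsplit


def jdbc_host(jdbc_url: str) -> str:
    """Return the host name from a URL like jdbc:postgresql://host:5432/db."""
    if not jdbc_url.startswith("jdbc:"):
        raise ValueError(f"not a JDBC URL: {jdbc_url!r}")
    # Drop the leading "jdbc:" so urlsplit sees an ordinary scheme://host:port/path URL.
    host = urlsplit(jdbc_url[len("jdbc:"):]).hostname
    if host is None:
        raise ValueError(f"no host in JDBC URL: {jdbc_url!r}")
    return host


def can_resolve(host: str) -> bool:
    """Return True if the local resolver can map the host name to an address."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        return False


def main() -> None:
    # connection.url as given in the question's connector config.
    host = jdbc_host("jdbc:postgresql://postgres:5432/")
    state = "resolves" if can_resolve(host) else "does NOT resolve"
    print(f"{host}: {state}")


if __name__ == "__main__":
    main()
```

If the name does not resolve from the place where the Connect worker runs, the JDBC driver would be expected to fail in the same way as in the stack trace, regardless of whether the driver .jar is present.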

【Comments】:

【Answer 2】:

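Thanks to @Robin Moffatt's tutorial and @OneCricketeer's hint, I found a solution to this problem. Kafka Connect and Postgres should be defined in the same docker-compose.yml file; Compose then puts both services on the same default network, where each is reachable by its service name. I have attached my docker-compose.yml below. I hope it helps anyone who runs into the same problem: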

---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.3.2-5.5.0
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:5.5.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:5.5.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.5.0
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_PASSWORD: postgres
    ports:
      - 5432:5432

【Comments】:
