How to ingest 250 tables from MS SQL into Kafka using Debezium


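Hi, I am trying to build a Kafka Connect pipeline with PostgreSQL as the source and SQL Server as the destination. I use 3 Kafka brokers and need to consume 252 topics (one topic per PostgreSQL table). After running for more than an hour, it had pulled only 218 of the 252 tables. The error I found is that SQL Server has a deadlock mechanism that holds the transaction and tries to retry it; the Debezium replication slot is also involved.

I use a distributed connector with a maximum of 3 workers on the sink side, but that does not seem to be enough. I also tried setting offset.time_out.ms to 60000 and using a higher number of offset partitions (100). I am afraid this is not the production-level setup I want. Can anyone give a suggestion for this case? Is there a calculation to determine the optimal number of workers I need?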

UPDATE

Here are some of the errors. I see some connectors getting killed. One tells me that the max_wal_senders limit was exceeded, like this:

[2020-03-26 11:08:27,014] ERROR WorkerSourceTask{id=table_A-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.debezium.jdbc.JdbcConnectionException: FATAL: number of requested standby connections exceeds max_wal_senders (currently 2)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:199)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initConnection(PostgresReplicationConnection.java:260)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:234)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:100)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:99)
    ... 5 more
Caused by: org.postgresql.util.PSQLException: FATAL: number of requested standby connections exceeds max_wal_senders (currently 2)
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
    at org.postgresql.core.v3.QueryExecutorImpl.readStartupMessages(QueryExecutorImpl.java:2617)
    at org.postgresql.core.v3.QueryExecutorImpl.<init>(QueryExecutorImpl.java:135)
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:250)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:211)
    at org.postgresql.Driver.makeConnection(Driver.java:458)
    at org.postgresql.Driver.connect(Driver.java:260)
    at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:191)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:789)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.pgConnection(PostgresReplicationConnection.java:306)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:168)
    ... 9 more

And one of the connectors was unable to unregister its MBean:

[2020-03-26 11:08:27,024] ERROR Unable to unregister the MBean 'debezium.postgres:type=connector-metrics,context=streaming,server=postgres' (io.debezium.pipeline.ChangeEventSourceCoordinator:65)
[2020-03-26 11:08:27,025] ERROR Interrupted while stopping (io.debezium.connector.postgresql.PostgresConnectorTask:239)
java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2109)
    at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
    at io.debezium.pipeline.ErrorHandler.stop(ErrorHandler.java:52)
    at io.debezium.connector.postgresql.PostgresConnectorTask.cleanupResources(PostgresConnectorTask.java:234)
    at io.debezium.pipeline.ErrorHandler.lambda$setProducerThrowable$0(ErrorHandler.java:42)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

It seems Debezium cannot stop the connector:

[2020-03-26 11:30:53,295] ERROR Interrupted while stopping (io.debezium.connector.postgresql.PostgresConnectorTask:239)
java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2109)
    at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
    at io.debezium.pipeline.ErrorHandler.stop(ErrorHandler.java:52)
    at io.debezium.connector.postgresql.PostgresConnectorTask.cleanupResources(PostgresConnectorTask.java:234)
    at io.debezium.pipeline.ErrorHandler.lambda$setProducerThrowable$0(ErrorHandler.java:42)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

And it says the debezium replication slot is already active:

[2020-03-26 11:32:14,892] ERROR WorkerSourceTask{id=table_B-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to start replication stream at LSN{38/BD6247A8}
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:250)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:100)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:99)
    ... 5 more
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is already active for PID 27006
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1165)
    at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:881)
    at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:58)
    at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:42)
    at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)
    at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:37)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startPgReplicationStream(PostgresReplicationConnection.java:502)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationStream(PostgresReplicationConnection.java:348)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:247)
    ... 7 more

And this error:

[2020-03-26 11:32:14,980] ERROR Interrupted while stopping (io.debezium.connector.postgresql.PostgresConnectorTask:239)
java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2109)
    at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
    at io.debezium.pipeline.ErrorHandler.stop(ErrorHandler.java:52)
    at io.debezium.connector.postgresql.PostgresConnectorTask.cleanupResources(PostgresConnectorTask.java:234)
    at io.debezium.connector.postgresql.PostgresConnectorTask.stop(PostgresConnectorTask.java:212)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.tryStop(WorkerSourceTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:154)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2020-03-26 11:32:15,042] INFO [Producer clientId=connector-producer-table_C-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
[2020-03-26 11:32:15,042] ERROR [Producer clientId=connector-producer-table_C-0] Interrupted while joining ioThread (org.apache.kafka.clients.producer.KafkaProducer:1202)
java.lang.InterruptedException
    at java.base/java.lang.Object.wait(Native Method)
    at java.base/java.lang.Thread.join(Thread.java:1313)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1199)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1176)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:158)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2020-03-26 11:32:15,043] INFO [Producer clientId=connector-producer-table_C-0] Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1209)
[2020-03-26 11:32:15,043] WARN Could not close producer (org.apache.kafka.connect.runtime.WorkerSourceTask:160)
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1201)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1176)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:158)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.InterruptedException
    at java.base/java.lang.Object.wait(Native Method)
    at java.base/java.lang.Thread.join(Thread.java:1313)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1199)

Answer

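The problem is probably that multiple tasks access the same SQL table at the same time and cause synchronization problems, such as the deadlocks you mentioned. Since you already have a large number of topics and the connector can access them in parallel, I suggest reducing the number of partitions of each topic to just 1. (Kafka does not support reducing the partition count of an existing topic, so each topic has to be deleted and recreated with the new partition count.) That way each topic has only one partition. A partition can be consumed by only one thread (task/consumer) at a time, so there is no chance of parallel SQL transactions hitting the same table.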
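As a rough sketch of the delete-and-recreate step, the Python snippet below only builds the `kafka-topics.sh` command lines for a list of topics; it does not run them. The bootstrap address, the topic names, and the replication factor of 3 are assumptions chosen for illustration, not values taken from the question.

```python
import shlex


def recreate_commands(topics, bootstrap="localhost:9092", replication_factor=3):
    """Build kafka-topics.sh command lines that delete each topic and
    recreate it with exactly one partition.

    bootstrap and replication_factor are illustrative assumptions.
    """
    commands = []
    for topic in topics:
        base = ["kafka-topics.sh", "--bootstrap-server", bootstrap, "--topic", topic]
        delete_cmd = base + ["--delete"]
        create_cmd = base + [
            "--create",
            "--partitions", "1",
            "--replication-factor", str(replication_factor),
        ]
        # Quote every argument so the lines can be pasted into a shell safely.
        commands.append(" ".join(shlex.quote(part) for part in delete_cmd))
        commands.append(" ".join(shlex.quote(part) for part in create_cmd))
    return commands


if __name__ == "__main__":
    # Hypothetical topic names, one per source table.
    for line in recreate_commands(["table_A", "table_B"]):
        print(line)
```

Two things to keep in mind before running the generated commands: deleting a topic discards the data in it, and topic deletion is asynchronous, so the create command may have to wait until the deletion has completed.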

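Alternatively, and better, create a single topic with 3 partitions (the same as the number of tasks/consumers you have) and make the producer use the SQL table name as the message key. Kafka guarantees that messages with the same key always go to the same partition, so all messages for a given table will reside on a single partition (consumed by a single thread).

If you find it useful, I can attach more information on how to create a Kafka producer and send keyed messages.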
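The key-to-partition idea can be illustrated without a broker. The sketch below is a simulation only: it uses `zlib.crc32` as a stand-in hash, whereas Kafka's default partitioner hashes the key bytes with murmur2, so the actual partition numbers in a real cluster will differ. The function names and sample events are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3  # matches the 3 tasks/consumers mentioned above


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Map a message key to a partition with a deterministic hash.

    Stand-in for Kafka's default partitioner (which uses murmur2);
    only the principle "same key, same partition" is demonstrated.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events, num_partitions=NUM_PARTITIONS):
    """Group (table_name, payload) events by the partition their key maps to."""
    partitions = defaultdict(list)
    for table, payload in events:
        partitions[partition_for(table, num_partitions)].append((table, payload))
    return dict(partitions)


if __name__ == "__main__":
    # Hypothetical change events keyed by table name.
    events = [
        ("table_A", "insert 1"),
        ("table_B", "insert 1"),
        ("table_A", "update 1"),
        ("table_C", "insert 1"),
    ]
    for partition, messages in sorted(route(events).items()):
        print(partition, messages)
```

Because every event for one table carries the same key, all of its events land in one partition in their original order, and a single consumer thread applies them to the target table.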
