How to import 250 tables from MS SQL into Kafka with Debezium
Hi, I am trying to build a Kafka Connect pipeline with PostgreSQL as the source and SQL Server as the sink. I use 3 Kafka brokers and need 252 topics (one topic per PostgreSQL table). After running for more than an hour, it had only pulled 218 of the 252 tables. The error I found is a deadlock in SQL Server that holds a transaction against SQL Server and keeps retrying it; the Debezium replication slot is involved as well.
I use a distributed connector with a maximum of 3 workers on the sink side, but that does not seem to be enough. I also tried setting offset.time_out.ms to 60000 and raising the offset partition count to 100. I'm afraid this is not the production quality I'm after. Can anyone advise on this case? Is there any way to calculate the optimal number of workers I need?
UPDATE
There are several errors here. I see some connectors being killed. One reports that my max_wal_sender limit is exceeded, like this:
[2020-03-26 11:08:27,014] ERROR WorkerSourceTask{id=table_A-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:201)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.debezium.jdbc.JdbcConnectionException: FATAL: number of requested standby connections exceeds max_wal_senders (currently 2)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:199)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initConnection(PostgresReplicationConnection.java:260)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:234)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:100)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:99)
... 5 more
Caused by: org.postgresql.util.PSQLException: FATAL: number of requested standby connections exceeds max_wal_senders (currently 2)
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
at org.postgresql.core.v3.QueryExecutorImpl.readStartupMessages(QueryExecutorImpl.java:2617)
at org.postgresql.core.v3.QueryExecutorImpl.<init>(QueryExecutorImpl.java:135)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:250)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:211)
at org.postgresql.Driver.makeConnection(Driver.java:458)
at org.postgresql.Driver.connect(Driver.java:260)
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:191)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:789)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.pgConnection(PostgresReplicationConnection.java:306)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:168)
... 9 more
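The `max_wal_senders` error above means PostgreSQL is refusing new replication connections: each Debezium Postgres connector opens its own replication connection, so the server must allow at least as many WAL senders (and replication slots) as you run connectors. A minimal sketch of the relevant `postgresql.conf` settings (the values are illustrative, not a recommendation; they depend on how many connectors you register, and changing them requires a server restart):

```ini
# postgresql.conf -- illustrative values, size these to your connector count
# Allow enough concurrent replication connections for all connectors,
# plus headroom for any physical standbys.
max_wal_senders = 10
# Each Debezium connector needs its own logical replication slot.
max_replication_slots = 10
```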
And one of the connectors is unable to unregister its MBean:
[2020-03-26 11:08:27,024] ERROR Unable to unregister the MBean 'debezium.postgres:type=connector-metrics,context=streaming,server=postgres' (io.debezium.pipeline.ChangeEventSourceCoordinator:65)
[2020-03-26 11:08:27,025] ERROR Interrupted while stopping (io.debezium.connector.postgresql.PostgresConnectorTask:239)
java.lang.InterruptedException
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2109)
at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
at io.debezium.pipeline.ErrorHandler.stop(ErrorHandler.java:52)
at io.debezium.connector.postgresql.PostgresConnectorTask.cleanupResources(PostgresConnectorTask.java:234)
at io.debezium.pipeline.ErrorHandler.lambda$setProducerThrowable$0(ErrorHandler.java:42)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
It seems Debezium cannot stop the connector:
[2020-03-26 11:30:53,295] ERROR Interrupted while stopping (io.debezium.connector.postgresql.PostgresConnectorTask:239)
java.lang.InterruptedException
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2109)
at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
at io.debezium.pipeline.ErrorHandler.stop(ErrorHandler.java:52)
at io.debezium.connector.postgresql.PostgresConnectorTask.cleanupResources(PostgresConnectorTask.java:234)
at io.debezium.pipeline.ErrorHandler.lambda$setProducerThrowable$0(ErrorHandler.java:42)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
And this one says the debezium replication slot is already in use:
[2020-03-26 11:32:14,892] ERROR WorkerSourceTask{id=table_B-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:201)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to start replication stream at LSN{38/BD6247A8}
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:250)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:100)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:99)
... 5 more
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is already active for PID 27006
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1165)
at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:881)
at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:58)
at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:42)
at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)
at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:37)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startPgReplicationStream(PostgresReplicationConnection.java:502)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationStream(PostgresReplicationConnection.java:348)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:247)
... 7 more
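The `replication slot "debezium" is already active` error indicates that more than one connector (or a restarted task whose previous session has not been released yet) is trying to use the same slot. The Debezium Postgres connector's `slot.name` property defaults to `debezium`, so when you register multiple connectors against the same database, each one needs a distinct slot name. A hedged excerpt of a connector registration (the connector and slot names are illustrative, and the database connection properties are omitted):

```json
{
  "name": "table_A-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "slot.name": "debezium_table_a"
  }
}
```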
And this error:
[2020-03-26 11:32:14,980] ERROR Interrupted while stopping (io.debezium.connector.postgresql.PostgresConnectorTask:239)
java.lang.InterruptedException
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2109)
at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
at io.debezium.pipeline.ErrorHandler.stop(ErrorHandler.java:52)
at io.debezium.connector.postgresql.PostgresConnectorTask.cleanupResources(PostgresConnectorTask.java:234)
at io.debezium.connector.postgresql.PostgresConnectorTask.stop(PostgresConnectorTask.java:212)
at org.apache.kafka.connect.runtime.WorkerSourceTask.tryStop(WorkerSourceTask.java:196)
at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:154)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-03-26 11:32:15,042] INFO [Producer clientId=connector-producer-table_C-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
[2020-03-26 11:32:15,042] ERROR [Producer clientId=connector-producer-table_C-0] Interrupted while joining ioThread (org.apache.kafka.clients.producer.KafkaProducer:1202)
java.lang.InterruptedException
at java.base/java.lang.Object.wait(Native Method)
at java.base/java.lang.Thread.join(Thread.java:1313)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1199)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1176)
at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:158)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-03-26 11:32:15,043] INFO [Producer clientId=connector-producer-table_C-0] Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1209)
[2020-03-26 11:32:15,043] WARN Could not close producer (org.apache.kafka.connect.runtime.WorkerSourceTask:160)
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1201)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1176)
at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:158)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.InterruptedException
at java.base/java.lang.Object.wait(Native Method)
at java.base/java.lang.Thread.join(Thread.java:1313)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1199)
The problem is probably that multiple tasks access the same SQL table concurrently, causing the synchronization issue (the deadlock) you mentioned. Since you already have many topics and the connector accesses them in parallel, I suggest reducing the partition count of each topic to just 1 (note that Kafka does not support decreasing the number of partitions; each topic would have to be deleted and recreated with the new partition count). That way each topic has exactly one partition, and a partition can only be consumed by a single thread/task/consumer, so there is no chance of parallel SQL transactions being applied to the same table.
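Since Kafka has no operation to shrink a topic's partition count, getting down to one partition means deleting and recreating the topic, for example with the `kafka-topics` CLI (the topic name and bootstrap address are placeholders; note that deleting a topic discards its data):

```shell
# Delete the old topic (requires delete.topic.enable=true on the brokers).
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic table_A
# Recreate it with a single partition so only one task consumes it.
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic table_A \
  --partitions 1 --replication-factor 3
```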
Alternatively, and better, create a single topic with 3 partitions (the same as the number of tasks/consumers you have) and have the producer use the SQL table name as the message key. Kafka guarantees that messages with the same key always go to the same partition, so all messages for a given table will end up in a single partition (consumed by a single thread).
If you find it useful, I can attach more information on how to create a Kafka producer and send keyed messages.
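As a sketch of the keyed-message idea: Kafka's default partitioner hashes the record key (the Java client uses murmur2) and takes it modulo the partition count, so every record with the same key lands in the same partition. The snippet below illustrates that property with a stable stand-in hash (not Kafka's actual algorithm); with a real client such as kafka-python's `KafkaProducer`, you would simply pass the table name as the `key` when sending.

```python
import hashlib

def partition_for_key(key: str, num_partitions: int) -> int:
    """Deterministically map a record key to a partition.

    Illustrative only: Kafka's Java client actually uses murmur2,
    but any stable hash demonstrates the same-key -> same-partition property.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every record keyed by the same table name maps to the same partition,
# so all changes for one SQL table are consumed by a single task.
assert partition_for_key("table_A", 3) == partition_for_key("table_A", 3)
assert 0 <= partition_for_key("table_B", 3) < 3

# With kafka-python the key would be passed on send (sketch, not run here):
# producer.send("changes", key=b"table_A", value=payload)
```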