CDC with WSO2 Streaming Integrator and Postgres DB

Posted: 2020-08-07 00:36:29

Question:

I am trying to set up change data capture (CDC) between WSO2 Streaming Integrator and a local Postgres database.

I have added the Postgres driver (v42.2.5) to SI_HOME/lib, and I am able to read data from the database in a Siddhi application.
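
For context, my Siddhi application defines the CDC source (listening mode) roughly along the lines of the sketch below; the JDBC URL, credentials, and stream attributes here are placeholders rather than my exact app:

    @App:name('CDCWithListeningMode')

    -- listening mode is the default for the cdc source; table.name is the Postgres table being captured
    @source(type = 'cdc',
        url = 'jdbc:postgresql://localhost:5432/db_name',
        username = 'user_name', password = 'password',
        table.name = 'SweetProductionTable', operation = 'insert',
        @map(type = 'keyvalue'))
    define stream insertSweetProductionStream (name string, amount double);

    @sink(type = 'log')
    define stream LogStream (name string, amount double);

    from insertSweetProductionStream
    select name, amount
    insert into LogStream;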

I am following the CDCWithListeningMode sample to implement CDC, and I am using pgoutput as the logical decoding plugin. However, when I run the application I get the following log:

[2020-04-23_19-02-37_460] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
    converter.type = key
    schemas.cache.size = 1000
    schemas.enable = true

[2020-04-23_19-02-37_461] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
    converter.type = value
    schemas.cache.size = 1000
    schemas.enable = false

[2020-04-23_19-02-37_461] INFO io.debezium.embedded.EmbeddedEngine$EmbeddedConfig - EmbeddedConfig values: 
    access.control.allow.methods = 
    access.control.allow.origin = 
    bootstrap.servers = [localhost:9092]
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = null
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 60000
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = 
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic = 
    plugin.path = null
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.host.name = null
    rest.port = 8083
    ssl.client.auth = none
    task.shutdown.graceful.timeout.ms = 5000
    value.converter = class org.apache.kafka.connect.json.JsonConverter

[2020-04-23_19-02-37_516] INFO io.debezium.connector.common.BaseSourceTask -    offset.storage = io.siddhi.extension.io.cdc.source.listening.InMemoryOffsetBackingStore 
[2020-04-23_19-02-37_517] INFO io.debezium.connector.common.BaseSourceTask -    database.server.name = localhost_5432 
[2020-04-23_19-02-37_517] INFO io.debezium.connector.common.BaseSourceTask -    database.port = 5432 
[2020-04-23_19-02-37_517] INFO io.debezium.connector.common.BaseSourceTask -    table.whitelist = SweetProductionTable 
[2020-04-23_19-02-37_517] INFO io.debezium.connector.common.BaseSourceTask -    cdc.source.object = 1716717434 
[2020-04-23_19-02-37_517] INFO io.debezium.connector.common.BaseSourceTask -    database.hostname = localhost 
[2020-04-23_19-02-37_518] INFO io.debezium.connector.common.BaseSourceTask -    database.password = ******** 
[2020-04-23_19-02-37_518] INFO io.debezium.connector.common.BaseSourceTask -    name = CDCWithListeningModeinsertSweetProductionStream 
[2020-04-23_19-02-37_518] INFO io.debezium.connector.common.BaseSourceTask -    server.id = 6140 
[2020-04-23_19-02-37_519] INFO io.debezium.connector.common.BaseSourceTask -    database.history = io.debezium.relational.history.FileDatabaseHistory 
[2020-04-23_19-02-38_103] INFO io.debezium.connector.postgresql.PostgresConnectorTask - user 'user_name' connected to database 'db_name' on PostgreSQL 11.5, compiled by Visual C++ build 1914, 64-bit with roles:
    role 'user_name' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true] (Encoded) 
[2020-04-23_19-02-38_104] INFO io.debezium.connector.postgresql.PostgresConnectorTask - No previous offset found 
[2020-04-23_19-02-38_104] INFO io.debezium.connector.postgresql.PostgresConnectorTask - Taking a new snapshot of the DB and streaming logical changes once the snapshot is finished... 
[2020-04-23_19-02-38_105] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = localhost_5432 named = records-snapshot-producer 
[2020-04-23_19-02-38_105] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = localhost_5432 named = records-stream-producer 
[2020-04-23_19-02-38_293] INFO io.debezium.connector.postgresql.connection.PostgresConnection - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLSN=null] 
[2020-04-23_19-02-38_704] ERROR io.siddhi.core.stream.input.source.Source - Error on 'CDCWithListeningMode'. Connection to the database lost. Error while connecting at Source 'cdc' at 'insertSweetProductionStream'. Will retry in '5 sec'. (Encoded) 
io.siddhi.core.exception.ConnectionUnavailableException: Connection to the database lost.
    at io.siddhi.extension.io.cdc.source.CDCSource.lambda$connect$1(CDCSource.java:424)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:793)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot create replication connection
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:87)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:38)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:362)
    at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:65)
    at io.debezium.connector.postgresql.RecordsStreamProducer.<init>(RecordsStreamProducer.java:81)
    at io.debezium.connector.postgresql.RecordsSnapshotProducer.<init>(RecordsSnapshotProducer.java:70)
    at io.debezium.connector.postgresql.PostgresConnectorTask.createSnapshotProducer(PostgresConnectorTask.java:133)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:86)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:45)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:677)
    ... 3 more
Caused by: io.debezium.jdbc.JdbcConnectionException: ERROR: could not access file "decoderbufs": No such file or directory
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:145)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:79)
    ... 12 more
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:307)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:293)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:270)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:266)
    at org.postgresql.replication.fluent.logical.LogicalCreateSlotBuilder.make(LogicalCreateSlotBuilder.java:48)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:108)
    ... 13 more

Debezium uses the decoderbufs plugin by default, hence the error "could not access file "decoderbufs": No such file or directory".

According to this answer, the problem is caused by the configuration of the decoderbufs plugin.

Details

    Postgres - 11.4
    siddhi-cdc-io - 2.0.3
    Debezium - 0.8.3

How do I configure the embedded Debezium engine to use the pgoutput plugin? Will changing this configuration fix the error?

Please help me resolve this issue. I have not found any resources that address it.

Answer 1:

You need to update Debezium to the latest 1.1 release - that will let you use the pgoutput plugin via the plugin.name configuration option - or you need to deploy (and probably build) the decoderbufs.so library on your PostgreSQL database server.

I recommend the former, as 0.8.3 is a very old version.
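
As an illustration only (not something the siddhi-cdc-io 2.0.3 from the question supports), with a siddhi-io-cdc build that bundles Debezium 1.x and forwards extra Debezium options through a connector.properties parameter, selecting pgoutput might look roughly like this; treat the parameter name and its availability as assumptions to verify against the extension version you actually run:

    -- sketch only: assumes the extension passes 'plugin.name' through to the Debezium Postgres connector
    @source(type = 'cdc',
        url = 'jdbc:postgresql://localhost:5432/db_name',
        username = 'user_name', password = 'password',
        table.name = 'SweetProductionTable', operation = 'insert',
        connector.properties = 'plugin.name=pgoutput',
        @map(type = 'keyvalue'))
    define stream insertSweetProductionStream (name string, amount double);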

Answer 2:

I observed this same behaviour with PostgreSQL 12 when I tried to do CDC with the pgoutput logical decoding output plugin. It appears that even though the database is configured for pgoutput, the Siddhi extension still tries to establish the connection with "decoderbufs" as the decoding plugin.

When I configured decoderbufs as the logical decoding output plugin at the database level, I was able to use the Siddhi IO extension without any problems.

So, at the moment, it looks like Siddhi IO CDC only supports the decoderbufs PostgreSQL logical decoding output plugin.
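
For reference, making decoderbufs usable at the database level roughly amounts to installing the postgres-decoderbufs output plugin into the PostgreSQL library directory (it has its own build dependencies, e.g. protobuf-c) and enabling logical decoding. A minimal sketch of the relevant postgresql.conf settings, assuming the plugin is already installed:

    # postgresql.conf - a server restart is required after changing these
    wal_level = logical                        # required for any logical decoding plugin
    shared_preload_libraries = 'decoderbufs'   # load the Debezium decoderbufs plugin
    max_replication_slots = 4                  # keep at least one slot free for the connector
    max_wal_senders = 4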
