我如何将 Debezium 连接器与 Apache Flink 一起使用

Posted

技术标签:

【中文标题】我如何将 Debezium 连接器与 Apache Flink 一起使用【英文标题】:How can i use Debezium connector with Apache Flink 【发布时间】:2021-03-04 07:37:30 【问题描述】:

我正在尝试使用 Debezium 源函数创建一个带有 flink 的表 API 的表,我在这里找到了这些函数的实现 https://github.com/ververica/flink-cdc-connectors,并在我的代码中像这样使用它们:

val debeziumProperties = new Properties()
  debeziumProperties.setProperty("plugin.name", "wal2json")
  debeziumProperties.setProperty("format", "debezium-json")

  val sourceFunction: DebeziumSourceFunction[TestCharge] = PostgreSQLSource.builder()
    .hostname("******")
    .port(5432)
    .database("*****") // monitor all tables under inventory database
    .username("*****")
    .password("*****")
    .debeziumProperties(debeziumProperties)
    .deserializer(new CustomDebeziumDeserializer) // converts SourceRecord to String
    .build()

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val sTableEnv = StreamTableEnvironment.create(env, sSettings)

  val cdcStream: DataStream[TestCharge] = env
    .addSource(sourceFunction)
    .map(x => x)

  sTableEnv.createTemporaryView("historic", cdcStream, 'chargeId, 'email, 'amount, 'cardHash)
  val table: Table = sTableEnv.sqlQuery("SELECT SUM(amount) FROM historic GROUP BY chargeId")

  val reverse = sTableEnv.toRetractStream[Row](table)

  reverse.print()

我还按照文档中的说明添加了此依赖项:

"com.alibaba.ververica" % "flink-sql-connector-postgres-cdc" % "1.1.0"

当我尝试在迷你集群上本地运行我的作业时,它工作正常,但在 Kubernetes 上配置的 Flink 集群中,它给了我这个异常:

Caused by: io.debezium.DebeziumException: No implementation of Debezium engine builder was found

有谁知道会发生什么,或者我是否缺少某些依赖项?

提前致谢。

【问题讨论】:

【参考方案1】:

如果你想在 TableAPI/SQL 中使用它,你可以使用 SQL DDL 注册表。

sTableEnv.executeSql(
      """
        |CREATE TABLE shipments (
        |  shipment_id INT,
        |  order_id INT,
        |  origin STRING,
        |  destination STRING,
        |  is_arrived BOOLEAN
        |) WITH (
        |  'connector' = 'postgres-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '5432',
        |  'username' = 'postgres',
        |  'password' = 'postgres',
        |  'database-name' = 'postgres',
        |  'schema-name' = 'public',
        |  'table-name' = 'shipments'
        |)
        |""".stripMargin)
// then you can query the table
  val table: Table = sTableEnv.sqlQuery("SELECT SUM(shipment_id) FROM shipments GROUP BY order_id")

这是使用 CDC 源代码的最简单方法。因为目前 Table API 不支持将 changelog 流转换为 Table

关于你的问题,我认为这可能是因为依赖冲突。请检查您是否依赖于另一个版本的<artifactId>debezium-embedded</artifactId>。如果是,请删除它。 flink-sql-connector-postgres-cdc 已经用 1.12 版本打包了。

【讨论】:

能否请您为上述声明“Table API 不支持将更改日志流转换为 Table”提供参考 @python_therapy 这是一个正在开发的新功能,希望在下一个 1.13 版本中发布。见cwiki.apache.org/confluence/display/FLINK/… @JarkWu - 我需要从 postgres 数据源创建一个数据流,我的结果集是三个表的连接。你能分享一个这样的例子吗?

以上是关于我如何将 Debezium 连接器与 Apache Flink 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题

一个 Mysql DB 的多个 debezium 连接器

Debezium MongoDB 连接器错误:org.apache.kafka.connect.errors.ConnectException:错误处理程序中超出公差

如何将新表添加到 Debezium MySQL 连接器?

Debezium 连接器发件箱转换

由于错误代码 1236,无法启动 debezium MySQL 连接器