Trying to configure Debezium image for Elasticsearch Sink

【Posted】: 2021-04-05 21:21:51

【Question】:

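I am trying to use the debezium/connect:1.3 Docker image, and I get the following error when trying to create an Elasticsearch sink by following this tutorial.

This compose file suggests using debezium/connect-jdbc-es, but that image does not seem to be available.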


    {
      "error_code": 500,
      "message": "Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class io.debezium.connector.db2.Db2Connector, name='io.debezium.connector.db2.Db2Connector', version='1.3.1.Final', encodedVersion=1.3.1.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-db2/'}, PluginDesc{klass=class io.debezium.connector.mongodb.MongoDbConnector, name='io.debezium.connector.mongodb.MongoDbConnector', version='1.3.1.Final', encodedVersion=1.3.1.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-mongodb/'}, PluginDesc{klass=class io.debezium.connector.mysql.MySqlConnector, name='io.debezium.connector.mysql.MySqlConnector', version='1.3.1.Final', encodedVersion=1.3.1.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-mysql/'}, PluginDesc{klass=class io.debezium.connector.oracle.OracleConnector, name='io.debezium.connector.oracle.OracleConnector', version='1.3.1.Final', encodedVersion=1.3.1.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-oracle/'}, PluginDesc{klass=class io.debezium.connector.postgresql.PostgresConnector, name='io.debezium.connector.postgresql.PostgresConnector', version='1.3.1.Final', encodedVersion=1.3.1.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-postgres/'}, PluginDesc{klass=class io.debezium.connector.sqlserver.SqlServerConnector, name='io.debezium.connector.sqlserver.SqlServerConnector', version='1.3.1.Final', encodedVersion=1.3.1.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-sqlserver/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.6.0', encodedVersion=2.6.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.6.0', encodedVersion=2.6.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.6.0', encodedVersion=2.6.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.6.0', encodedVersion=2.6.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.6.0', encodedVersion=2.6.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.6.0', encodedVersion=2.6.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.6.0', encodedVersion=2.6.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.6.0', encodedVersion=2.6.0, type=source, typeName='source', location='classpath'}"
    }
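
The error message is long, but it already contains the diagnosis: it lists every connector class the worker can load, and the Elasticsearch sink is not among them. As a rough sketch (the helper names `available_connectors` and `sink_connectors` are made up for illustration and are not part of Kafka Connect), the message text can be scanned like this:

```python
import re

WANTED = "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"


def available_connectors(message):
    """Return every connector class name listed in the error message."""
    return re.findall(r"name='([^']+)'", message)


def sink_connectors(message):
    """Return only the classes whose plugin description says type=sink."""
    # re.S lets the match continue across line breaks in a wrapped message.
    pairs = re.findall(r"name='([^']+)'.*?type=(\w+)", message, flags=re.S)
    return [name for name, kind in pairs if kind == "sink"]


def is_missing(message, wanted=WANTED):
    """True when the wanted connector class is absent from the listed plugins."""
    return wanted not in available_connectors(message)
```

Run against the message above, `sink_connectors` should return only the FileStream and Mock sinks that ship with Kafka, which suggests the debezium/connect:1.3 image simply does not bundle the Confluent Elasticsearch sink.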


【Answer 1】:

You need to install the Elasticsearch sink connector separately, or use the Kafka Connect images from Confluent and install Debezium into them.

https://www.confluent.io/hub/debezium/debezium-connector-postgresql
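
Whichever route is taken, it helps to confirm that the worker actually sees the sink class before creating the connector. The sketch below uses the Kafka Connect REST API (`GET /connector-plugins` and `POST /connectors`). The worker URL, connector name, topic names, and Elasticsearch URL are placeholders assumed for illustration, and the function names are hypothetical. The error above suggests plugins in this image are loaded from `/kafka/connect`, so that is presumably where the connector would need to be unpacked.

```python
import json
import urllib.request

ES_SINK_CLASS = "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"


def fetch_plugins(connect_url):
    """GET /connector-plugins: the plugins the Connect worker can load."""
    with urllib.request.urlopen(f"{connect_url}/connector-plugins") as resp:
        return json.load(resp)


def plugin_installed(plugins, class_name=ES_SINK_CLASS):
    """plugins is the parsed list returned by fetch_plugins()."""
    return any(p.get("class") == class_name for p in plugins)


def build_sink_request(name, topics, es_url):
    """Build the JSON body for POST /connectors (all values are placeholders)."""
    return {
        "name": name,
        "config": {
            "connector.class": ES_SINK_CLASS,
            "topics": ",".join(topics),
            "connection.url": es_url,
            "key.ignore": "true",
        },
    }


def create_connector(connect_url, request_body):
    """POST the connector definition to the Connect worker."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(request_body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def main(connect_url="http://localhost:8083"):
    # Assumed worker address; adjust to the port mapping in your compose file.
    if not plugin_installed(fetch_plugins(connect_url)):
        raise SystemExit(f"{ES_SINK_CLASS} is not installed on this worker")
    body = build_sink_request("elastic-sink", ["customers"], "http://elastic:9200")
    print(create_connector(connect_url, body))
```

Calling `main()` against a running worker should fail fast with a clear message if the sink is still missing, instead of returning the 500 error from the question.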
