雪花卡夫卡连接器配置问题

Posted

技术标签:

【中文标题】雪花卡夫卡连接器配置问题【英文标题】:Snowflake Kafka connector config issue 【发布时间】:2020-02-12 09:51:58 【问题描述】:

我正在按照本指南中的步骤Snowflake Connector for Kafka

我得到的错误信息是

BadRequestException:连接器配置 ..... 不包含连接器类型

我以

身份运行命令
sh kafka_2.12-2.3.0/bin/connect-standalone.sh connect-standalone.properties snowflake_kafka_config.json

我的配置文件是

connect-standalone.properties

bootstrap.servers=localhost:9092
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/Users/kafka_test/kafka

jar 文件snowflake-kafka-connector-0.5.1.jarplugin.path

雪花_kafka_config.json


  "name":"Kafka_Test",
  "Config":
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"test",
    "snowflake.topic2table.map": "",
    "buffer.count.records":"1",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"65536",
    "snowflake.url.name":"<url>",
    "snowflake.user.name":"<user_name>",
    "snowflake.private.key":"<private_key>",
    "snowflake.private.key.passphrase":"<pass_phrase>",
    "snowflake.database.name":"<db>",
    "snowflake.schema.name":"<schema>",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
    "value.converter.schema.registry.url":"",
    "value.converter.basic.auth.credentials.source":"",
    "value.converter.basic.auth.user.info":""
  


Kafka 在本地运行,我有一个生产者和消费者,可以看到数据流动。

【问题讨论】:

【参考方案1】:

这与我在Confluent community Slack 上回答的问题相同,但我也会在这里发布以供参考:-)


connect worker 日志显示连接器 JAR 本身正在加载,因此“不包含连接器类型”是因为您的配置格式是 fubar。

您在独立模式下运行,但传入的 JSON 文件不会。我个人的意见是始终使用分布式,即使只是它的单个节点。如果您需要回顾一下独立与分布式,请查看此内容:http://rmoff.dev/ksldn19-kafka-connect

如果你必须使用独立,那么你需要你的连接器配置(snowflake_kafka_config.json)是这样的属性文件:

param1=argument1
param2=argument2

您可以在此处查看有效的 JSON 示例(如果您使用分布式模式):https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-zero-to-hero/demo_zero-to-hero-with-kafka-connect.adoc#stream-data-from-kafka-to-elasticsearch

【讨论】:

谢谢。那行得通。我将独立用于测试目的,将使用分布式生产 我建议不要这样做 - 坚持分布式,包括测试。更容易标准化配置并了解如何在所有环境中以相同方式运行它。只是我在这件事上的 0.02 英镑便士。

以上是关于雪花卡夫卡连接器配置问题的主要内容,如果未能解决你的问题,请参考以下文章

雪花卡夫卡连接器限制?

初始化 TEST 连接器后雪花卡夫卡连接器失败

卡夫卡连接 |由于操作冲突,无法完成请求

Debezium MS SQL Server 连接器问题

卡夫卡生产者 RecordTooLargeException

如何将本地蜂巢连接到雪花?