如何配置从 kafka 到 cassandra 的 kafkaConnect

Posted

技术标签:

【中文标题】如何配置从 kafka 到 cassandra 的 kafkaConnect【英文标题】:Howto configure kafkaConnect from kafka to cassandra 【发布时间】:2016-11-24 04:42:51 【问题描述】:

我想设置从 kafka 主题到 cassandra 的 kafka 连接

问题很简单:说我在 kafka 中有一个演示主题,其中包含 json 数据,例如

"id":"1", "name":"Alex", "clicks":2

我想自动将其推送到 cassanra 表中,其中包含 id、name、clicks 列。

我正在调查kafka-connect-cassandra,但我能找到的唯一示例是从 cassandra 读取并通过中间的 kafka 写入另一个 cassandra 表。

我的问题是如何让它从 kafka 而不是 cassandra 中读取? 我正在寻找一些连接器开源并提供示例。

【问题讨论】:

你是如何将 kafka 连接到 cassandra 数据库的?我正在关注这个,但没有取得任何成功。 itechseeker.com/en/tutorials-2/apache-cassandra/… 【参考方案1】:

您所指的示例是同时展示连接器的源和接收器功能。如果您的用例是将数据从 Kafka 主题推送到 Cassandra 表,那么您只需要一个接收器。请按照以下步骤操作

    创建您自己的接收器属性文件。使用this as an 示例。另存为my-sink.properties 进入安装主目录,执行命令CLASSPATH=<<path-to-connector-jar>> ./bin/connect-standalone connect-standalone.properties my-sink.properties

如果您对更详细的步骤示例感兴趣,请参阅此处:https://github.com/yaravind/kafka-connect-jenkins#standalone-mode(完全披露:我为 Jenkins 维护该连接器。)

【讨论】:

【参考方案2】:

我遇到了同样的问题,我关注了https://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/ 上的内容。我正在使用 DataMountaineer 驱动程序 (http://docs.datamountaineer.com/en/latest/cassandra-sink.html),并将其设置为分布式模式。

设置完成后,您的 cassandra 连接器配置 Json(通过 REST API 上传以进行 confluent-connect)应如下所示:


"name": "cassandra.sink.yourConfigName",
"config": 
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "<your topic>",
    "connect.cassandra.sink.kcql": "INSERT INTO <your_table> SELECT *  FROM <your_kafka_topic>;",
    "connect.cassandra.contact.points": "<cassandra nodes>",
    "connect.cassandra.port": "<cassandra port>",
    "connect.cassandra.key.space": "<cassandra keyspace>",
    "connect.cassandra.username": "cassandra",
    "connect.cassandra.password": "cassandra"
    

【讨论】:

以上是关于如何配置从 kafka 到 cassandra 的 kafkaConnect的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

如何将数据从 Cassandra 导出到 mongodb?

spark如何在cassandra表之间复制数据?

Kafka 接收器错误“此连接器要求来自 Kafka 的记录包含 Cassandra 表的密钥”

使用 kafka 和 cassandra 进行事件溯源的类别预测

是否有开源 Kafka Cassandra 连接器配置的示例示例?