如何配置从 kafka 到 cassandra 的 kafkaConnect
Posted
技术标签:
【中文标题】如何配置从 kafka 到 cassandra 的 kafkaConnect【英文标题】:Howto configure kafkaConnect from kafka to cassandra 【发布时间】:2016-11-24 04:42:51 【问题描述】:我想设置从 kafka 主题到 cassandra 的 kafka 连接
问题很简单:说我在 kafka 中有一个演示主题,其中包含 json 数据,例如
"id":"1", "name":"Alex", "clicks":2
我想自动将其推送到 cassanra 表中,其中包含 id、name、clicks 列。
我正在调查kafka-connect-cassandra,但我能找到的唯一示例是从 cassandra 读取并通过中间的 kafka 写入另一个 cassandra 表。
我的问题是如何让它从 kafka 而不是 cassandra 中读取? 我正在寻找一些连接器开源并提供示例。
【问题讨论】:
你是如何将 kafka 连接到 cassandra 数据库的?我正在关注这个,但没有取得任何成功。 itechseeker.com/en/tutorials-2/apache-cassandra/… 【参考方案1】:您所指的示例是同时展示连接器的源和接收器功能。如果您的用例是将数据从 Kafka 主题推送到 Cassandra 表,那么您只需要一个接收器。请按照以下步骤操作
-
创建您自己的接收器属性文件。使用this as an 示例。另存为
my-sink.properties
进入安装主目录,执行命令CLASSPATH=<<path-to-connector-jar>> ./bin/connect-standalone connect-standalone.properties my-sink.properties
如果您对更详细的步骤示例感兴趣,请参阅此处:https://github.com/yaravind/kafka-connect-jenkins#standalone-mode(完全披露:我为 Jenkins 维护该连接器。)
【讨论】:
【参考方案2】:我遇到了同样的问题,我关注了https://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/ 上的内容。我正在使用 DataMountaineer 驱动程序 (http://docs.datamountaineer.com/en/latest/cassandra-sink.html),并将其设置为分布式模式。
设置完成后,您的 cassandra 连接器配置 Json(通过 REST API 上传以进行 confluent-connect)应如下所示:
"name": "cassandra.sink.yourConfigName",
"config":
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "<your topic>",
"connect.cassandra.sink.kcql": "INSERT INTO <your_table> SELECT * FROM <your_kafka_topic>;",
"connect.cassandra.contact.points": "<cassandra nodes>",
"connect.cassandra.port": "<cassandra port>",
"connect.cassandra.key.space": "<cassandra keyspace>",
"connect.cassandra.username": "cassandra",
"connect.cassandra.password": "cassandra"
【讨论】:
以上是关于如何配置从 kafka 到 cassandra 的 kafkaConnect的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?
Kafka 接收器错误“此连接器要求来自 Kafka 的记录包含 Cassandra 表的密钥”