如何在没有 Confluent 的情况下使用 Kafka Connect for Cassandra

Posted

技术标签:

【中文标题】如何在没有 Confluent 的情况下使用 Kafka Connect for Cassandra【英文标题】:How to use Kafka Connect for Cassandra without Confluent 【发布时间】:2017-07-23 08:03:04 【问题描述】:

我们如何在不使用 Confluent 框架的情况下将 Kafka Connect 与 Cassandra 结合使用。

【问题讨论】:

【参考方案1】:

Kafka Connect 框架。 Confluent 仅提供连接器。如果您不想使用 Confluent Open Source(但为什么不呢?),您也可以将所有这些连接器与 vanilla Apache Kafka 一起使用。

有多个 Casandra 连接器可用:https://www.confluent.io/product/connectors/

顺便说一句:列出的 Casandra 连接器都不是由 Confluent 维护的。

当然,您也可以编写自己的连接器或使用任何其他第三方连接器。

【讨论】:

在我的例子中,访问数据库的基本概念是使用 SQL/CQL 查询。 connect 还对数据库执行查询以存储数据或获取数据。如果我建立一个消费者组,一个用于处理,另一个用于将其存储到 DB,那么一个用于存储到 DB,例如 DB-Consumer,它的工作是只将数据存储到我可以使用 ORM 轻松完成的数据库中,我也会完全透明并对其进行控制。所以我担心的是它在性能和速度方面与这种类型的(DB-consumer)消费者实际上有何不同。提前感谢您在这方面的帮助和帮助。 Connect as a framework 负责故障转移,您还可以在分布式模式下运行它以扩展您的数据导入/导出“作业”。因此,Connect 确实是一种“一劳永逸”的体验。此外,对于 Connect,您无需编写任何代码——您只需配置连接器。 Confluent cp-kafka-connect 没有 cassandra 连接器 您可以在 Confluent Hub 上找到 Cassandra 连接器:confluent.io/connector/kafka-connect-cassandra【参考方案2】:

DataMountaineer Stream Reactor 具有可与 Kafka Connect 一起使用的 Cassandra Source 和 Sink 解决方案。

将 jar 文件 (download) 放入 Kafka libs 文件夹,然后按如下方式指定您的连接器:


"name": "cassandra-NAME",
"config": 
    "tasks.max": "1",
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "connect.cassandra.key.space": "KEYSPACE",
    "connect.cassandra.source.kcql": "INSERT INTO KAFKA_TOPIC SELECT column1, timestamp_col FROM CASSANDRA_TABLE PK timestamp_col",
    "connect.cassandra.import.mode": "incremental",
    "connect.cassandra.contact.points": "localhost",
    "connect.cassandra.port": 9042,
    "connect.cassandra.import.poll.interval": 10000

启动 Kafka Connect

bin/connect-distributed.sh config/connect-distributed.properties

并通过上面提到的 JSON 属性文件将 Cassandra 连接器加载到 Kafka Connect(假设它的名称为 connect-cassandra-source.json)

curl -X POST -H "Content-Type: application/json" -d @config/connect-cassandra-source.json localhost:8083/connectors

您将需要创建一个具有 timeuuid 列作为集群键的表。这被描述为here。

【讨论】:

以上是关于如何在没有 Confluent 的情况下使用 Kafka Connect for Cassandra的主要内容,如果未能解决你的问题,请参考以下文章

在没有安装 Confluent 平台的情况下使用 Confluent Hub

在没有 Confluent Schema Registry 的情况下在 KafkaConnect 中使用 Avro

如何在不使用 Docker 或 Windows Server 2016 上的 Confluent 平台的情况下在 Kafka 中设置 Debezium SQL Server 连接器?

如何在默认情况下从 Kafka Spring Cloud Stream 消费并消费由 Confluent API 生成的 Kafka 消息?

如何在kafka-python和confluent-kafka之间做出选择

Kafka HDFS 连接器 - 没有完全融合