Kafka Cassandra 连接器实际上并未写入数据库
Posted
技术标签:
【中文标题】Kafka Cassandra 连接器实际上并未写入数据库【英文标题】:Kafka Cassandra connector doesnt actually write to database 【发布时间】:2017-08-03 16:26:52 【问题描述】:我在这里使用开源 Kafka Cassandra 连接器:https://github.com/tuplejump/kafka-connect-cassandra
我按照教程进行了设置说明。但是,连接器不会将任何数据插入我的数据库。这是我的 sink.properties 文件的内容:
name=cassandra-sink-connector
connector.class=com.tuplejump.kafka.connect.cassandra.CassandraSink
tasks.max=1
topics=hello-mqtt-kafka
cassandra.sink.route.hello-mqtt-kafka=devices_data.messages
我运行 Kafka、Cassandra 和 Zookeeper,他们正在工作。我向“hello-kafka”主题发送了一些消息。出于测试目的,我运行了控制台使用者,它会看到所有消息:
"schema":"type":"struct","fields":["type":"int32","optional":false,"field":"id","type":"string","optional":false,"field":"text"],"optional":false,"name":"devices.schema","payload":"id":75679795,"text":"example5"
"schema":"type":"struct","fields":["type":"int32","optional":false,"field":"id","type":"string","optional":false,"field":"text"],"optional":false,"name":"devices.schema","payload":"id":86874233,"text":"example6"
这是我的 cassandra 表的架构:
CREATE TABLE IF NOT EXISTS devices_data.messages (
id int,
created text,
message text,
PRIMARY KEY (id, created))
WITH ID = 2de24390-03d5-11e7-a32a-ed242ef1cc00
AND CLUSTERING ORDER BY (created ASC)
AND bloom_filter_fp_chance = 0.01
AND dclocal_read_repair_chance = 0.1
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND min_index_interval = 128
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE'
AND comment = ''
AND caching = 'keys': 'ALL', 'rows_per_partition': 'NONE'
AND compaction = 'max_threshold': '32', 'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
AND compression = 'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'
现在,我的连接器正在运行,它没有抛出任何错误,但是当我从 cqlsh 进行选择查询时,我看到我的数据没有插入到 cassandra。我已按照设置说明进行操作,来自工作人员的日志也没有显示任何问题。出于调试目的,我将一些格式错误的数据传递给了 kafka,并且连接器报告了消息格式的错误。因此,它肯定会看到消息,但由于某种原因,它不会将其插入数据库。
我在这个错误上坐了好几个小时,不知道哪里出了问题……我真的很感激任何帮助或知道我可能错过了什么。
这是来自连接器的日志。关于“提交偏移量”的最后几行一直在重复,但数据库中没有出现任何内容。
[2017-03-13 15:46:40,540] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-03-13 15:46:40,540] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-03-13 15:46:40,547] INFO Created connector cassandra-sink-connector (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-03-13 15:46:40,554] INFO Configured 1 Kafka - Cassandra mappings. (com.tuplejump.kafka.connect.cassandra.CassandraSinkTask:86)
[2017-03-13 15:46:40,955] INFO Did not find Netty's native epoll transport in the classpath, defaulting to NIO. (com.datastax.driver.core.NettyUtil:83)
[2017-03-13 15:46:42,039] INFO Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (com.datastax.driver.core.policies.DCAwareRoundRobinPolicy:95)
[2017-03-13 15:46:42,040] INFO New Cassandra host localhost/127.0.0.1:9042 added (com.datastax.driver.core.Cluster:1475)
[2017-03-13 15:46:42,041] INFO Connected to Cassandra cluster: Test Cluster (com.tuplejump.kafka.connect.cassandra.CassandraCluster:81)
[2017-03-13 15:46:42,114] INFO com.datastax.driver.core.SessionManager@63cd5271 created. (com.tuplejump.kafka.connect.cassandra.CassandraCluster:84)
[2017-03-13 15:46:42,146] INFO CassandraSinkTask starting with 1 routes. (com.tuplejump.kafka.connect.cassandra.CassandraSinkTask:189)
[2017-03-13 15:46:42,149] INFO Sink task WorkerSinkTaskid=cassandra-sink-connector-0 finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2017-03-13 15:46:42,294] INFO Discovered coordinator ismop-virtual-machine:9092 (id: 2147483647 rack: null) for group connect-cassandra-sink-connector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
[2017-03-13 15:46:42,302] INFO Revoking previously assigned partitions [] for group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2017-03-13 15:46:42,309] INFO (Re-)joining group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
[2017-03-13 15:46:42,319] INFO Successfully joined group connect-cassandra-sink-connector with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
[2017-03-13 15:46:42,320] INFO Setting newly assigned partitions [hello-mqtt-kafka-0] for group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2017-03-13 15:46:43,337] INFO Reflections took 4909 ms to scan 64 urls, producing 3915 keys and 28184 values (org.reflections.Reflections:229)
[2017-03-13 15:46:50,462] INFO WorkerSinkTaskid=cassandra-sink-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:00,460] INFO WorkerSinkTaskid=cassandra-sink-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:10,455] INFO WorkerSinkTaskid=cassandra-sink-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:20,455] INFO WorkerSinkTaskid=cassandra-sink-connector-0 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
【问题讨论】:
【参考方案1】:Apache Kafka 包含一个消费者组管理工具:bin/kafka-consumer-groups.sh
连接器中应该从 Kafka 读取事件的消费者组称为“connect-cassandra-sink-connector”(您可以在您发布的日志 sn-p 中看到它)。
我建议使用这个工具来检查:
-
消费者是否都赶上了(即它获得的最后一个偏移量是日志的结尾)?如果是这样,请尝试为该主题编写新事件并查看是否已写入。也许只是起步较晚,错过了早期活动?
看起来有进展吗?如果是,它看起来认为它正在成功地写入 Cassandra。如果不是,则无法从 Kafka 读取。尝试检查代理日志上的错误,并可能增加 Connect 的日志级别,看看是否有任何有趣的东西出现。
【讨论】:
以上是关于Kafka Cassandra 连接器实际上并未写入数据库的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 接收器错误“此连接器要求来自 Kafka 的记录包含 Cassandra 表的密钥”
kafka Cassandra接收器连接器中的ClassCastException
是否有开源 Kafka Cassandra 连接器配置的示例示例?
cassandra,hbase,kafka,elasticsearch redis 对比总结