Kafka-Connect Cassandra Sink 连接器不将数据推送到 Cassandra

Posted

技术标签:

【中文标题】Kafka-Connect Cassandra Sink 连接器不将数据推送到 Cassandra【英文标题】:Kafka-Connect Cassandra Sink Connector Not Pushing Data To Cassandra 【发布时间】:2021-10-12 21:44:51 【问题描述】:

我创建了 Kafka Standalone.properties 文件来建立连接。该文件位于 home/kafka/config/connect-standalone.properties 中,如下所示:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/home/kafka-connect-cassandra-sink-1.4.0/kafka-connect-cassandra-sink-1.4.0.jar

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


#listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092

在第二步,我添加了 kafka-connect-cassandra-sink-1.4.0 文件。文件放在路径 home/kafka-connect-cassandra-sink-1.4.0 ,文件如下:

name=users-sink
connector.class=com.datastax.oss.kafka.sink.CassandraSinkConnector
tasks.max=10

loadBalancing.localDc=datacenter1
contactPoints=localhost
port=9042

username=...
password=...

topics=demo
topic.demo.demo.users.mapping=lastname=value.lastname, firstname=value.firstname, email=value.email

在我的 PC 上 Zookeeper 已经在运行,我还使用以下命令启动了 Kafka bin/kafka-server-start.sh config/connect-standalone.properties

现在为了将 kafka 连接到接收器连接器,我使用了以下命令 sudo kafka/config/connect-standalone.properties kafka-connect-cassandra-sink-1.4.0/conf/cassandra-sink-standalone.properties &> standalone-mode.log &

我的standalone-mode.log 文件是空的(我假设这意味着没有错误,因为在之前的尝试中,我已经解决了这个日志文件中的错误)。

现在为了通过 cassandra 中的 kafka sink 连接器上传文本文件,我使用了以下命令 cat Desktop/users.txt | kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:"; sleep 10;

终端上再次没有错误。 但现在的问题是,当我编写查询以查看 cassandra 数据库中的数据时,表为空:

user.txt 文件如下:

Pruitt:"lastname":"Pruitt", "firstname":"Allie", "email":"allie@example.com"
Krause:"lastname":"Krause", "firstname":"Duncan", "email":"duncan@example.com"
Chase:"lastname":"Chase", "firstname":"Juana", "email":"juana@example.com"
Estrada:"lastname":"Estrada", "firstname":"Edward", "email":"edward@example.com"
Singleton:"lastname":"Singleton", "firstname":"Marie", "email":"Marie@example.com"
Poole:"lastname":"Poole", "firstname":"Olivia", "email":"olivia@example.com"
Marks:"lastname":"Marks", "firstname":"Timothy", "email":"timothy@example.com"
Suarez:"lastname":"Suarez", "firstname":"Claud", "email":"claud@example.com"
Sloan:"lastname":"Sloan", "firstname":"Eloy", "email":"eloy@example.com"
Rodriguez:"lastname":"Rodriguez", "firstname":"Gale", "email":"gale@example.com"
Bautista:"lastname":"Bautista", "firstname":"Constance", "email":"Constance@example.com"
Mcintyre:"lastname":"Mcintyre", "firstname":"Donte", "email":"donte@example.com"
Lang:"lastname":"Lang", "firstname":"Willa", "email":"willa@example.com"
Richmond:"lastname":"Richmond", "firstname":"Dionne", "email":"dionne@example.com"

【问题讨论】:

你应该显示你的实际 users.txt 文件,也不要在调试时使用&> standalone-mode.log &,这样你可以立即看到日志 @OneCricketeer 我已经上传了上面的 users.txt 并在删除 &> Standalone-mode.log 后收到了以下错误 key.converter=org.apache.kafka.connect.storage.StringConverter:找不到命令 kafka/config/connect-standalone.properties:第 11 行:offset.flush.interval.ms=10000:找不到命令 kafka/config/connect-standalone.properties:第 13 行:plugin.path=/kafka-connect -cassandra-sink-1.4.0/kafka-connect-cassandra-sink-1.4.0.jar:没有这样的文件或目录............connect-standalone.properties中的所有语句都没有找到。我已重新检查所有路径是否正确。 路径不是问题。请参阅下面的答案 【参考方案1】:

使用以下命令启动 Kafka bin/kafka-server-start.sh config/connect-standalone.properties

您不要使用 Connect 属性启动代理,您需要使用 server.properties

另外,您是否通过创建官方快速入门中提到的主题和其他任务来验证 Kafka 是否实际运行?

standalone-mode.log 文件为空(我假设这意味着没有错误

不一定。您没有使用该命令捕获 stderr 。例如2>&1

现在将 kafka 连接到 sink 连接器我使用了以下命令 sudo kafka/config/connect-standalone.properties kafka-connect-cassandra-sink-1.4.0/conf/...

收到以下错误:command not found kafka/config/connect-standalone.properties...

属性文件不可执行。您需要运行connect-standalone.sh,它将这两个连接属性文件作为参数

【讨论】:

感谢您的洞察力。我不知道我必须同时开始server.propertiesstandalone. propertiesboth。是的,kafka 生产者和消费者都运行良好。【参考方案2】:

cassandra-sink-standalone.properties 文件中,您需要指定身份验证提供程序,否则它将默认为None,这意味着连接器不会向集群进行身份验证。

根据您发布的内容,您似乎正在使用普通身份验证提供程序,因此请设置以下内容:

auth.provider=PLAIN
auth.username=username
auth.password=S0mePa$$word

干杯!

【讨论】:

我已经在cassandra-sink-standalone.properties 中添加了上面的内容并重启了kafka服务器并再次执行了sudo kafka/config/connect-standalone.properties kafka-connect-cassandra-sink-1.4.0/conf/cassandra-sink-standalone.properties &> standalone-mode.log cat Desktop/users.txt | kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:"; sleep 10;命令,没有错误但表仍然是空的。 Cassandra 真的在 localhost 上监听客户吗?你将rpc_address 设置为什么? 我不知道rpc_address,请告诉在哪个配置文件中检查它? 它在您的集群的conf/cassandra.yaml 中。干杯! 是的,我找到了这个文件,并且 rpc 设置为 rpc_address: localhost

以上是关于Kafka-Connect Cassandra Sink 连接器不将数据推送到 Cassandra的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-Connect实践

Kafka-connect 是不是必须使用模式注册表?

Cassandra内部架构

使用本地 kafka-connect 集群连接远程数据库的连接超时

使用独立模式 Kafka-connect 将 Postgresql 的数据捕获更改为 kafka 主题

MySql 查询在 Kafka-connect 中失败