如何将文件写入 Kafka Producer

Posted

技术标签:

【中文标题】如何将文件写入 Kafka Producer【英文标题】:How to write a file to Kafka Producer 【发布时间】:2016-01-21 07:01:42 【问题描述】:

我正在尝试在 Kafka 中加载一个简单的文本文件而不是标准输入。 下载Kafka后,我执行了以下步骤:

启动 zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动服务器

bin/kafka-server-start.sh config/server.properties

创建了一个名为“test”的主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

运行制片人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2

消费者聆听:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2

我想将一个数据文件甚至一个简单的文本文件传递给消费者可以直接看到的生产者,而不是标准输入。任何帮助将不胜感激。谢谢!

【问题讨论】:

【参考方案1】:

你可以用管道输入:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt

找到here。

从 0.9.0 开始:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

【讨论】:

我使用的是 Kafka-0.9。 --new-producer 在 kafka-console-producer.sh 中不受支持,相反,$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic 我认为 --new-producer 是 0.9 的实际生产者 :) @BalázsMáriaNémeth,如何在固定时间间隔后仅传递 n 行(例如传递前 10 行,等待 5 秒,传递 10-20 行等)。在所有行结束后,它应该从第 1 行重新开始? @vdep 你应该可以使用 kafkacat 做到这一点:github.com/edenhill/kafkacat 这不适用于 0.11 .. 对新版本有什么建议吗?【参考方案2】:
$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

在 Kafka-0.9.0 中为我工作

【讨论】:

【参考方案3】:

这里有一些比较通用的方法,但对于简单的文件来说可能是多余的

尾巴

tail -n0 -F my_file.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

解释

    tail 随着文件的增长或不断向其添加日志,从文件末尾读取 -n0 表示输出最后 0 行,所以只选择新行 -F 按名称而不是描述符跟随文件,因此即使旋转它也可以工作

系统日志-ng

options                                                                                                                              
    flush_lines (0);                                                                                                                
    time_reopen (10);                                                                                                               
    log_fifo_size (1000);                                                                                                          
    long_hostnames (off);                                                                                                           
    use_dns (no);                                                                                                                   
    use_fqdn (no);                                                                                                                  
    create_dirs (no);                                                                                                               
    keep_hostname (no);                                                                                                             
;

source s_file 
    file("path to my-file.txt" flags(no-parse));



destination loghost 
    tcp("*.*.*.*" port(5140));
 

消耗

nc -k -l 5140 | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

解释(来自man nc

-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.

-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.

参考

Syslog-ng

【讨论】:

【参考方案4】:
echo "Hello" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

【讨论】:

虽然此代码 sn-p 可以解决问题,包括解释 really helps 以提高您的帖子质量。请记住,您正在为将来的读者回答问题,而不仅仅是现在提问的人!请edit您的答案添加解释,并说明适用的限制和假设。 有时代码就是文档。很明显,它只是在一些文本中进行管道传输。

以上是关于如何将文件写入 Kafka Producer的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink kafka producer 分区策略 (flink写入数据不均匀 与 数据写入 分区无数据 )

Kafka基础:高效读写数据

Kafka 主题分区

将 Spark 数据集转换为 JSON 并写入 Kafka Producer

关于Kafka幂等producer的讨论

kafka kafka如何设置指定分区进行发送和消费