Kafka-Connect实践
Posted by 宝哥大数据
1. Introduction to Kafka-Connect
Kafka is a messaging system whose use keeps growing, especially in big data development (real-time data processing and analysis). To integrate other systems and decouple applications, we used to rely on a Producer to send messages to the Broker and a Consumer to consume messages from the Broker. Kafka Connect, introduced in version 0.9, greatly simplifies integrating other systems with Kafka: it lets users quickly define and implement all kinds of Connectors (File, JDBC, HDFS, etc.), which makes importing and exporting data to and from Kafka in bulk very convenient.
1.1 Kafka Connect features include:
- A common framework for Kafka connectors that provides a unified integration API
- Support for both distributed and standalone modes
- A REST interface for viewing and managing Kafka connectors (see the curl sketch after this list)
- Automatic offset management, so connector developers do not have to worry about this error-prone part themselves
- Distributed and scalable
- Streaming/batch processing integration
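As a quick illustration of the REST interface, here is a minimal sketch, assuming the Connect worker is running with its default REST port 8083 and the connector names used later in this post:
# list the connectors registered on this worker
curl http://localhost:8083/connectors
# show the configuration and tasks of a single connector
curl http://localhost:8083/connectors/local-file-source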
2. Kafka-Connect in Practice
2.1 File Connector Test
This example uses two Connectors:
- FileStreamSource: reads from test.txt and publishes the lines to the Broker
- FileStreamSink: reads the data back from the Broker and writes it to the file test.sink.txt
2.1.1 Configure the Source
The Source uses the configuration file ${KAFKA_HOME}/config/connect-file-source.properties:
# unique name of this connector instance
name=local-file-source
# connector implementation to run
connector.class=FileStreamSource
# maximum number of tasks created for this connector
tasks.max=1
# file to read lines from
file=test.txt
# topic the lines are published to
topic=connect-test
2.1.2 Configure the Sink
The Sink uses the configuration file ${KAFKA_HOME}/config/connect-file-sink.properties:
# unique name of this connector instance
name=local-file-sink
# connector implementation to run
connector.class=FileStreamSink
# maximum number of tasks created for this connector
tasks.max=1
# file the consumed records are appended to
file=test.sink.txt
# topic(s) to consume from; accepts a comma-separated list
topics=connect-test
2.1.3 Configure the standalone Connect worker
The standalone Connect worker (the process that talks to the Broker and runs the connectors) uses the configuration file ${KAFKA_HOME}/config/connect-standalone.properties:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
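One point worth noting about the converter settings: because both converters are JsonConverter with schemas.enable=true, every record stored in the topic is wrapped in a JSON envelope of roughly the following shape, which is why the console-consumer output in section 2.1.6 looks the way it does:
{"schema":{"type":"string","optional":false},"payload":"<line read from the file>"}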
2.1.4 Start the Source Connector and the Sink Connector
cd ${KAFKA_HOME}
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
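This runs both connectors on a single standalone worker. For reference, the same connectors can also run on a distributed worker; the following is only a sketch, assuming the stock config/connect-distributed.properties that ships with Kafka (in distributed mode connectors are created through the REST interface rather than passed on the command line):
cd ${KAFKA_HOME}
./bin/connect-distributed.sh config/connect-distributed.properties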
2.1.5 Open a console-consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
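The --zookeeper flag matches the Kafka 0.11 release used in this post; on newer Kafka releases the console consumer connects to the broker directly, roughly as follows (assuming the broker listens on localhost:9092 as configured above):
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic connect-test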
2.1.6 Write to the test.txt file and watch the console-consumer output
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
The console-consumer window shows the following output:
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
Check test.sink.txt:
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line
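To double-check that both connectors and their tasks are in the RUNNING state, the worker's REST interface can be queried as well; a sketch, again assuming the default REST port 8083:
curl http://localhost:8083/connectors/local-file-source/status
curl http://localhost:8083/connectors/local-file-sink/status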
Reference
https://www.cnblogs.com/videring/articles/6371081.html