Kafka-Connect实践

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka-Connect实践相关的知识,希望对你有一定的参考价值。

一、Kafka-Connect介绍

  Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为集成其他系统和解耦应用,

  • 之前经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中的消息。
  • Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成。Kafka Connect运用用户快速定义并实现各种Connector(File,Jdbc,Hdfs等),这些功能让大批量数据导入/导出Kafka很方便。

1.1、Kafka Connect 特性包括:

  • Kafka connector通用框架,提供统一的集成API
  • 同时支持分布式模式和单机模式
  • REST 接口,用来查看和管理Kafka connectors
  • 自动化的offset管理,开发人员不必担心错误处理的影响
  • 分布式、可扩展
  • 流/批处理集成

二、Kafka-Connect实践

2.1、Flie Connector 测试


本例使用到了两个Connector:

  • FileStreamSource:从test.txt中读取并发布到Broker
  • FileStreamSink:从Broker中读取数据并写入到test.sink.txt文件中

2.1.1、配置Source

其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

2.1.2、配置Sink

其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

2.1.3、配置 Broker

Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

2.1.4、启动Source Connector和Sink Connector

cd ${KAFKA_HOME}
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 

2.1.5、打开console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test

2.1.6、写入到test.txt文件中,并观察console-consumer中的变化

[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt

console-consumer中打开的窗口输出如下

{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}

查看 test.sink.txt

[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt 
firest line
second line

关注我的公众号【宝哥大数据】,更多干货


参考
https://www.cnblogs.com/videring/articles/6371081.html

以上是关于Kafka-Connect实践的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-connect 是不是必须使用模式注册表?

使用本地 kafka-connect 集群连接远程数据库的连接超时

使用独立模式 Kafka-connect 将 Postgresql 的数据捕获更改为 kafka 主题

MySql 查询在 Kafka-connect 中失败

逆向及Bof基础实践

没有模式注册表的 Kafka-connect