Kafka Connect的安装和配置

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Connect的安装和配置相关的知识,希望对你有一定的参考价值。

参考技术A

       在使用Kafka Connect时,需要注意一些事项,以帮助你构建适应长期需求的datapipeline。本章旨在提供有关的一些上下文。

       要开始使用Kafka Connect,只有一个硬性的先决条件:一个Kafka的broker集群。然而,随着集群增长,有几个问题需要提前考虑:

       在开始之前,确定哪种模式最适合您的环境非常有用。 对于适合单个代理的环境(例如从web服务器向Kafka发送日志),standalone模式非常适合。在单个source或sink可能需要大量数据的用例中(例如,将数据从Kafka发送到HDFS),分布式模式在可伸缩性方面更加灵活,并提供了高可用性服务,从而最小化停机时间。

       Kafka Connect插件是一组jar文件,Kafka Connect可以在其中找到一个或多个connector、transform、以及converter的实现。Kafka Connect将每个插件彼此隔离,这样一个插件中的库就不会受到其他插件库的影响,这点非常重要。

Kafka Connect plugin是:
(1)在一个uber jar文件中包含插件及所有第三方依赖;或
(2)一个包含jar包和第三方依赖的目录。

       Kafka Connect使用plugin path找到插件,这是Kafka Connect在worker配置文件中定义的一个以逗号分隔的目录列表。要安装插件,请将目录或uber jar放在plugin path路径中列出的目录中。

        举个例子 ,我们在每台机器上创建一个/usr/local/share/kafka/plugins目录,然后将我们所有的插件jar或插件目录放入其中。然后在worker的配置文件中加入如下配置项:

       现在,当我们启动worker时,Kafka Connect可以发现这些插件中定义的所有connector、transform以及converter。Kafka Connect显式地避免了其他插件中的库, 并防止了冲突。

       如果要在同一个机器上运行多个standalone实例,有一些参数需要是独一无二的:
(1)offset.storage.file.filename:connector偏移量的存储。
(2)rest.port:用于监听http请求的rest接口所占用的端口。

       connector和task的配置,offsets和状态会存储在Kafka的内部主题中,Kafka Connect会自动创建这些主题,且所有topic都使用了压缩清理策略。
       如果要手动创建这些topic,推荐使用如下命令:

这里只列出一些有疑问的。

       配置了group.id的worker会自动发现彼此并形成集群。一个集群中的所有worker必须使用相同的三个Kafka topic来共享配置、偏移量以及状态,所有worker必须配置相同的config.storage.topic、offset.storage.topic以及status.storage.topic。

       每个converter实现类都有自己的相关配置需求。下面的例子展示了一个worker属性文件,其中使用的AvroConverter需要将Schema Registry的url作为属性进行传递。

注意: 除了其配置覆盖这些配置的connector,worker上运行的所有connector都使用这些converter。

Kafka Connect:获取连接器配置

【中文标题】Kafka Connect:获取连接器配置【英文标题】:Kafka Connect: Getting connector configuration 【发布时间】:2021-06-03 20:10:00 【问题描述】:

我一直在用 kafka connect 进行测试。但是对于每个连接器,我必须阅读连接器文档以了解连接器所需的配置。就我阅读 kafka 连接 API 文档而言,我已经看到了 API 以获取连接器相关数据。

GET /connector-plugins - 返回安装在 Kafka Connect 集群中的连接器插件列表。请注意,API 仅检查处理请求的工作器上的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果您添加了新的连接器 jar。

PUT /connector-plugins/connector-type/config/validate - 根据配置定义验证提供的配置值。此 API 执行每个配置验证,在验证期间返回建议值和错误消息。

其他 API 与创建的连接器相关。有没有办法获得所需连接器的配置?

【问题讨论】:

【参考方案1】:

是否可以获取所需连接器的配置

验证端点正是这样做的,并且是 Landoop Kafka Connect UI 用来为缺少/错误配置的属性提供错误的地方。

如何需要属性的实现细节取决于连接器配置的Importance 级别,对于任何非高重要性配置,最好参考文档或源代码(如果有)

【讨论】:

我确实发现了这一点。不过感谢您的回答!

以上是关于Kafka Connect的安装和配置的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect:获取连接器配置

Kafka安装和常用操作命令

Kafka 入门--安装配置和 kafka-python 调用

Kafka 安装配置 windows 下

Kafka集群安装和配置

Kafka集群安装和配置