Kafka 连接设置以使用 AWS MSK 从 Aurora 发送记录
Posted
技术标签:
【中文标题】Kafka 连接设置以使用 AWS MSK 从 Aurora 发送记录【英文标题】:Kafka connect setup to send record from Aurora using AWS MSK 【发布时间】:2020-04-20 10:15:21 【问题描述】:我必须将记录从 Aurora/mysql 发送到 MSK,然后从那里发送到 Elastic 搜索服务
Aurora -->Kafka-connect--->AWS MSK--->kafka connect --->弹性搜索
Aurora 表结构中的记录是这样的 我认为记录将以这种格式进入 AWS MSK。
"o36347-5d17-136a-9749-Oe46464",0,"NEW_CASE","WRLDCHK","o36347-5d17-136a-9749-Oe46464","<?xml version=""1.0"" encoding=""UTF-8"" standalone=""yes""?><caseCreatedPayload><batchDetails/>","CASE",08-JUL-17 10.02.32.217000000 PM,"TIME","UTC","ON","0a348753-5d1e-17a2-9749-3345,MN4,","","0a348753-5d1e-17af-9749-FGFDGDFV","EOUHEORHOE","2454-5d17-138e-9749-setwr23424","","","",,"","",""
所以为了通过弹性搜索消费,我需要使用正确的模式,所以我必须使用模式注册表。
我的问题
问题 1
对于上述类型的消息模式注册表,我应该如何使用模式注册表? 我是否必须为此创建 JSON 结构,如果是,我将其保留在哪里。 此处需要更多帮助才能理解这一点?
我已经编辑了
vim /usr/local/confluent/etc/schema-registry/schema-registry.properties
提到了 zookeper,但我不知道 kafkastore.topic=_schema
是什么
如何将此链接到自定义架构。
即使我开始并得到这个错误
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic _schemas not present in metadata after 60000 ms.
这是我所期待的,因为我没有对架构做任何事情。
我确实安装了 jdbc 连接器,但当我启动时出现以下错误
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
You can also find the above list of errors at the endpoint `/connectorType/config/validate`
问题 2 我可以在一个 ec2 上创建两个连接器(jdbc 和弹性 serach 一个)。如果是,我必须在单独的 cli 中同时启动吗?
问题 3 当我打开 vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties 我只看到下面的属性值
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
mode=incrementing
incrementing.column.name=id
topic.prefix=trf-aurora-fspaudit-
在上面的属性文件中我可以提到架构名和表名吗?
根据回答,我正在更新我的 Kafka 连接 JDBC 配置
---------------启动 JDBC 连接弹性搜索 --------------- --
wget /usr/local http://packages.confluent.io/archive/5.2/confluent-5.2.0-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-5.2.0-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-5.2.0 /usr/local/confluent
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
tar -xzf mysql-connector-java-5.1.48.tar.gz
sudo mv mysql-connector-java-5.1.48 mv /usr/local/confluent/share/java/kafka-connect-jdbc
然后
vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
然后我修改了下面的属性
connection.url=jdbc:mysql://fdgfgdfgrter.us-east-1.rds.amazonaws.com:3306/trf
mode=incrementing
connection.user=admin
connection.password=Welcome123
table.whitelist=PANStatementInstanceLog
schema.pattern=dbo
上次修改
vim /usr/local/confluent/etc/kafka/connect-standalone.properties
这里我修改了下面的属性
bootstrap.servers=b-3.205147-ertrtr.erer.c5.ertert.us-east-1.amazonaws.com:9092,b-6.ertert-riskaudit.ertet.c5.kafka.us-east-1.amazonaws.com:9092,b-1.ertert-riskaudit.ertert.c5.kafka.us-east-1.amazonaws.com:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java
当我列出主题时,我没有看到为表名列出的任何主题。
错误消息的堆栈跟踪
[2020-01-03 07:40:57,169] ERROR Failed to create job for /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties (org.apache.kafka.connect.cli.ConnectStandalone:108)
[2020-01-03 07:40:57,169] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:119)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/connectorType/config/validate`
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:116)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/connectorType/config/validate`
at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:423)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '"name": "emp-connector", "config": "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" '
【问题讨论】:
一个问题:数据是如何进入 Aurora的? @cricket_007 我正在使用 DMS 将数据放入极光 .. 那么,来自另一个数据库?为什么需要 Aurora,那么如果可以从该数据库设置 kafka 连接?或者数据如何到达?为什么不能在记录源所在的地方替换/添加 Kafka 生产者? @cricket_007 我正在将数据从本地数据库推送到 Aurora。因此 DMS 用于此目的。而且此设置完全在云上。 我仍然不确定我是否了解 DMS 的用途。这比在本地使用 Debezium 将数据提取到 MSK Kafka 更便宜吗?在您的应用程序完全在云中之前,您需要继续运行 DMS,对吗?那么,为什么不重构您的应用程序以直接写入 Kafka,然后使用 JDBC sink 和 Aurora 以及 Elastic sink 将数据放在这两个地方 【参考方案1】:需要架构注册表?
没有。您可以在 json 记录中启用模式。 JDBC 源可以根据表信息为您创建
value.converter=org.apache.kafka...JsonConverter
value.converter.schemas.enable=true
提到了 zookeper,但我不知道什么是 kafkastore.topic=_schema
如果你想使用 Schema Registry,你应该使用 kafkastore.bootstrap.servers
.with Kafka 地址,而不是 Zookeeper。所以删除kafkastore.connection.url
请read the docs了解所有属性
我没有对架构做任何事情。
没关系。注册表首次启动时会创建架构主题
我可以在一个 ec2 上创建两个连接器
是(忽略可用的 JVM 堆空间)。同样,这在 Kafka Connect 文档中有详细说明。
使用独立模式,您首先传递连接工作器配置,然后在一个命令中最多传递 N 个连接器属性
使用分布式模式,您使用 Kafka Connect REST API
https://docs.confluent.io/current/connect/managing/configuring.html
当我打开 vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
首先,这是针对 Sqlite,而不是针对 Mysql/Postgres。您不需要使用快速入门文件,它们仅供参考
同样,所有属性都有很好的文档记录
https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc
我确实安装了 jdbc 连接器,但当我启动时出现以下错误
这里有更多关于如何调试的信息
https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/
如前所述,我个人建议尽可能使用 Debezium/CDC
Debezium Connector for RDS Aurora
【讨论】:
但是你可以看到我的记录不是 JSON。所以即使那样我也不需要模式注册表? 嗯?如果将 JDBC 源与 JSON 转换器一起使用,则 JDBC 记录 变为 JSON。他们在使用 AvroConverter 时变成 Avro。只有 AvroConverter 需要 Schema Registry 设置...我不确定我是否可以更清楚 在新的 EC2 中设置完之后。我遇到了错误Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
哪个组件报告了这一点?弹性搜索接收器?您没有使用我在回答 jdbc 源时提到的属性吗?为什么需要新的 EC2?您只是在编辑属性文件和远程 kafka 主题
不过,此时,您的数据库配置需要以某种方式修复。这不再是卡夫卡的问题了。就像我说的,请搜索你的错误***.com/questions/2983248/…【参考方案2】:
我猜您打算使用 AVRO 来传输数据,因此在启动 Kafka Connect 工作程序时不要忘记将 AVROConverter 指定为默认转换器。如果您将使用 JSON,则不需要 Schema Registry。
1.1kafkastore.topic=_schema
您是否启动了自己的架构注册表?当您启动模式注册表时,您必须指定“模式”主题。基本上,Schema Registry 将使用这个主题来存储它注册的模式,如果发生故障,它可以从那里恢复它们。
1.2jdbc connector installed and when i start i get below error
默认情况下,JDBC 连接器仅适用于 SQLite 和 PostgreSQL。如果您希望它与 MySQL 数据库一起使用,那么您也应该将 MySQL Driver 添加到类路径中。
2.这取决于您如何部署 Kafka Connect 工作人员。如果您选择分布式模式(推荐),那么您实际上不需要单独的 CLI。您可以通过 Kafka Connect REST API 部署连接器。
3.还有另一个名为table.whitelist
的属性,您可以在其上指定您的模式和表。例如:table.whitelistusers,products,transactions
【讨论】:
MySQL 驱动到类路径的意思是plugin.path=/usr/local/confluent/share/java
?
没错!使用plugin.path
属性定义Kafka Connect 可以从中加载外部依赖项(JAR)的路径。在这里您可以添加其他连接器和这些连接器的依赖项(例如:驱动程序、记录器、API 客户端等)
JDBC 驱动程序不作为插件被拉取,只有连接器、转换、转换器等
我没有得到任何需要放置 kafka 主题名称的属性..我也更新了我的问题..请看一下
@SUDARSHAN:这是因为主题遵循命名约定。考虑到 JDBC 源连接器,那么主题将与表具有相同的名称。您还可以添加 topic.prefix
属性,它将为表名添加前缀。例如:“topic.prefix:mysql-”并且表被称为“users”,那么主题将被称为“mysql-users”。 @cricket_007:我不确定我是否正确理解了您的评论,请您详细说明一下吗?以上是关于Kafka 连接设置以使用 AWS MSK 从 Aurora 发送记录的主要内容,如果未能解决你的问题,请参考以下文章
AWS Kafka (MSK) - 如何生成 Keystore 和 truststore 并在我的 Spring Cloud Stream 应用程序中使用它们?