Kafka connect spooldir 动态模式生成器

Posted

技术标签:

【中文标题】Kafka connect spooldir 动态模式生成器【英文标题】:Kafka connect spooldir Dynamic schema generator 【发布时间】:2020-08-28 23:17:31 【问题描述】:

这是关于 CSV 的 kafka-connect-spooldir 连接器。我想知道是否有办法避免对模式进行硬编码并让连接器动态创建模式?我有很多 csv 文件要处理,比如说每天几百 GB,有时是几个 tera 字节的 csv。有时,一些 csv 文件有新列,而一些被删除。

我能够成功读取 csv 并写入弹性搜索,并且我关注了您的帖子。https://www.confluent.io/blog/ksql-in-action-enriching-csv-events-with-data-from-rdbms-into-AWS/ 所以现在我不想使用值模式和键模式。

来自链接https://docs.confluent.io/current/connect/kafka-connect-spooldir/connectors/csv_source_connector.html;我认为 schema.generation.enabled 可以设置为 true。

这是我的 REST API 调用 [包括我的连接器配置]

$curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://xxx:000/connectors/ -d '
"name":"csv1",
"config":
"tasks.max":"1",
"connector.class":"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"input.file.pattern":"^.*csv$",
"halt.on.error":"false",
"topic":"order",
"schema.generation.enabled":"true",
"schema.generation.key.name":"orderschema",
"schema.generation.value.name":"orderdata",
"csv.first.row.as.header":"true",
"csv.null.field.indicator":"EMPTY_SEPARATORS",
"batch.size" : "5000",
   

'

当我提交这个时,我收到以下错误。 “名称”:“订单”, “连接器”: “状态”:“失败”, "worker_id": "localhost:000", "trace": "org.apache.kafka.connect.errors.DataException: 为输入模式找到了多个模式。\nSchema: \"name\":\"com.github .jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":

解决办法是什么?

【问题讨论】:

【参考方案1】:

我现在能够解析所有数据。诀窍是首先处理一个文件[任何],然后只是检查添加随机添加另一个。看起来像那样,它会自动更新架构。 (就像 Robin Moffatt 所说的那样) 之后将所有文件添加到文件夹中,它处理得很好。耶!

【讨论】:

以上是关于Kafka connect spooldir 动态模式生成器的主要内容,如果未能解决你的问题,请参考以下文章

flume Source志SpoolDir

[Flume][Kafka]Flume 与 Kakfa结合例子(Kakfa 作为flume 的sink 输出到 Kafka topic)

kafka复习

Kafka 连接器的动态创建

扩展 Kafka Connect 以处理 10K S3 存储桶

flume与kafka结合上传文件到HDFS上