将 kafka-connect-transform-archive 与 HdfsSinkConnector 一起使用时的刷新大小

Posted

技术标签:

【中文标题】将 kafka-connect-transform-archive 与 HdfsSinkConnector 一起使用时的刷新大小【英文标题】:Flush size when using kafka-connect-transform-archive with HdfsSinkConnector 【发布时间】:2019-09-15 20:01:28 【问题描述】:

我有一个 Kafka 主题中的数据,我想保留在我的数据湖中。

在担心密钥之前,我能够使用 HdfsSinkConnector 将 Avro 值保存在数据湖上的文件中。每个文件中消息值的数量由 HdfsSinkConnector 的“flush.size”属性决定。

一切都好。接下来我也想保留密钥。为此,我使用了 kafka-connect-transform-archive,它将字符串键和 Avro 值包装到一个新的 Avro 模式中。

这很好用……除了 HdfsSinkConnector 的 flush.size 现在被忽略了。保存在数据湖中的每个文件只有 1 条消息。

因此,这两种情况是 1) 仅保存值,每个文件中的值的数量由 flush.size 确定;2) 保存键和值,每个文件只包含一条消息,而 flush.size 被忽略。

这两种情况的唯一区别是 HdfsSinkConnector 的配置,它指定了存档转换。

"transforms": "tran",
"transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"

kafka-connect-transform-archive 是否在设计上忽略了刷新大小,还是我需要一些额外的配置才能在数据湖上为每个文件保存多个键值消息?

【问题讨论】:

转换发生在每条消息到达 HDFS Connect 之前,因此没有属性被“忽略”。我注意到当主题中同时生成不同的模式版本时,文件会被刷新。那么可能发生的情况是 HDFS connect 认为它想要写入“just value”模式,然后获取“key+value”的记录,然后清除缓冲区并立即写入文件... 感谢您的回复,但请您再解释一下,这让我感到困惑(对不起)。也许我误解了(总是可能的),但是您的第二句话是说我们可能正在为该主题编写不同的模式(所以在我们开始使用 Kafka Connect 之前出了点问题),第三句话似乎暗示这是一个纯粹的HDFS 连接问题与想要编写键值模式相关,而它只需要值模式。我这里有什么问题? 我想你理解得很好。我只使用 S3 Connect 测试了存档转换。我没有调查它多久冲洗一次。但我们最终没有使用它,只是在生产过程中将密钥(对我们来说主要是字符串)直接复制到消息值中 你会说目前没有可用的解决方案来做我想做的事吗? 我不确定,抱歉。它需要有人调试 HDFS 连接器的源代码,以了解它为什么一次只写入一条记录 【参考方案1】:

我在使用 kafka gcs sink 连接器时遇到了同样的问题。

在 com.github.jcustenborder.kafka.connect.archive.Archive 代码中,每条消息都会创建一个新的 Schema。

private R applyWithSchema(R r) 
final Schema schema = SchemaBuilder.struct()
    .name("com.github.jcustenborder.kafka.connect.archive.Storage")
    .field("key", r.keySchema())
    .field("value", r.valueSchema())
    .field("topic", Schema.STRING_SCHEMA)
    .field("timestamp", Schema.INT64_SCHEMA);
Struct value = new Struct(schema)
    .put("key", r.key())
    .put("value", r.value())
    .put("topic", r.topic())
    .put("timestamp", r.timestamp());
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());

如果您查看 kafka 转换 InsertField$Value 方法,您会发现它使用 SynchronizedCache 以便每次都检索相同的架构。

https://github.com/axbaretto/kafka/blob/ba633e40ea77f28d8f385c7a92ec9601e218fb5b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L170

因此,您只需要创建一个模式(在 apply 函数之外)或使用相同的 SynchronizedCache 代码。

【讨论】:

感谢您的回答。当时我们通过重新格式化输入数据来解决这个问题,以便将键包含在值中。这样,就不需要转换了。 :man_shrugging_emoji:

以上是关于将 kafka-connect-transform-archive 与 HdfsSinkConnector 一起使用时的刷新大小的主要内容,如果未能解决你的问题,请参考以下文章

如何将Ios文件上传到

Javascript 将正则表达式 \\n 替换为 \n,将 \\t 替换为 \t,将 \\r 替换为 \r 等等

如何将视频文件转换格式

sh 一个将生成CA的脚本,将CA导入到钥匙串中,然后它将创建一个证书并与CA签名,然后将其导入到

python怎么将0写入文件?

如何将CMD窗口背景改成透明?