将任意 JSON 字符串转换为 Kafka Schema

Posted

技术标签:

【中文标题】将任意 JSON 字符串转换为 Kafka Schema【英文标题】:Converting an arbitrary JSON string to Kafka Schema 【发布时间】:2021-12-01 10:58:06 【问题描述】:

我正在成功使用 Kafka Connect,但我有一个 JSON 字符串,我试图以一种通用/一致的方式将其转换为 Kafka Schema。有没有办法将任意 JSON 字符串转换为 Kafka Connect 可以使用的“SourceRecord”。我怀疑有一种简单的方法可以做到这一点,但到目前为止,我还没有找到一个很好/有效的例子来做我正在寻找的。​​p>

现在我可以根据以下示例成功地逐个转换 JSON 字符串。显示的示例很简单,但我希望有一种方法可以对任意复杂的 JSON 字符串执行此操作并创建 SourceRecord 所需的模式输出(键和值)?

(注意:这是在 Kafka Connect 插件“poll()”方法中完成的)

:
:
long l = generateId();
Long id = Long.valueOf(l);
Object key = buildKey(id);
Schema keySchema = HttpSourceSchemas.KEY_SCHEMA;
Object value = buildValue(timestampStr, "hello world");
Schema valueSchema = HttpSourceSchemas.VALUE_SCHEMA;

records.add(new SourceRecord(
                    sourcePartition, sourceOffset, topic, partition,
                    keySchema, key, valueSchema, value));
:
:
private Struct buildKey(Long id) 
    return new Struct(HttpSourceSchemas.KEY_SCHEMA)
                .put(HttpSourceSchemas.ID_FIELD, id);


private Struct buildValue(String timestamp, String data) 
    return new Struct(HttpSourceSchemas.VALUE_SCHEMA)
                .put(HttpSourceSchemas.TIMESTAMP_FIELD, timestamp)
                .put(HttpSourceSchemas.DATA_FIELD, data);

:
:

我的架构如下所示:

public final class HttpSourceSchemas 

    private HttpSourceSchemas() 
        // Empty
    

    public static final String ID_FIELD = "id";
    public static final String TIMESTAMP_FIELD = "timestamp";
    public static final String DATA_FIELD = "data";

    public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
        .name("Key Schema")
        .version(1)
        .field(ID_FIELD, Schema.INT64_SCHEMA)
        .build();

    public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
        .name("Value Schema")
        .version(1)
        .field(TIMESTAMP_FIELD, Schema.STRING_SCHEMA)
        .field(DATA_FIELD, Schema.STRING_SCHEMA)
        .build();

【问题讨论】:

理想情况下,您不应在 Connect API 中使用特定的序列化格式。这是连接器任务代码外部的转换器类的目的。坚持使用 Struct/Schema 对象(在解析来自其他地方的数据之后......不过,Kafka 带有 Jackson,所以使用它) 【参考方案1】:
    选择一个 JSON 处理库,例如 jackson(推荐,因为它包含在 connect-api 依赖项中)或 gson 解析任意字符串并在 Java 中获取 JSON 对象。 执行深度优先搜索以遍历 JSON 对象的节点。 构建其 Kafka 架构。

Kafka 的JsonConverter 中的一些代码应该对您有所帮助。

【讨论】:

以上是关于将任意 JSON 字符串转换为 Kafka Schema的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

Kafka Connect:将消息从字节转换为 Json

Spark 将 JSON 字符串转换为 JSON 对象(Java)

将 Spark 数据集转换为 JSON 并写入 Kafka Producer

Apache Kafka/NiFi 可以将数据转换为 JSON 文件吗?

无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON