将任意 JSON 字符串转换为 Kafka Schema
Posted
技术标签:
【中文标题】将任意 JSON 字符串转换为 Kafka Schema【英文标题】:Converting an arbitrary JSON string to Kafka Schema 【发布时间】:2021-12-01 10:58:06 【问题描述】:我正在成功使用 Kafka Connect,但我有一个 JSON 字符串,我试图以一种通用/一致的方式将其转换为 Kafka Schema。有没有办法将任意 JSON 字符串转换为 Kafka Connect 可以使用的“SourceRecord”。我怀疑有一种简单的方法可以做到这一点,但到目前为止,我还没有找到一个很好/有效的例子来做我正在寻找的。p>
现在我可以根据以下示例成功地逐个转换 JSON 字符串。显示的示例很简单,但我希望有一种方法可以对任意复杂的 JSON 字符串执行此操作并创建 SourceRecord 所需的模式输出(键和值)?
(注意:这是在 Kafka Connect 插件“poll()”方法中完成的)
:
:
long l = generateId();
Long id = Long.valueOf(l);
Object key = buildKey(id);
Schema keySchema = HttpSourceSchemas.KEY_SCHEMA;
Object value = buildValue(timestampStr, "hello world");
Schema valueSchema = HttpSourceSchemas.VALUE_SCHEMA;
records.add(new SourceRecord(
sourcePartition, sourceOffset, topic, partition,
keySchema, key, valueSchema, value));
:
:
private Struct buildKey(Long id)
return new Struct(HttpSourceSchemas.KEY_SCHEMA)
.put(HttpSourceSchemas.ID_FIELD, id);
private Struct buildValue(String timestamp, String data)
return new Struct(HttpSourceSchemas.VALUE_SCHEMA)
.put(HttpSourceSchemas.TIMESTAMP_FIELD, timestamp)
.put(HttpSourceSchemas.DATA_FIELD, data);
:
:
我的架构如下所示:
public final class HttpSourceSchemas
private HttpSourceSchemas()
// Empty
public static final String ID_FIELD = "id";
public static final String TIMESTAMP_FIELD = "timestamp";
public static final String DATA_FIELD = "data";
public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
.name("Key Schema")
.version(1)
.field(ID_FIELD, Schema.INT64_SCHEMA)
.build();
public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
.name("Value Schema")
.version(1)
.field(TIMESTAMP_FIELD, Schema.STRING_SCHEMA)
.field(DATA_FIELD, Schema.STRING_SCHEMA)
.build();
【问题讨论】:
理想情况下,您不应在 Connect API 中使用特定的序列化格式。这是连接器任务代码外部的转换器类的目的。坚持使用 Struct/Schema 对象(在解析来自其他地方的数据之后......不过,Kafka 带有 Jackson,所以使用它) 【参考方案1】:-
选择一个 JSON 处理库,例如 jackson(推荐,因为它包含在
connect-api
依赖项中)或 gson
解析任意字符串并在 Java 中获取 JSON 对象。
执行深度优先搜索以遍历 JSON 对象的节点。
构建其 Kafka 架构。
Kafka 的JsonConverter 中的一些代码应该对您有所帮助。
【讨论】:
以上是关于将任意 JSON 字符串转换为 Kafka Schema的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON
Spark 将 JSON 字符串转换为 JSON 对象(Java)
将 Spark 数据集转换为 JSON 并写入 Kafka Producer