使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka
Posted
技术标签:
【中文标题】使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka【英文标题】:Send Record with JSON Schema to Kafka using Spring-Kafka and Confluent schema registry 【发布时间】:2021-12-12 12:44:31 【问题描述】:我在互联网上找不到任何关于如何使用 spring kafka 将带有 json 模式的记录发送到 kafka 的信息。我该怎么做?
【问题讨论】:
【参考方案1】:花了几个小时后,我发现可以通过 3 种不同的方式发送带有 json 模式的记录。相关部分在io.confluent.kafka.schemaregistry.json.JsonSchemaUtils中实现
这里是摘录:
if (isEnvelope(object))
return getSchemaFromEnvelope((JsonNode) object);
Class<?> cls = object.getClass();
if (cls.isAnnotationPresent(Schema.class))
Schema schema = (Schema) cls.getAnnotation(Schema.class);
List<SchemaReference> references = Arrays.stream(schema.refs())
.map(ref -> new SchemaReference(ref.name(), ref.subject(), ref.version()))
.collect(Collectors.toList());
if (client == null)
if (!references.isEmpty())
throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
+ " with refs " + references);
return new JsonSchema(schema.value());
else
return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
.orElseThrow(() -> new IOException("Invalid schema " + schema.value()
+ " with refs " + references));
JsonSchemaConfig config = getConfig(useOneofForNullables, failUnknownProperties);
JsonSchemaDraft draft;
switch (specVersion)
case DRAFT_4:
draft = JsonSchemaDraft.DRAFT_04;
break;
case DRAFT_6:
draft = JsonSchemaDraft.DRAFT_06;
break;
case DRAFT_7:
draft = JsonSchemaDraft.DRAFT_07;
break;
case DRAFT_2019_09:
draft = JsonSchemaDraft.DRAFT_2019_09;
break;
default:
draft = JsonSchemaDraft.DRAFT_07;
break;
config = config.withJsonSchemaDraft(draft);
JsonSchemaGenerator jsonSchemaGenerator = new JsonSchemaGenerator(objectMapper, config);
JsonNode jsonSchema = jsonSchemaGenerator.generateJsonSchema(cls);
return new JsonSchema(jsonSchema);
所以你有 3 种可能性:
-
创建一个包含架构和有效负载字段的 JsonNode
使用 @Schema 注释您的类
不提供模式,让模式生成器生成
我选择了 1) 使用以下代码:
public class MyKafkaTemplate
private static final String SCHEMA_POSTFIX_KEY = "-key.json";
private static final String SCHEMA_POSTFIX_VALUE = "-value.json";
private final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate;
private final ObjectMapper objectMapper;
private final Map<String, JsonNode> topicSchemaCache = new ConcurrentHashMap<>();
public <K, V> void send(final String topic, final K key, final V value)
final JsonNode keyNode = getEnvelope(topic + SCHEMA_POSTFIX_KEY, key);
final JsonNode valueNode = getEnvelope(topic + SCHEMA_POSTFIX_VALUE, value);
kafkaTemplate.send(topic, keyNode, valueNode);
private JsonNode getEnvelope(final String schemaFilePath, final Object key)
final JsonNode schemaNode = getOrLoadSchema(schemaFilePath);
final JsonNode payload = objectMapper.valueToTree(key);
return JsonSchemaUtils.envelope(schemaNode, payload);
private JsonNode getOrLoadSchema(final String schemaFilePath)
return topicSchemaCache.computeIfAbsent(schemaFilePath, key ->
readFileToJsonNode(schemaFilePath));
@SneakyThrows
private JsonNode readFileToJsonNode(final String schemaFilePath)
return objectMapper.readTree(new ClassPathResource(schemaFilePath).getFile());
【讨论】:
以上是关于使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot、Spring-Kafka 和 Spring-Cloud 兼容性
使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka
LocalDateTime 的自定义 spring-kafka 反序列化器