使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka

Posted

技术标签:

【中文标题】使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka【英文标题】:Send Record with JSON Schema to Kafka using Spring-Kafka and Confluent schema registry 【发布时间】:2021-12-12 12:44:31 【问题描述】:

我在互联网上找不到任何关于如何使用 spring kafka 将带有 json 模式的记录发送到 kafka 的信息。我该怎么做?

【问题讨论】:

【参考方案1】:

花了几个小时后,我发现可以通过 3 种不同的方式发送带有 json 模式的记录。相关部分在io.confluent.kafka.schemaregistry.json.JsonSchemaUtils中实现

这里是摘录:

if (isEnvelope(object)) 
  return getSchemaFromEnvelope((JsonNode) object);

Class<?> cls = object.getClass();
if (cls.isAnnotationPresent(Schema.class)) 
  Schema schema = (Schema) cls.getAnnotation(Schema.class);
  List<SchemaReference> references = Arrays.stream(schema.refs())
          .map(ref -> new SchemaReference(ref.name(), ref.subject(), ref.version()))
          .collect(Collectors.toList());
  if (client == null) 
    if (!references.isEmpty()) 
      throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
              + " with refs " + references);
    
    return new JsonSchema(schema.value());
   else 
    return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
            .orElseThrow(() -> new IOException("Invalid schema " + schema.value()
                    + " with refs " + references));
  

JsonSchemaConfig config = getConfig(useOneofForNullables, failUnknownProperties);
JsonSchemaDraft draft;
switch (specVersion) 
  case DRAFT_4:
    draft = JsonSchemaDraft.DRAFT_04;
    break;
  case DRAFT_6:
    draft = JsonSchemaDraft.DRAFT_06;
    break;
  case DRAFT_7:
    draft = JsonSchemaDraft.DRAFT_07;
    break;
  case DRAFT_2019_09:
    draft = JsonSchemaDraft.DRAFT_2019_09;
    break;
  default:
    draft = JsonSchemaDraft.DRAFT_07;
    break;

config = config.withJsonSchemaDraft(draft);
JsonSchemaGenerator jsonSchemaGenerator = new JsonSchemaGenerator(objectMapper, config);
JsonNode jsonSchema = jsonSchemaGenerator.generateJsonSchema(cls);
return new JsonSchema(jsonSchema);

所以你有 3 种可能性:

    创建一个包含架构和有效负载字段的 JsonNode 使用 @Schema 注释您的类 不提供模式,让模式生成器生成

我选择了 1) 使用以下代码:

public class MyKafkaTemplate 

   private static final String SCHEMA_POSTFIX_KEY = "-key.json";
   private static final String SCHEMA_POSTFIX_VALUE = "-value.json";
   private final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate;
   private final ObjectMapper objectMapper;
   private final Map<String, JsonNode> topicSchemaCache = new ConcurrentHashMap<>();

   public <K, V> void send(final String topic, final K key, final V value) 
       final JsonNode keyNode = getEnvelope(topic + SCHEMA_POSTFIX_KEY, key);
       final JsonNode valueNode = getEnvelope(topic + SCHEMA_POSTFIX_VALUE, value);
       kafkaTemplate.send(topic, keyNode, valueNode);
   

   private JsonNode getEnvelope(final String schemaFilePath, final Object key) 
       final JsonNode schemaNode = getOrLoadSchema(schemaFilePath);
       final JsonNode payload = objectMapper.valueToTree(key);
       return JsonSchemaUtils.envelope(schemaNode, payload);
   

   private JsonNode getOrLoadSchema(final String schemaFilePath) 
       return topicSchemaCache.computeIfAbsent(schemaFilePath, key -> 
        readFileToJsonNode(schemaFilePath));
   

   @SneakyThrows
   private JsonNode readFileToJsonNode(final String schemaFilePath) 
       return objectMapper.readTree(new ClassPathResource(schemaFilePath).getFile());
   


【讨论】:

以上是关于使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot、Spring-Kafka 和 Spring-Cloud 兼容性

使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka

spring-kafka 固定事件迁移实现

LocalDateTime 的自定义 spring-kafka 反序列化器

Spring-Kafka.RecordIntercepter类未找到。找不到RecordIntercepter类

使用 Spring-kafka 在 GC/消费者重新平衡时清理 Kafka Metric 计量器