Processing complex Avro messages with Kafka Streams


I am building a proof of concept (POC) with Kafka Streams that processes Avro messages. My Avro messages mix simple and complex types, which makes them hard to handle.

My Avro schema is shown below.

{"type":"record",
"namespace": "com.test",
"name": "backoffice",
"fields": [ {"name": "accountid","type": "string"},
{"name":"amendmentpositionid","type": "int"},
{"name":"booking","type":
{"type":"array","items":
{"namespace":"com.saxo",
"name":"bookingfields",
"type":"record",
"fields":
[{"name":"accountid","type":"string"},{"name":"clientid","type":"int"},
{"name":"clientname","type":"string"},{"name":"exerciseid","type":"int"},
{"name":"hedgeid","type":"int"},{"name":"originatingpositionid","type":"int"},
{"name":"positionid","type":"int"},{"name":"relatedpositionid","type":"int"} ]}}}]}

The input data looks like this:

{"accountid":"1234","amendmentpositionid":1234,"booking":[{"accountid":"898","clientid":333,"clientname":"Non ","exerciseid":2,"hedgeid":100

Before storing it in a database, I need to flatten it so that it looks like this:

1234,1234,898,333,NON,2,100

To achieve this I tried the Kafka Streams flatMapValues operation, but somehow I cannot retain the id and date in my final output.

My Kafka Streams application is shown below.

package com.test.office.KafkaStreams;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import com.saxo.backoffice;
import org.apache.kafka.streams.kstream.KeyValueMapper;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class KafkaAvroSchemaRegistry {


    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka Avro Topic 8");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "server1");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://server2:8081");

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, testSpecific> testspecific1 = builder.stream("topic10");

        KStream<String, testSpecific> output1 = testspecific1.peek((key, value) -> System.out.println(key + value.toString()));

        output1.print();
        KStream<String, String> test = testspecific1.flatMapValues(value -> value.Booking());

        test.print();
        KafkaStreams streams = new KafkaStreams(builder, properties);

        streams.cleanUp();
        streams.start();


        // print the topology
       // System.out.println(streams.toString());

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

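Can someone point me in the right direction?

Answer

Somehow I cannot retain the id and date in my final output.

That is because you are only mapping to getMessage().

Try reading all of the fields: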

// mapValues rather than flatMapValues: String.format returns one String, not an Iterable
mapValues(value ->
    String.format("%d,%s,%s", value.getId(), value.getDate(), value.getMessage()));
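Applied to the schema in the question, where each message should produce one flattened row per entry in the booking array, the same idea can be sketched in plain Java. This is only an illustration under stated assumptions: FlattenBackoffice, Backoffice and BookingFields are hypothetical stand-ins for the Avro-generated classes (which expose getters rather than public fields), only the seven fields visible in the desired output are included, and the client name is trimmed but not upper-cased.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the Avro-generated classes; not the real generated code.
class FlattenBackoffice {

    // Stand-in for the nested "bookingfields" record (subset of its fields).
    static final class BookingFields {
        final String accountid;
        final int clientid;
        final String clientname;
        final int exerciseid;
        final int hedgeid;

        BookingFields(String accountid, int clientid, String clientname,
                      int exerciseid, int hedgeid) {
            this.accountid = accountid;
            this.clientid = clientid;
            this.clientname = clientname;
            this.exerciseid = exerciseid;
            this.hedgeid = hedgeid;
        }
    }

    // Stand-in for the top-level "backoffice" record.
    static final class Backoffice {
        final String accountid;
        final int amendmentpositionid;
        final List<BookingFields> booking;

        Backoffice(String accountid, int amendmentpositionid, List<BookingFields> booking) {
            this.accountid = accountid;
            this.amendmentpositionid = amendmentpositionid;
            this.booking = booking;
        }
    }

    // Builds one comma-separated row per booking, repeating the parent fields
    // at the front of every row so they are not lost when the array is expanded.
    static List<String> flatten(Backoffice parent) {
        List<String> rows = new ArrayList<>();
        for (BookingFields b : parent.booking) {
            rows.add(String.join(",",
                    parent.accountid,
                    String.valueOf(parent.amendmentpositionid),
                    b.accountid,
                    String.valueOf(b.clientid),
                    b.clientname.trim(),
                    String.valueOf(b.exerciseid),
                    String.valueOf(b.hedgeid)));
        }
        return rows;
    }

    public static void main(String[] args) {
        Backoffice msg = new Backoffice("1234", 1234, Arrays.asList(
                new BookingFields("898", 333, "Non ", 2, 100)));
        // prints 1234,1234,898,333,Non,2,100
        flatten(msg).forEach(System.out::println);
    }
}
```

Because flatten returns a List<String>, a function of this shape could be passed to flatMapValues, which expects a mapper that returns an Iterable. The key point is that the lambda still has the parent record in scope while it iterates over the bookings, so the parent fields can be copied into every row. Returning only the booking list discards them.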
