Encoding and Decoding Kafka Messages with Avro

Posted by darange


1. Producer code

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Produces Avro-encoded Customer records to the "Customer" topic.
 * Created by p on 2018/10/8.
 */
public class AvroKafkaProducer {
    // Avro schema for the Customer record; email is an optional (nullable) field.
    public static final String USER_SCHEMA = "{\n" +
            "    \"type\":\"record\",\n" +
            "    \"name\":\"Customer\",\n" +
            "    \"fields\":[\n" +
            "        {\"name\":\"id\",\"type\":\"int\"},\n" +
            "        {\"name\":\"name\",\"type\":\"string\"},\n" +
            "        {\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}\n" +
            "    ]\n" +
            "}";

    public static void main(String[] args) {

        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "ip:9092");
        // Keys are plain strings; values are the Avro-encoded byte arrays.
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Custom partitioner (not shown in this post); remove this line to use the default partitioner.
        kafkaProps.put("partitioner.class", "MyPartitioner");

        // Parse the schema and build a Bijection Injection that converts
        // GenericRecord <-> Avro binary bytes.
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> injection = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(kafkaProps);
        for (int i = 0; i < 1000; i++) {
            // Build a GenericRecord that conforms to the schema.
            GenericData.Record record = new GenericData.Record(schema);
            record.put("id", i);
            record.put("name", "name-" + i);
            record.put("email", "email-" + i);
            // Serialize the record to Avro binary before handing it to the producer.
            byte[] bytes = injection.apply(record);
            ProducerRecord<String, byte[]> message =
                    new ProducerRecord<String, byte[]>("Customer", "customer-" + i, bytes);
            producer.send(message);
        }
        producer.close();
        System.out.println(USER_SCHEMA);
    }
}
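
The producer config above points partitioner.class at a custom MyPartitioner class that the original post does not include. Below is a minimal sketch of what such a partitioner could look like; the hashing strategy is an assumption for illustration, not the author's original code, and the class sits in the default package so that the unqualified name "MyPartitioner" resolves.

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical stand-in for the MyPartitioner referenced in the producer config.
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: pick partition 0 (an arbitrary, illustrative choice).
            return 0;
        }
        // Hash the key bytes and map the result onto the available partitions.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}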

2. Consumer code

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Consumes Avro-encoded Customer records from the "Customer" topic
 * and decodes them back into GenericRecords.
 * Created by p on 2018/10/14.
 */
public class AvroKafkaConsumer {

    // Must match the schema the producer used to encode the messages.
    public static final String USER_SCHEMA = "{\n" +
            "    \"type\":\"record\",\n" +
            "    \"name\":\"Customer\",\n" +
            "    \"fields\":[\n" +
            "        {\"name\":\"id\",\"type\":\"int\"},\n" +
            "        {\"name\":\"name\",\"type\":\"string\"},\n" +
            "        {\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}\n" +
            "    ]\n" +
            "}";

    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "ip:9092");

        // Keys arrive as strings; values arrive as raw Avro bytes to be decoded below.
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        kafkaProps.put("group.id", "DemoAvroKafkaConsumer");

        // Start from the beginning of the topic when no committed offset exists.
        kafkaProps.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(kafkaProps);

        consumer.subscribe(Collections.singletonList("Customer"));

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);

        // The same Injection decodes Avro binary back into GenericRecords.
        Injection<GenericRecord, byte[]> injection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(10);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord decoded = injection.invert(record.value()).get();
                    System.out.println(record.key() + ":" + decoded.get("id") + "\t"
                            + decoded.get("name") + "\t" + decoded.get("email"));
                }
            }
        } finally {
            consumer.close();
        }
    }
}
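
The Avro encode/decode step itself does not depend on Kafka, so the Injection round trip can be sanity-checked without a broker. Below is a minimal sketch; the AvroRoundTripCheck class name is my own, and it reuses the USER_SCHEMA constant from the producer above.

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AvroRoundTripCheck {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(AvroKafkaProducer.USER_SCHEMA);
        Injection<GenericRecord, byte[]> injection = GenericAvroCodecs.toBinary(schema);

        // Encode a record to Avro binary, exactly as the producer does before send().
        GenericData.Record original = new GenericData.Record(schema);
        original.put("id", 42);
        original.put("name", "name-42");
        original.put("email", "email-42");
        byte[] bytes = injection.apply(original);

        // Decode the bytes back into a GenericRecord, as the consumer does after poll().
        GenericRecord decoded = injection.invert(bytes).get();
        System.out.println(decoded.get("id") + " / " + decoded.get("name") + " / " + decoded.get("email"));
    }
}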

3. POM dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.6-cdh5.9.1</version>
</dependency>
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.6</version>
</dependency>
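
Note: the 1.7.6-cdh5.9.1 build of Avro is published in Cloudera's Maven repository rather than Maven Central; if you are not targeting a CDH cluster, a stock org.apache.avro:avro release should work just as well. The kafka_2.11 artifact pulls in the kafka-clients library that the producer and consumer classes above actually use.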

 
