Kafka中使用Avro编码解码消息
Posted darange
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka中使用Avro编码解码消息相关的知识,希望对你有一定的参考价值。
1.消费者代码
import com.twitter.bijection.Injection; import com.twitter.bijection.avro.GenericAvroCodecs; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * Created by p on 2018/10/8. */ public class AvroKafkaProducer { public static final String USER_SCHEMA = "{ " + " "type":"record", " + " "name":"Customer", " + " "fields":[ " + " {"name":"id","type":"int"}, " + " {"name":"name","type":"string"}, " + " {"name":"email","type":["null","string"],"default":"null"} " + " ] " + "}"; public static void main(String[] args){ Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers","ip:9092"); kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer"); kafkaProps.put("partitioner.class","MyPartitioner"); Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(USER_SCHEMA); Injection<GenericRecord,byte[]> injection = GenericAvroCodecs.toBinary(schema); KafkaProducer producer = new KafkaProducer<String,byte[]>(kafkaProps); for(int i = 0;i < 1000;i++){ GenericData.Record record = new GenericData.Record(schema); record.put("id",i); record.put("name","name-"+i); record.put("email","email-"+i); byte[] bytes = injection.apply(record); ProducerRecord<String,byte[]> record1 = new ProducerRecord<String, byte[]>("Customer","customer-"+i,bytes); producer.send(record1); } producer.close(); System.out.println(USER_SCHEMA); } }
2. 消费者代码
import com.twitter.bijection.Injection; import com.twitter.bijection.avro.GenericAvroCodecs; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; /** * Created by p on 2018/10/14. */ public class AvroKafkaConsumer { public static final String USER_SCHEMA = "{ " + " "type":"record", " + " "name":"Customer", " + " "fields":[ " + " {"name":"id","type":"int"}, " + " {"name":"name","type":"string"}, " + " {"name":"email","type":["null","string"],"default":"null"} " + " ] " + "}"; public static void main(String[] args){ Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers","ip:9092"); kafkaProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaProps.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer"); kafkaProps.put("group.id","DemoAvroKafkaConsumer"); kafkaProps.put("auto.offset.reset","earliest"); KafkaConsumer<String ,byte[]> consumer = new KafkaConsumer<String, byte[]>(kafkaProps); consumer.subscribe(Collections.singletonList("Customer")); Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(USER_SCHEMA); Injection<GenericRecord,byte[]> injection = GenericAvroCodecs.toBinary(schema); try { while (true){ ConsumerRecords<String,byte[]> records = consumer.poll(10); for(ConsumerRecord<String,byte[]> record : records){ GenericRecord record1 = injection.invert(record.value()).get(); System.out.println(record.key() + ":" + record1.get("id") + " " + record1.get("name") + " " + record1.get("email")); } } } finally { consumer.close(); } } }
3. pom依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.6-cdh5.9.1</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>bijection-avro_2.11</artifactId> <version>0.9.6</version> </dependency>
以上是关于Kafka中使用Avro编码解码消息的主要内容,如果未能解决你的问题,请参考以下文章
Python AVRO阅读器在解码kafka消息时返回AssertionError
如何使用来自 Kafka 的 Python 解码/反序列化 Avro
Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码