Kafka Message Serialization and Deserialization
Posted by benfly
Custom deserializer class:
For a custom Avro schema, the consumer needs a custom class to deserialize the payload. An instance of this deserializer is passed in as a constructor argument when the consumer is created.
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions.DateConversion;
import org.apache.avro.data.TimeConversions.TimeConversion;
import org.apache.avro.data.TimeConversions.TimestampConversion;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;

public class AvroWithSchemaSpecificDeser<T> implements Deserializer<T> {

    private Class<T> typeClass;
    private transient Schema schema;
    private String codecName;

    /**
     * Simple constructor.
     *
     * @param pojoClassName the POJO class name to be deserialized
     * @param codecName     the codec used for compression; if null, no compression is applied
     */
    @SuppressWarnings("unchecked")
    public AvroWithSchemaSpecificDeser(final String pojoClassName, final String codecName) {
        try {
            Class<T> payloadClassType = (Class<T>) Class.forName(pojoClassName);
            typeClass = payloadClassType;
            // every Avro-generated specific class exposes its schema via the static SCHEMA$ field
            schema = (Schema) payloadClassType.getField("SCHEMA$").get(null);
            // the codec is auto-detected by DataFileReader on the read side; kept for symmetry
            this.codecName = codecName != null ? codecName : "null";
        } catch (ReflectiveOperationException | AvroRuntimeException ex) {
            throw new IllegalStateException(
                    String.format("Not able to initialize avro object. Details: %s", ex.getMessage()), ex);
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        T pojoObject = null;
        if (data != null && data.length > 0) {
            DataFileReader<T> dataFileReader = null;
            try {
                SpecificData specificData = new SpecificData();
                // conversions for date/time logical types (Avro 1.8.x TimeConversions)
                specificData.addLogicalTypeConversion(new DateConversion());
                specificData.addLogicalTypeConversion(new TimeConversion());
                specificData.addLogicalTypeConversion(new TimestampConversion());

                pojoObject = typeClass.newInstance();
                DatumReader<T> datumReader = new SpecificDatumReader<>(null, schema, specificData);
                dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(data), datumReader);
                while (dataFileReader.hasNext()) {
                    pojoObject = dataFileReader.next(pojoObject);
                }
            } catch (Exception ex) {
                throw new SerializationException(String.format(
                        "Error when deserializing byte[] to this class (%s) from this topic (%s)",
                        typeClass.toString(), topic), ex);
            } finally {
                if (dataFileReader != null) {
                    try {
                        dataFileReader.close();
                    } catch (IOException ioex) {
                        // ignore failures on close
                    }
                }
            }
        }
        return pojoObject;
    }
}
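The article only shows the consumer side. For reference, the producer needs a matching serializer that writes each record in the same Avro data-file format (schema embedded, optional codec) that the deserializer above reads back. The class below is a sketch of such a counterpart, not code from the original article; the class name AvroWithSchemaSpecificSer is an assumption.

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AvroWithSchemaSpecificSer<T extends SpecificRecord> implements Serializer<T> {

    private final String codecName;

    public AvroWithSchemaSpecificSer(final String codecName) {
        // "null" is Avro's name for the no-compression codec
        this.codecName = codecName != null ? codecName : "null";
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // note: if date/time logical types are used, the matching conversions would also need
        // to be registered on a SpecificData passed to the SpecificDatumWriter
        try (DataFileWriter<T> writer =
                     new DataFileWriter<>(new SpecificDatumWriter<>(data.getSchema()))) {
            writer.setCodec(CodecFactory.fromString(codecName));
            // embed the schema so the consumer-side DataFileReader can recover it
            writer.create(data.getSchema(), out);
            writer.append(data);
            writer.flush();
        } catch (IOException ex) {
            throw new SerializationException(
                    String.format("Error when serializing %s for topic %s", data.getClass().getName(), topic), ex);
        }
        return out.toByteArray();
    }
}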
Creating the consumer object:
First, the Kafka consumer instance is created inside RunnableConsumer; it takes the consumer property list and the deserializer instances. When the value deserializer is created below, only pojo_class_name is passed and the codec is null, i.e. no compression codec is used.
// key deserializer: a standard Kafka deserializer created via its no-arg constructor
Deserializer<K> keyDeserClass =
        (Deserializer<K>) Class.forName(props.getProperty("key.deserializer")).newInstance();

// value deserializer: the custom class above, constructed with (pojoClassName, codecName)
Class<?> cl = Class.forName(props.getProperty("value.deserializer"));
Constructor<?> cons = cl.getConstructor(String.class, String.class);
Deserializer<V> valueDeserClass =
        (Deserializer<V>) cons.newInstance(consumerConfig.get("pojo_class_name"), null);

consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass);
The consumer's props are read from the configuration server and look like the key-value pairs below. The key fields are bootstrap.servers, key.deserializer, value.deserializer, group.id, and the pojo_class_name of the class to deserialize into.
{
  security.protocol=SASL_PLAINTEXT,
  schema.registry.url=http://yourregistryurl.youcompany.com:8080,
  bootstrap.servers=yourbootstrap1.youcompany.com:7788,yourbootstrap2.youcompany.com:7788,
  key.deserializer=org.apache.kafka.common.serialization.LongDeserializer,
  value.deserializer=com.youcompany.serialization.AvroWithSchemaSpecificDeser,
  group.id=yourgroupid,
  pojo_class_name=UserSecurityResponse
}
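If no configuration server is available, an equivalent Properties object can be assembled by hand. A minimal sketch, assuming the same placeholder hosts, group id, and class names as above:

import java.util.Properties;

public final class ConsumerConfigSketch {

    // builds the same key-value pairs shown above (placeholder hosts and ids)
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("schema.registry.url", "http://yourregistryurl.youcompany.com:8080");
        props.put("bootstrap.servers",
                "yourbootstrap1.youcompany.com:7788,yourbootstrap2.youcompany.com:7788");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer", "com.youcompany.serialization.AvroWithSchemaSpecificDeser");
        props.put("group.id", "yourgroupid");
        // pojo_class_name is an application-level setting read by the reflective code above,
        // not a property understood by the Kafka consumer itself
        props.put("pojo_class_name", "UserSecurityResponse");
        return props;
    }
}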
The second constructor argument is the key deserializer, Kafka's standard LongDeserializer. The third argument is the value deserializer; when it is created via reflection, the pojo_class_name parameter has to be read and passed in.
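Written without reflection, the construction above boils down to something like the following sketch (UserSecurityResponse is the example pojo_class_name from the configuration, and props is the property list shown earlier; this is an illustration, not code from the original article):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

public final class DirectConstructionSketch {

    // direct, non-reflective equivalent of the reflective construction above
    static KafkaConsumer<Long, UserSecurityResponse> build(Properties props) {
        return new KafkaConsumer<>(
                props,
                new LongDeserializer(),
                new AvroWithSchemaSpecificDeser<UserSecurityResponse>("UserSecurityResponse", null));
    }
}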
Subscribing to and consuming messages:
Once the consumer object is created, the consumer can be started from the thread pool. It subscribes to the given topics and polls for messages; whenever records are fetched, they are handed to the listener via notify.
consumer.subscribe(topics);
ConsumerGroup.this.isRunning = true;
while (true) {
    ConsumerRecords<K, V> records = null;
    try {
        // commit offsets of previously processed records asynchronously
        processCommit(SyncMode.ASYNC);
        // first pass polls with timeout 0; subsequent passes block until records arrive
        records = consumer.poll(isPolling ? Long.MAX_VALUE : 0);
        if (records != null && records.count() > 0) {
            listener.notify(records);
        }
    } catch (WakeupException wex) {
        LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:", wex);
    }
    isPolling = true;
}
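The listener and processCommit helpers are not shown in the article. A minimal sketch of what such a listener could look like (the interface name, method, and LoggingListener class are assumptions inferred from the listener.notify(records) call above):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// hypothetical listener contract matching the listener.notify(records) call above
public interface MessageListener<K, V> {
    void notify(ConsumerRecords<K, V> records);
}

// trivial implementation that just prints each record it receives
class LoggingListener<K, V> implements MessageListener<K, V> {
    @Override
    public void notify(ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records) {
            System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
    }
}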