kafka-msgpack-json: 将msgpack和protobuf转化成json
Posted 技术和生活小站
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka-msgpack-json: 将msgpack和protobuf转化成json相关的知识,希望对你有一定的参考价值。
pic by awesome from Instagram
用kafka-console-consumer.sh消费kafka数据时,如果数据是msgpack或protobuf的二进制格式时,打印出来的是乱码,不方便查看。
如何开发一个程序,直接消费kafka的msgpack和protobuf格式的数据,然后转化成json格式输出到控制台呢?
一种思路是,直接调用KafkaConsumer API,Java程序开发,代价比较高。因为kafka-console-consumer.sh支持很多参数,包括—topic,—bootstrap-server,—zookeeper等,重新开发一套程序,要兼容这些参数,很困难。
另一种思路是,借用linux的管道,前面还是用kafka-console-consumer.sh消费,用管道(|)将流数据传给Java程序,Java程序只负责解析二进制数据即可。
具体做法是:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test | msgpack2json.sh
msgpack2json.sh只要简单调用java -jar msgpack2json.jar即可。
Java程序读取管道也很简单:
BufferedInputStream bis = new BufferedInputStream(System.in); byte[] buffer = new byte[4096]; int length; while ((length = bis.read(buffer)) != -1) { if (length <= 1) { continue; } byte[] data = new byte[length]; System.arraycopy(buffer, 0, data, 0, length); // handle data ...... }
这个方案存在问题。由于是二进制流,BufferedInputStream不知道要读多少个字节才是一条完整的kafka数据。缓存的buffer数组设置大一些,可以减少读到不完整记录的情况。kafka数据量小的时候,问题不大,但在kafka数据量大时,还是会经常出现读不到完整记录的情况。
如果是文本记录,用BufferedReader读取,换行符分隔记录,就不会出现这个问题。
所以管道读取不是很好的解决方案。
从kafka-console-consumer.sh这个脚本入手,底层调用的是ConsoleConsumer这个类。
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
研究ConsoleConsumer.scala的源码,发现打印到控制台的代码是:
class DefaultMessageFormatter extends MessageFormatter { ...... def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) { def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) { val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes) val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes) output.write(convertedBytes) output.write(separator) } ...... } }
只要将convertedBytes转化成json就达到目的了。
改造后的代码如下:
def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) { val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes) val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes) var parser: Parser = new MsgPackParser if (ConsoleConsumer.protoDesc != null) { val protoDescBuf = Files.readAllBytes(Paths.get(ConsoleConsumer.protoDesc)) parser = new ProtobufParser(protoDescBuf) } try { output.write(parser.parse(convertedBytes).getBytes) output.write(separator) } catch { case e: Throwable => System.err.println("Unknown error when running consumer: ", e) System.exit(1) } }
默认按msgpack格式进行解析,如果脚本运行参数里有—proto参数,则按protobuf格式数据进行解析。
由于protobuf格式是需要描述文件进行解析的,是.proto结尾的文件,定义了每个字段的类型和字段的顺序。如果要动态解析protobuf数据,就需要在脚本参数里指定.proto的描述文件。
Google的protobuf API里,有DynamicMessage类型,可以用DynamicMessage.parseFrom(Descriptors.Descriptor, byte[])进行解析。byte[]可以传kafka的二进制数据,而Descriptors.Descriptor对象构造需要如下步骤。
1、生成descriptor文件。
protoc --descriptor_set_out=test.proto.desc test.proto
2、从descriptor文件生成Descriptors.Descriptor对象。
byte[] schemaDescBuf = Files.readAllBytes(Paths.get("test.proto.desc")); DynamicSchema schema = DynamicSchema.parseFrom(schemaDescBuf); Set<String> set = schema.getMessageTypes(); descriptor = schema.getMessageDescriptor(set.iterator().next());
这里引用了protobuf-dynamic包。
<dependency> <groupId>com.github.os72</groupId> <artifactId>protobuf-dynamic</artifactId> <version>1.0.0</version> </dependency>
注意,程序默认取的是proto文件里第1个定义的Message对象。如果proto文件里定义了几个Message对象,则程序很难知道要匹配哪个。建议一个proto文件只定义一个Message。
改造ConsumerConfig.scala程序后,参照kafka-console-consumer.sh生成脚本kafka-msgpack-json.sh。
最终调用方式为:
# msgpack kafka-msgpack-json.sh --bootstrap-server localhost:9092 --topic test # protobuf kafka-msgpack-json.sh --bootstrap-server localhost:9092 --topic test --proto log_message.proto
点击“阅读原文”获取程序源码。
以上是关于kafka-msgpack-json: 将msgpack和protobuf转化成json的主要内容,如果未能解决你的问题,请参考以下文章