kafka-msgpack-json: 将msgpack和protobuf转化成json

Posted 技术和生活小站

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka-msgpack-json: 将msgpack和protobuf转化成json相关的知识,希望对你有一定的参考价值。

pic by awesome from Instagram


用kafka-console-consumer.sh消费kafka数据时,如果数据是msgpack或protobuf的二进制格式时,打印出来的是乱码,不方便查看。

如何开发一个程序,直接消费kafka的msgpack和protobuf格式的数据,然后转化成json格式输出到控制台呢?

一种思路是,直接调用KafkaConsumer API,Java程序开发,代价比较高。因为kafka-console-consumer.sh支持很多参数,包括—topic,—bootstrap-server,—zookeeper等,重新开发一套程序,要兼容这些参数,很困难。

另一种思路是,借用linux的管道,前面还是用kafka-console-consumer.sh消费,用管道(|)将流数据传给Java程序,Java程序只负责解析二进制数据即可。

具体做法是:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test | msgpack2json.sh

msgpack2json.sh只要简单调用java -jar msgpack2json.jar即可。

Java程序读取管道也很简单:

BufferedInputStream bis = new BufferedInputStream(System.in); byte[] buffer = new byte[4096]; int length; while ((length = bis.read(buffer)) != -1) {    if (length <= 1) {        continue;    }    byte[] data = new byte[length];    System.arraycopy(buffer, 0, data, 0, length);    // handle data    ...... }

这个方案存在问题。由于是二进制流,BufferedInputStream不知道要读多少个字节才是一条完整的kafka数据。缓存的buffer数组设置大一些,可以减少读到不完整记录的情况。kafka数据量小的时候,问题不大,但在kafka数据量大时,还是会经常出现读不到完整记录的情况。

如果是文本记录,用BufferedReader读取,换行符分隔记录,就不会出现这个问题。

所以管道读取不是很好的解决方案。

从kafka-console-consumer.sh这个脚本入手,底层调用的是ConsoleConsumer这个类。

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

研究ConsoleConsumer.scala的源码,发现打印到控制台的代码是:

class DefaultMessageFormatter extends MessageFormatter {    ......    def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {        def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {          val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)          val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes)          output.write(convertedBytes)          output.write(separator)        }        ......    } }

只要将convertedBytes转化成json就达到目的了。

改造后的代码如下:

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator: Array[Byte]) {      val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)      val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes)      var parser: Parser = new MsgPackParser      if (ConsoleConsumer.protoDesc != null) {        val protoDescBuf = Files.readAllBytes(Paths.get(ConsoleConsumer.protoDesc))        parser = new ProtobufParser(protoDescBuf)      }      try {        output.write(parser.parse(convertedBytes).getBytes)        output.write(separator)      } catch {        case e: Throwable =>          System.err.println("Unknown error when running consumer: ", e)          System.exit(1)      }    }

默认按msgpack格式进行解析,如果脚本运行参数里有—proto参数,则按protobuf格式数据进行解析。

由于protobuf格式是需要描述文件进行解析的,是.proto结尾的文件,定义了每个字段的类型和字段的顺序。如果要动态解析protobuf数据,就需要在脚本参数里指定.proto的描述文件。

Google的protobuf API里,有DynamicMessage类型,可以用DynamicMessage.parseFrom(Descriptors.Descriptor, byte[])进行解析。byte[]可以传kafka的二进制数据,而Descriptors.Descriptor对象构造需要如下步骤。

1、生成descriptor文件。

protoc --descriptor_set_out=test.proto.desc test.proto

2、从descriptor文件生成Descriptors.Descriptor对象。

byte[] schemaDescBuf = Files.readAllBytes(Paths.get("test.proto.desc")); DynamicSchema schema = DynamicSchema.parseFrom(schemaDescBuf); Set<String> set = schema.getMessageTypes(); descriptor = schema.getMessageDescriptor(set.iterator().next());

这里引用了protobuf-dynamic包。

<dependency>    <groupId>com.github.os72</groupId>    <artifactId>protobuf-dynamic</artifactId>    <version>1.0.0</version> </dependency>

注意,程序默认取的是proto文件里第1个定义的Message对象。如果proto文件里定义了几个Message对象,则程序很难知道要匹配哪个。建议一个proto文件只定义一个Message。

改造ConsumerConfig.scala程序后,参照kafka-console-consumer.sh生成脚本kafka-msgpack-json.sh。

最终调用方式为:

# msgpack kafka-msgpack-json.sh --bootstrap-server localhost:9092 --topic test # protobuf kafka-msgpack-json.sh --bootstrap-server localhost:9092 --topic test --proto log_message.proto

点击“阅读原文”获取程序源码


以上是关于kafka-msgpack-json: 将msgpack和protobuf转化成json的主要内容,如果未能解决你的问题,请参考以下文章

将自己的博客园,打造成个人知乎

如何将thinkcmf导入eclipse

如何将Ios文件上传到

Javascript 将正则表达式 \\n 替换为 \n,将 \\t 替换为 \t,将 \\r 替换为 \r 等等

如何将视频文件转换格式

sh 一个将生成CA的脚本,将CA导入到钥匙串中,然后它将创建一个证书并与CA签名,然后将其导入到