发送字节数组到storm kafka bolt

Posted

技术标签:

【中文标题】发送字节数组到storm kafka bolt【英文标题】:Send byte array to storm kafka bolt 【发布时间】:2015-05-15 02:13:14 【问题描述】:

我写了一个风暴拓扑。我基本上想以字节数组的形式将 avro 模式中的元组发送到 kafka 主题。

这就是我设置螺栓的方式:

  builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
            .fieldsGrouping(BOLT1, new Fields("key"));

这就是我转换为字节数组的方式

Schema schema = avroObject.getSchema();

        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(ping, encoder);
        encoder.flush();
        byte[] message = out.toByteArray();
        String key = new String(message, "UTF-8");

当我以以下方式发出元组时,我在 kafka 主题中看不到任何内容(向 kafka 发送字节流):

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

但是,如果我将字节数组转换为字符串,然后再转换为 kafka 主题,它可以工作:

如下所示:

 builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
            .fieldsGrouping(BOLT1, new Fields("key"));

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

我做错了什么?如何使用storm kafka bolt将字节流发送到kafka topic?

【问题讨论】:

请出示您的 kafka 制作人。 我使用的是storm提供的Kafka bolt。参见 builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt()) .fieldsGrouping(BOLT1, new Fields("key"));在上面的代码中 您正在将字节数组转换为 java 字符串以生成密钥,您可能会丢失数据,因为 java 字符串不是 C 字符串。您是否检查过您的键值是否正确?因为如果不是,您的 hashMD5 将是错误的。这可能是它不起作用的原因吗? 那么如何在java中将字节数组转换为字符串呢? 【参考方案1】:

您的问题是因为您的 MD5 哈希不正确:

你说如果你将你的 bytearray 转换为 java String 它可以工作:这是因为 MD5 的值根据 String 是正确的。

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

如您所见,MD5 是根据字符串参数计算的,您发送与 MD5 对应的字符串:一切都很好!

但是如果你发送一个字节数组,你需要在一个字节数组上计算MD5,结果它将是一个字节数组,而不是一个字符串。你的代码:

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

不正确,因为 MD5 不对应于消息,而是对应于 UTF-8 中消息的转换值作为有损字符串(见下文)。

这里是另一个关于 SO 的问题的链接,以字节数组格式正确计算 MD5:

How can I generate an MD5 hash?

这是因为在 Java 中将 bytearray 转换为 String 是有损的(与 C 相反),并且您会在此过程中丢失数据,因为某些字节与 Java 编码中的 char 不对应(您的数据中显然有其中一些) .

所以你的 KafkaBolt 应该是

KafkaBolt<byte[], byte[]>

我不知道在 kafka Storm 中发送一个 bytearray MD5 和你的 bytearray 是否足够。如果不是,则必须在 bytearray 和 java String 之间使用无损编码,例如 BASE64:

Base64 Encoding in Java

您必须将字节数组转换为 base64 字符串,使用

KafkaBolt<String, String>

然后像往常一样发送数据

collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64));

这也意味着当您从 kafka 获取数据时,它将是 base64 中的字符串,您必须对其进行解码才能取回字节数组。

希望对您有所帮助。

【讨论】:

您还需要确保配置设置为“serializer.class”、“kafka.serializer.DefaultEncoder” for KafkaBolt.KAFKA_BROKER_PROPERTIES 是Kafka中的默认值,但在其中设置为StringEncoder Storm-kafka 项目中的 KafkaBolt 示例!

以上是关于发送字节数组到storm kafka bolt的主要内容,如果未能解决你的问题,请参考以下文章

巧妙拆分bolt提升Storm集群吞吐量 增加并行处理速度 Storm & kafka处理实时日志实战topology经验谈

Kafka传输文件(字节数组)

Kafka传输文件(字节数组)

storm 整合 kafka之保存MySQL数据库

storm+kafka:WordCount程序

如何将两个不同 Spout 的输出发送到同一个 Bolt?