我可以获得使用 avro kafka 消息的示例代码吗?
Posted
技术标签:
【中文标题】我可以获得使用 avro kafka 消息的示例代码吗?【英文标题】:Can I get example code to consume avro kafka message? 【发布时间】:2016-04-15 17:53:30 【问题描述】:我刚刚设置了 Datatorrent RTS (Apache Apex) 平台并运行了 pi 演示。 我想使用来自 kafka 的“avro”消息,然后将数据聚合并存储到 hdfs 中。 我可以获得这个或kafka的示例代码吗?
【问题讨论】:
【参考方案1】:这是一个完整的工作应用程序的代码,它使用新的 Kafka 输入运算符和 Apex Malhar 的文件输出运算符。它将字节数组转换为字符串,并使用有限大小(本例中为 1K)的滚动文件将它们写入 HDFS;在文件大小达到限制之前,它将有一个带有.tmp
扩展名的临时名称。您可以按照 DevT 在https://***.com/a/36666388 中的建议在这两者之间插入额外的运算符):
package com.example.myapexapp;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
@ApplicationAnnotation(name="MyFirstApplication")
public class KafkaApp implements StreamingApplication
@Override
public void populateDAG(DAG dag, Configuration conf)
KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
in.setInitialPartitionCount(1);
in.setTopics("test");
in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
//in.setClusters("localhost:2181");
in.setClusters("localhost:9092"); // NOTE: need broker address, not zookeeper
LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
out.setFilePath("/tmp/FromKafka");
out.setFileName("test");
out.setMaxLength(1024); // max size of rolling output file
// create stream connecting input adapter to output adapter
dag.addStream("data", in.outputPort, out.input);
/**
* Converts each tuple to a string and writes it as a new line to the output file
*/
class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
private static final String NL = System.lineSeparator();
private static final Charset CS = StandardCharsets.UTF_8;
private String fileName;
@Override
public byte[] getBytesForTuple(byte[] t) return (new String(t, CS) + NL).getBytes(CS);
@Override
protected String getFileName(byte[] tuple) return fileName;
public String getFileName() return fileName;
public void setFileName(final String v) fileName = v;
【讨论】:
【参考方案2】:在高层次上,您的应用程序代码类似于,
KafkaSinglePortStringInputOperator -> AvroToPojo -> 维度聚合器 -> AbstractFileOutputOperator 的实现
KafkaSinglePortStringInputOperator - 如果您正在使用其他数据类型,您可以使用 KafkaSinglePortByteArrayInputOperator 或编写自定义实现。
AvroToPojo - https://github.com/apache/incubator-apex-malhar/blob/5075ce0ef75afccdff2edf4c044465340176a148/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java.
此操作符将 GenericRecord 转换为给定 POJO 的用户。用户需要提供应该发出的 POJO 类,否则使用反射。目前这用于从容器文件中读取 GenericRecords,仅支持原始类型。用于读取从 Kafka,您可以按照类似的方式对您的操作员进行建模,并添加一个 Schema 对象来解析传入的记录。在 processTuple 方法中类似下面的东西应该可以工作, 架构模式 = 新的 Schema.Parser().parse()); GenericDatumReader reader = new GenericDatumReader(schema);
维度聚合器 - 您可以选择此处给出的聚合器之一 - https://github.com/apache/incubator-apex-malhar/tree/5075ce0ef75afccdff2edf4c044465340176a148/library/src/main/java/org/apache/apex/malhar/lib/dimensions 或按照相同的方式编写自定义聚合器。
FileWriter - 来自上面帖子中的示例。
【讨论】:
以上是关于我可以获得使用 avro kafka 消息的示例代码吗?的主要内容,如果未能解决你的问题,请参考以下文章
Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码
Spark:使用 Spark Scala 从 Kafka 读取 Avro 消息