Generic conversion from POJO to Avro Record
【Posted】2015-09-21 08:27:05
【Question】I am looking for a way to convert a POJO to an Avro object in a generic fashion. The implementation should be robust to any change of the POJO class. I have implemented it, but with the Avro record filled in explicitly (see the example below).
Is there a way to get rid of the hard-coded field names and simply populate the Avro record from the object? Is reflection the only way, or does Avro provide this out of the box?
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.reflect.ReflectData;

public class PojoToAvroExample {

    static class PojoParent {
        public final Map<String, String> aMap = new HashMap<String, String>();
        public final Map<String, Integer> anotherMap = new HashMap<String, Integer>();
    }

    static class Pojo extends PojoParent {
        public String uid;
        public Date eventTime;
    }

    static Pojo createPojo() {
        Pojo foo = new Pojo();
        foo.uid = "123";
        foo.eventTime = new Date();
        foo.aMap.put("key", "val");
        foo.anotherMap.put("key", 42);
        return foo;
    }

    public static void main(String[] args) {
        // extract the avro schema corresponding to the Pojo class
        Schema schema = ReflectData.get().getSchema(Pojo.class);
        System.out.println("extracted avro schema: " + schema);

        // create an avro record corresponding to the schema
        Record avroRecord = new Record(schema);
        System.out.println("corresponding empty avro record: " + avroRecord);

        Pojo foo = createPojo();
        // TODO: to be replaced by a generic variant:
        // something like avroRecord.importValuesFrom(foo);
        avroRecord.put("uid", foo.uid);
        avroRecord.put("eventTime", foo.eventTime);
        avroRecord.put("aMap", foo.aMap);
        avroRecord.put("anotherMap", foo.anotherMap);
        System.out.println("expected avro record: " + avroRecord);
    }
}
【Comments】
Why not use Avro's ReflectDatumWriter to serialize the POJO?
I am using Avro in a Hadoop context; for serialization I want to use AvroParquetOutputFormat.
An inefficient approach would be to have ReflectDatumWriter write the POJO to bytes and then let GenericDatumReader read those bytes into a GenericRecord.
【Answer 1】Are you using Spring?
I built a mapper for this using Spring facilities, but such a mapper could also be built with raw reflection tools:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.util.Assert;

public class GenericRecordMapper {

    public static GenericData.Record mapObjectToRecord(Object object) {
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(r -> record.put(r.name(),
                PropertyAccessorFactory.forDirectFieldAccess(object).getPropertyValue(r.name())));
        return record;
    }

    public static <T> T mapRecordToObject(GenericData.Record record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");
        record.getSchema().getFields().forEach(d ->
                PropertyAccessorFactory.forDirectFieldAccess(object).setPropertyValue(d.name(),
                        record.get(d.name()) == null ? record.get(d.name()) : record.get(d.name()).toString()));
        return object;
    }
}
With this mapper you can generate a GenericData.Record that is easy to serialize to Avro. When you deserialize an Avro byte array, you can use it to rebuild the POJO from the deserialized record:
Serialize:
byte[] serialized = avroSerializer.serialize("topic", GenericRecordMapper.mapObjectToRecord(yourPojo));
Deserialize:
GenericData.Record deserialized = (GenericData.Record) avroDeserializer.deserialize("topic", serialized);
YourPojo yourPojo = GenericRecordMapper.mapRecordToObject(deserialized, new YourPojo());
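The same field-copying idea can be sketched without Spring, using plain java.lang.reflect to walk the class hierarchy. In this dependency-free sketch a Map stands in for GenericData.Record, and the class and field names are illustrative, not taken from the answer above:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

public class ReflectionFieldCopier {

    // Walk the class and all its superclasses, copying every instance field
    // into a map keyed by field name; the same loop could instead call
    // GenericData.Record.put(name, value) for each schema field.
    public static Map<String, Object> toFieldMap(Object obj) throws IllegalAccessException {
        Map<String, Object> values = new LinkedHashMap<>();
        for (Class<?> c = obj.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (Modifier.isStatic(f.getModifiers())) {
                    continue; // Avro reflect schemas only cover instance fields
                }
                f.setAccessible(true);
                values.put(f.getName(), f.get(obj));
            }
        }
        return values;
    }

    // Demo classes mirroring the question's PojoParent/Pojo shape
    static class Parent { public String uid = "123"; }
    static class Child extends Parent { public int answer = 42; }

    public static void main(String[] args) throws Exception {
        System.out.println(toFieldMap(new Child()));
    }
}
```

Spring's PropertyAccessorFactory.forDirectFieldAccess does essentially this traversal internally, plus type conversion, which is why the answer above leans on it.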
【Comments】
Nice, but it does not seem to handle lists and sets correctly, e.g. List
【Answer 2】Here is a generic way to do the conversion:
// imports needed (not shown in the original):
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;

public static <V> byte[] toBytesGeneric(final V v, final Class<V> cls) {
    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
    final Schema schema = ReflectData.get().getSchema(cls);
    final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema);
    final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null);
    try {
        writer.write(v, binEncoder);
        binEncoder.flush();
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
    return bout.toByteArray();
}

public static void main(String[] args) {
    PojoClass pojoObject = new PojoClass();
    toBytesGeneric(pojoObject, PojoClass.class);
}
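A comment under the question mentions the (admittedly inefficient) second half of this route: once ReflectDatumWriter has produced the bytes, GenericDatumReader can decode them straight into a GenericRecord, which answers the original question generically. A sketch of that decoding step, with an illustrative class name:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;

public class BytesToGenericRecord {

    // Decode reflect-written bytes into a GenericRecord, using the same
    // reflect-derived schema for both the writer and the reader side.
    public static GenericRecord toGenericRecord(byte[] bytes, Class<?> cls) {
        final Schema schema = ReflectData.get().getSchema(cls);
        final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        try {
            return reader.read(null, decoder);
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Combined with a reflect-based serializer this gives pojo -> bytes -> GenericRecord with no hard-coded field names, at the cost of an extra encode/decode pass.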
【Comments】
【Answer 3】With jackson/avro it is easy to convert a POJO to byte[], much like jackson/json:
byte[] avroData = avroMapper.writer(schema).writeValueAsBytes(pojo);
P.S. Jackson handles not only JSON but also XML/Avro/Protobuf/YAML etc., with very similar classes and APIs.
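Spelled out, and assuming jackson-dataformat-avro is on the classpath, the avroMapper and schema in that one-liner come from Jackson's AvroMapper, which can derive the schema from the POJO class itself. A sketch with a made-up User class:

```java
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;

public class JacksonAvroRoundTrip {

    // A made-up POJO for illustration
    public static class User {
        public String name;
        public int age;
    }

    public static void main(String[] args) throws Exception {
        AvroMapper avroMapper = new AvroMapper();
        // Derive the Avro schema from the POJO class itself
        AvroSchema schema = avroMapper.schemaFor(User.class);

        User u = new User();
        u.name = "alice";
        u.age = 30;

        // POJO -> Avro bytes
        byte[] avroData = avroMapper.writer(schema).writeValueAsBytes(u);
        // Avro bytes -> POJO
        User back = avroMapper.readerFor(User.class).with(schema).readValue(avroData);
        System.out.println(back.name + " " + back.age);
    }
}
```

No field names are hard-coded anywhere, which is exactly what the question asked for, at the cost of adding a Jackson dependency.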
【Comments】
【Answer 4】In addition to my comment on @TranceMaster's answer, the modified version below works for primitive types and Java sets:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.util.Assert;

public class GenericRecordMapper {

    public static GenericData.Record mapObjectToRecord(Object object) {
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        System.out.println(schema);
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(r -> record.put(r.name(),
                PropertyAccessorFactory.forDirectFieldAccess(object).getPropertyValue(r.name())));
        return record;
    }

    public static <T> T mapRecordToObject(GenericData.Record record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");
        record
            .getSchema()
            .getFields()
            .forEach(field ->
                PropertyAccessorFactory
                    .forDirectFieldAccess(object)
                    .setPropertyValue(field.name(), record.get(field.name()))
            );
        return object;
    }
}
【Comments】
【Answer 5】I needed exactly this myself. The library you need is in the avro JAR files but, oddly, there seems to be no way to invoke it from the avro-tools command line.
Invoke it as: java GenerateSchemaFromPOJO com.example.pojo.Person Person.java
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;

import org.apache.avro.Schema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.avro.AvroFactory;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
import com.fasterxml.jackson.dataformat.avro.schema.VisitorFormatWrapperImpl;

public class GenerateSchemaFromPOJO {

    public static void main(String[] args) {
        String className = null;
        String outputFile = null;
        Writer outputWriter = null;
        try {
            if (args.length != 2) {
                System.out.println("Usage: java " + GenerateSchemaFromPOJO.class.getCanonicalName()
                        + " classname output-schema-file.json");
                System.exit(1);
            }
            className = args[0];
            outputFile = args[1];

            Class<?> clazz = Class.forName(className);
            AvroFactory avroFactory = new AvroFactory();
            ObjectMapper mapper = new ObjectMapper(avroFactory);

            AvroSchemaGenerator gen = new AvroSchemaGenerator();
            mapper.acceptJsonFormatVisitor(clazz, gen);
            AvroSchema schemaWrapper = gen.getGeneratedSchema();

            Schema avroSchema = schemaWrapper.getAvroSchema();
            String asJson = avroSchema.toString(true);

            outputWriter = new FileWriter(outputFile);
            outputWriter.write(asJson);
        } catch (Exception ex) {
            System.err.println("caught " + ex);
            ex.printStackTrace();
            System.exit(1);
        } finally {
            if (outputWriter != null) {
                try {
                    outputWriter.close();
                } catch (IOException e) {
                    System.err.println("Caught " + e + " while trying to close outputWriter to " + outputFile);
                    e.printStackTrace();
                }
            }
        }
    }
}
【Comments】
From your answer I understand that your code generates an Avro schema for a given clazz. That is not what I asked for in the question; I already do the same thing with ReflectData.get().getSchema(Pojo.class);. What I am looking for is a way to replace the avroRecord.put(..., ...); calls with a generic variant.