Apache Flink writeAsCsv() 方法写入对象元组
Posted
技术标签:
【中文标题】Apache Flink writeAsCsv() 方法写入对象元组【英文标题】:Apache Flink writeAsCsv() method to write a tuple of objects 【发布时间】:2017-12-11 23:02:09 【问题描述】:我正在关注 Apache Flink 教程来清理 TaxiRide 事件流。结果流被打印到控制台。现在我想将它写入 csv 文件。
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
new TaxiRideSource(path, maxEventDelay, servingSpeedFactor));
DataStream<TaxiRide> filteredRides = rides
// filter out rides that do not start or stop in NYC
.filter(new RideCleansing.NYCFilter());
filteredRides.print();
我尝试了以下但收到错误:java.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.
DataStreamSink<TaxiRide> rides = filteredRides.writeAsCsv("/resources").setParallelism(1);
当我创建 DataSet<Tuple1<TaxiRide>> rides1 = filteredRides.writeAsCsv("/resources").setParallelism(1);
时,它会导致编译器错误。
我应该怎么做才能将生成的 TaxiRide 对象流写入 csv 文件?
【问题讨论】:
【参考方案1】:DataStream
和 DataSet
属于不同的 API,不能混用。因此,编译错误。
错误消息“writeAsCsv() 方法只能用于元组的数据流。”意味着,您必须将 DataStream<TaxiRide>
对象转换为元组的 DataStream
才能将其写入 CSV 文件。
这可以通过一个简单的MapFunction
来完成:
DataStream<Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> rideTuples = filteredRides
.map(new TupleConverter());
TupleConverter
被定义为
class TupleConverter implements MapFunction<TaxiRide, Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>>
public Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short> map(TaxiRide ride)
return Tuple9.of(ride.rideId, ride.isStart, ...);
获得DataStream
rideTuples
后,您可以将其写入CSV 文件。
【讨论】:
以上是关于Apache Flink writeAsCsv() 方法写入对象元组的主要内容,如果未能解决你的问题,请参考以下文章
Apache Flink 入门,了解 Apache Flink
Apache Flink 入门,了解 Apache Flink
译文《Apache Flink官方文档》 Apache Flink介绍