Apache Flink writeAsCsv() 方法写入对象元组

Posted

技术标签:

【中文标题】Apache Flink writeAsCsv() 方法写入对象元组【英文标题】:Apache Flink writeAsCsv() method to write a tuple of objects 【发布时间】:2017-12-11 23:02:09 【问题描述】:

我正在关注 Apache Flink 教程来清理 TaxiRide 事件流。结果流被打印到控制台。现在我想将它写入 csv 文件。

        // configure event-time processing
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // get the taxi ride data stream
        DataStream<TaxiRide> rides = env.addSource(
                new TaxiRideSource(path, maxEventDelay, servingSpeedFactor));

        DataStream<TaxiRide> filteredRides = rides
                // filter out rides that do not start or stop in NYC
                .filter(new RideCleansing.NYCFilter());

        filteredRides.print();

我尝试了以下但收到错误:java.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.

DataStreamSink<TaxiRide> rides = filteredRides.writeAsCsv("/resources").setParallelism(1);

当我创建 DataSet&lt;Tuple1&lt;TaxiRide&gt;&gt; rides1 = filteredRides.writeAsCsv("/resources").setParallelism(1); 时,它会导致编译器错误。

我应该怎么做才能将生成的 TaxiRide 对象流写入 csv 文件?

【问题讨论】:

【参考方案1】:

DataStreamDataSet 属于不同的 API,不能混用。因此,编译错误。

错误消息“writeAsCsv() 方法只能用于元组的数据流。”意味着,您必须将 DataStream&lt;TaxiRide&gt; 对象转换为元组的 DataStream 才能将其写入 CSV 文件。 这可以通过一个简单的MapFunction 来完成:

DataStream<Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> rideTuples = filteredRides
   .map(new TupleConverter());

TupleConverter 被定义为

class TupleConverter implements MapFunction<TaxiRide, Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> 

  public Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short> map(TaxiRide ride) 
     return Tuple9.of(ride.rideId, ride.isStart, ...);
  

获得DataStream rideTuples 后,您可以将其写入CSV 文件。

【讨论】:

以上是关于Apache Flink writeAsCsv() 方法写入对象元组的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink 入门,了解 Apache Flink

Apache Flink 入门,了解 Apache Flink

译文《Apache Flink官方文档》 Apache Flink介绍

Flink从入门到精通100篇(二十一)-Apache Flink 与 Apache Hive 的集成

apache flink入门一

Apache Flink 欺诈交易检测