如何在 Apache Beam 中写入多个文件?

Posted

技术标签:

【中文标题】如何在 Apache Beam 中写入多个文件?【英文标题】:How do I write to multiple files in Apache Beam? 【发布时间】:2017-09-03 14:26:44 【问题描述】:

让我简化一下我的情况。我正在使用 Apache Beam 0.6.0。我的最终处理结果是PCollection<KV<String, String>>。我想将值写入与其键对应的不同文件。

例如,假设结果包括

(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)

那我想写value1value3value4key1.txt,写value4key2.txt

就我而言:

密钥集是在管道运行时确定的,而不是在构建管道时确定的。 键集可能很小,但每个键对应的值的数量可能非常非常大。

有什么想法吗?

【问题讨论】:

侧输出 - beam.apache.org/documentation/programming-guide/… @GrahamPolley 我认为侧面输出是在图形构建时决定的。购买我的案例需要管道运行时间。 :-( 是的,没错。 Beam 尚不支持动态侧输出(或输入)。 @GrahamPolley 是的,我知道。 issues.apache.org/jira/browse/BEAM-92 仍未解决。所以我想知道是否有一些解决方法。 我不相信有解决办法。 【参考方案1】:

很方便,我前几天写了一个这个案例的样本。

这个例子是dataflow 1.x风格

基本上,您可以按每个键进行分组,然后您可以使用连接到云存储的自定义转换来完成此操作。需要注意的是,每个文件的行列表不应该很大(它必须适合单个实例的内存,但考虑到您可以运行高内存实例,这个限制相当高)。

    ...
    PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
                .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
        readyToWrite.apply(
                new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
    ...

然后进行大部分工作的转换是:

public class PTransformWriteToGCS
    extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> 

    private static final Logger LOG = Logging.getLogger(PTransformWriteToGCS.class);

    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    private final String bucketName;

    private final SerializableFunction<String, String> pathCreator;

    public PTransformWriteToGCS(final String bucketName,
        final SerializableFunction<String, String> pathCreator) 
        this.bucketName = bucketName;
        this.pathCreator = pathCreator;
    

    @Override
    public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) 

        return input
            .apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() 

                @Override
                public void processElement(
                    final DoFn<KV<String, List<String>>, Void>.ProcessContext arg0)
                    throws Exception 
                    final String key = arg0.element().getKey();
                    final List<String> values = arg0.element().getValue();
                    final String toWrite = values.stream().collect(Collectors.joining("\n"));
                    final String path = pathCreator.apply(key);
                    BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                        .setContentType(MimeTypes.TEXT)
                        .build();
                    LOG.info("blob writing to: ", blobInfo);
                    Blob result = STORAGE.create(blobInfo,
                        toWrite.getBytes(StandardCharsets.UTF_8));
                
            ));
    

【讨论】:

理想情况下,接收器会提供某种方式来做到这一点,但目前还没有。【参考方案2】:

只需在 ParDo 函数中编写一个循环! 更多细节 - 我今天遇到了同样的情况,唯一的情况是 key=image_label 和 value=image_tf_record。所以就像你问的那样,我正在尝试创建单独的 TFRecord 文件,每个类一个,每个记录文件包含许多图像。但是,当每个键的值非常高时,例如您的场景,不确定是否可能存在内存问题: (我的代码也是用 Python 编写的)

class WriteToSeparateTFRecordFiles(beam.DoFn):

def __init__(self, outdir):
    self.outdir = outdir

def process(self, element):
    l, image_list = element
    writer = tf.python_io.TFRecordWriter(self.outdir + "/tfr" + str(l) + '.tfrecord')
    for example in image_list:
        writer.write(example.SerializeToString())
    writer.close()

然后在您的管道中,在您获得键值对的阶段之后添加这两行:

   (p
    | 'GroupByLabelId' >> beam.GroupByKey()
    | 'SaveToMultipleFiles' >> beam.ParDo(WriteToSeparateTFRecordFiles(opt, p))
    )

【讨论】:

Java SDK 没有TFRecord :( 但是谢谢!【参考方案3】:

在 Apache Beam 2.2 Java SDK 中,TextIOAvroIO 分别使用 TextIOAvroIO.write().to(DynamicDestinations) 原生支持此功能。参见例如this method.

更新(2018 年):更喜欢将 FileIO.writeDynamic()TextIO.sink()AvroIO.sink() 一起使用。

【讨论】:

也适用于 BigQuery:beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/…【参考方案4】:

只需在 ParDo 类中写下以下几行:

from apache_beam.io import filesystems

eventCSVFileWriter = filesystems.FileSystems.create(gcsFileName)
for record in list(Records):
    eventCSVFileWriter.write(record)

如果您想要完整的代码,我也可以为您提供帮助。

【讨论】:

【参考方案5】:

您可以为此使用 FileIO.writeDinamic()

PCollection<KV<String,String>> readfile= (something you read..);

readfile.apply(FileIO. <String,KV<String,String >> writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to("somefolder")
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));

p.run();

【讨论】:

以上是关于如何在 Apache Beam 中写入多个文件?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam 处理文件

如何使用 Apache Beam Python 将输出写入动态路径

Apache Beam 使用多个表时的写入次数

如何从 GCP 存储桶中读取 Apache Beam 中的多个文件

Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery

在数据流中完成 BQ 写入后的 Apache Beam 写入状态信息