如何在 python apache Beam 的窗口中订购元素?

Posted

技术标签:

【中文标题】如何在 python apache Beam 的窗口中订购元素?【英文标题】:How can I order elements in a window in python apache beam? 【发布时间】:2017-02-07 05:09:06 【问题描述】:

我注意到 java apache beam 有类 groupby.sortbytimestamp,python 是否实现了该功能?如果不是,那么在窗口中对元素进行排序的方法是什么?我想我可以在 DoFn 中对整个窗口进行排序,但我想知道是否有更好的方法。

【问题讨论】:

你在哪里找到那个类?我认为它不再存在了:github.com/apache/beam/… 【参考方案1】:

Beam 中目前没有内置的值排序(在 Python 或 Java 中)。现在,最好的选择是您自己在 DoFn 中对值进行排序,就像您提到的那样。

【讨论】:

【参考方案2】:

这是一个使用 CombineFn 的解决方案。它具有使用 TreeSet 删除重复数据的额外好处。您还应该确保您的窗口数据足够小以适合单个工作人员的内存。

public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> 
@Override
public TreeSet<MarketData> createAccumulator() 
    return new TreeSet<>(Comparator
            .comparingLong(MarketData::getEventTime)
            .thenComparing(MarketData::getOrderbookType));


@Override
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) 
    accum.add(input);
    return accum;


@Override
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) 

    TreeSet<MarketData> merged = createAccumulator();
    for (TreeSet<MarketData> accum : accums) 
        merged.addAll(accum);
    
    return merged;


@Override
public List<MarketData> extractOutput(TreeSet<MarketData> accum) 
    return Lists.newArrayList(accum.iterator());

【讨论】:

以上是关于如何在 python apache Beam 的窗口中订购元素?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 PCollection Apache Beam Python 创建 N 个元素组

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表

我们如何使用 python sdk 在 Apache Beam 中读取带有附件的 CSV 文件?

如何运行 Apache Beam 集成测试?

使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?

Python 上的 Apache Beam 将 beam.Map 调用相乘