如何在 Spark 中将双行与阈值匹配?

Posted

技术标签:

【中文标题】如何在 Spark 中将双行与阈值匹配?【英文标题】:How can I match double rows with a threshold in Spark? 【发布时间】:2021-02-16 09:37:26 【问题描述】:

我有一个非常简单的数据框:

+--+------+
|Id|Amount|
+--+------+
|0 |3.47  |
|1 |-3.47 |
|2 |3.47  |
|3 |3.47  |
|4 |2.02  |
|5 |-2.01 |
|6 |-2.01 |
|7 |7.65  |
|8 |7.65  |
+--+------+

我想匹配给定阈值(例如 0.5)相互抵消的行。 所以在这种情况下,匹配第 0 行和第 1 行、第 4 和第 5 行,并返回第 2 和第 3 行。有几种解决方案,返回第 0 和第 2 行也可以。

一般的想法是它们应该被 2 比 2 匹配并返回剩余部分。如果每行都有匹配项,则它不应该返回任何内容,并且应该返回所有无法以这种方式配对的行。

知道怎么做吗?

预期结果:

+--+------+
|Id|Amount|
+--+------+
|0 |3.47  |
|2 |3.47  |
|6 |-2.01 |
|7 |7.65  |
|8 |7.65  |
+--+------+

我一直在考虑使用UserDefinedAggregateFunction,但我不确定它是否足够。特别是因为我认为每组行只能返回一个值。

【问题讨论】:

【参考方案1】:

我选择了 UDF。用 Java 编写 UDF 实在是太复杂了……

如果有人能找到简化这种混乱的方法,请发帖或发表评论。

private UDF1<WrappedArray<Row>, Row[]> matchData() 
    return (data) -> 
        List<Data> dataList = JavaConversions.seqAsJavaList(data).stream().map(Data::fromRow).collect(Collectors.toList());
        Set<Data> matched = new HashSet<>();

        for (Data element : dataList) 
            if (matched.contains(element)) continue;

            dataList.stream().filter(e -> !matched.contains(e) && e != element)
                    .filter(e -> Math.abs(e.getAmount() + element.getAmount()) < THRESHOLD
                            && Math.signum(e.getAmount()) != Math.signum(element.getAmount()))
                    .min(Comparator.comparingDouble(e -> Math.abs(e.getAmount() + element.getAmount())))
                    .ifPresent(e -> 
                        matched.add(e);
                        matched.add(element);
                    );
        


        if (matched.size() != dataList.size()) 
            return dataList.stream().map(Data::toRow).toArray(Row[]::new);
         else 
            return new Row[0];
        
    ;

使用 Data 类(使用 Lombok):

@AllArgsConstructor
@EqualsAndHashCode
@Data
public final class Data 
    private String name;
    private Double amount;

    public static Data fromRow(Row r) 
        return new Data(
                r.getString(r.fieldIndex("name")),
                r.getDouble(r.fieldIndex("amount")));
    

    public Row toRow() 
        return RowFactory.create(name, amount);
    

我会退回整套以防万一它不起作用,这实际上是我需要的。

【讨论】:

而且我什至没有发布架构定义和其中的数据集调用链部分。

以上是关于如何在 Spark 中将双行与阈值匹配?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Spark 决策树调整分类阈值

如何在 Spark Submit 中将 s3a 与 Apache spark 2.2(hadoop 2.8) 一起使用?

如何在oracle查询中将字符与#匹配

如何在 java 中将“i”与土耳其语 i 匹配?

spark ml 2.0 - 朴素贝叶斯 - 如何确定每个类的阈值

如何在 Spark 中将两个 RDD[string] 合并在一起?