如何在 Spark 中将双行与阈值匹配?
Posted
技术标签:
【中文标题】如何在 Spark 中将双行与阈值匹配?【英文标题】:How can I match double rows with a threshold in Spark? 【发布时间】:2021-02-16 09:37:26 【问题描述】:我有一个非常简单的数据框:
+--+------+
|Id|Amount|
+--+------+
|0 |3.47 |
|1 |-3.47 |
|2 |3.47 |
|3 |3.47 |
|4 |2.02 |
|5 |-2.01 |
|6 |-2.01 |
|7 |7.65 |
|8 |7.65 |
+--+------+
我想匹配给定阈值(例如 0.5)相互抵消的行。 所以在这种情况下,匹配第 0 行和第 1 行、第 4 和第 5 行,并返回第 2 和第 3 行。有几种解决方案,返回第 0 和第 2 行也可以。
一般的想法是它们应该被 2 比 2 匹配并返回剩余部分。如果每行都有匹配项,则它不应该返回任何内容,并且应该返回所有无法以这种方式配对的行。
知道怎么做吗?
预期结果:
+--+------+
|Id|Amount|
+--+------+
|0 |3.47 |
|2 |3.47 |
|6 |-2.01 |
|7 |7.65 |
|8 |7.65 |
+--+------+
我一直在考虑使用UserDefinedAggregateFunction
,但我不确定它是否足够。特别是因为我认为每组行只能返回一个值。
【问题讨论】:
【参考方案1】:我选择了 UDF。用 Java 编写 UDF 实在是太复杂了……
如果有人能找到简化这种混乱的方法,请发帖或发表评论。
private UDF1<WrappedArray<Row>, Row[]> matchData()
return (data) ->
List<Data> dataList = JavaConversions.seqAsJavaList(data).stream().map(Data::fromRow).collect(Collectors.toList());
Set<Data> matched = new HashSet<>();
for (Data element : dataList)
if (matched.contains(element)) continue;
dataList.stream().filter(e -> !matched.contains(e) && e != element)
.filter(e -> Math.abs(e.getAmount() + element.getAmount()) < THRESHOLD
&& Math.signum(e.getAmount()) != Math.signum(element.getAmount()))
.min(Comparator.comparingDouble(e -> Math.abs(e.getAmount() + element.getAmount())))
.ifPresent(e ->
matched.add(e);
matched.add(element);
);
if (matched.size() != dataList.size())
return dataList.stream().map(Data::toRow).toArray(Row[]::new);
else
return new Row[0];
;
使用 Data 类(使用 Lombok):
@AllArgsConstructor
@EqualsAndHashCode
@Data
public final class Data
private String name;
private Double amount;
public static Data fromRow(Row r)
return new Data(
r.getString(r.fieldIndex("name")),
r.getDouble(r.fieldIndex("amount")));
public Row toRow()
return RowFactory.create(name, amount);
我会退回整套以防万一它不起作用,这实际上是我需要的。
【讨论】:
而且我什至没有发布架构定义和其中的数据集调用链部分。以上是关于如何在 Spark 中将双行与阈值匹配?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark Submit 中将 s3a 与 Apache spark 2.2(hadoop 2.8) 一起使用?