Pig 生成一个键更改列 - 将先前记录与当前记录进行比较,但列不同

Posted

技术标签:

【中文标题】Pig 生成一个键更改列 - 将先前记录与当前记录进行比较,但列不同【英文标题】:Pig generate a key change column - comparing previous record with current record but different column 【发布时间】:2016-06-20 06:54:58 【问题描述】:

我的输入数据将采用以下格式。

   col1 col2   col3     effective date expiry date
1   Q1    A1  Value1     01/01           01/02
2   Q1    A1  Value1     01/02           01/03
3   Q1    A1  Value1     01/03           01/05
4   Q1    A1  Value2     01/05           01/06
5   Q1    A1  Value2     01/06           01/07
6   Q1    A1  Value2     01/07           01/08
7   Q1    A1  Value1     01/08           01/11
8   Q1    A1  Value1     01/11           12/31

我需要根据 col1、col2、col3 的值删除重复项,但不是所有重复项。直到 col3 的值更改为不同的值,记录 被视为重复。例如。在上述数据中,第 4 条记录中的值 1 变为值 2,因此在记录 1,2 和 3 中只应保留第 1 条。 在记录 4,5 和 6 中,仅应保留第 4 名。在记录 7 和 8 中,应该只保留 7 个。最后 2 列实际上是日期列(有效 和到期日)。 1,2 和 3 之类的重复项可能会出现多次(例如 1,2,3,4 和 5 可能具有相同的值),或者根本没有重复项。

我想到了两种方法,但不确定如何为其中任何一种编写代码。

    所以我正在考虑生成一个 keychange 列(1 或 0),将所有欺骗的值从 1 更改为 0,并且当键(col1、col2、col3 的组合) 更改,此 keychange 列的值应设置为 1。然后我可以过滤此列。 但是为此我需要编写一个 UDF(或者是否有任何具有类似功能的 UDF 可用?),因为这需要在传递给 udf 时输入按排序顺序, 是否可以将排序数据传递给udf?如果是这样,如何?这应该是什么样的UDF? 或者即使我写了一个mapreduce代码,我应该如何继续,我应该在mapper中发出记录并进行所有排序 并在减速器中生成列?请告诉我您的意见(mapreduce 编程新手,因此您的想法将对我的学习有很大帮助,谢谢!)。

    当我浏览“over”函数文档时,它只比较以前的记录和当前记录的同一列,如果我能以某种方式比较 col5(到期日期) 根据 col4(生效日期)按升序排序后的下一条记录的 col4(生效日期)的当前记录,我可以对 Col1、col2 和 Col3 进行分组 并删除那些生效日期与之前记录的到期日期相同的记录。但不确定如何使用 over 函数比较两个不同的列。 也请告诉我您对此的看法。

如果有其他更好的方法来解决这个问题,请告诉我。感谢您的宝贵时间!

【问题讨论】:

您可以做的是使用 col1、col2、col3 对行进行分组,然后将组的每一行传递给 udf。在 udf 中,您检查日期是否连续,然后做出决定。 您好 Vikas,感谢您的意见。我也在考虑同样的方法,我现在正在写那个 UDF。我有一个问题。我将不得不在输出包中写入多个元组。例如:C1、Q1 和 Value1 的 1 和 7。我只是不确定如何编写输出模式。目前我正在使用 Schema bagSchema = new Schema(new Schema.FieldSchema("filtered", intuple, DataType.TUPLE)); 但我不确定如何添加多个元组。你能建议吗?我指的是下面的链接。 [链接]spryinc.com/blog/guide-user-defined-functions-apache-pig[link] 也请您告诉我这两行是干什么用的? Schema bagSchema = new Schema(new Schema.FieldSchema("pair", tupleSchema, DataType.TUPLE)); bagSchema.setTwoLevelAccessRequired(true); Schema.FieldSchema bagFs = new Schema.FieldSchema("pairs", bagSchema, DataType.BAG);为什么我们需要在最后一行定义另一个包? 【参考方案1】:

假设 - 输入是 CSV 文件。

A = LOAD 'test.csv' using PigStorage(',');
B = GROUP A BY $0,$1,$2;
C = FOREACH B 
 D = LIMIT A 1;
 GENERATE D.$0,D.$1,D.$2,D.$3,D.$4;

DUMP C;

希望这会有所帮助。

【讨论】:

刚刚意识到这个不能满足您从第 7 条和第 8 条中选择第 7 条记录的要求。

以上是关于Pig 生成一个键更改列 - 将先前记录与当前记录进行比较,但列不同的主要内容,如果未能解决你的问题,请参考以下文章

MySQL触发器:在插入之前将记录与先前的记录进行比较

PIG 使用不同的负载变量生成数据

Sql Server 查找下一个最近更改的记录

计算与先前记录的差异

pig中的数据处理,标签分开

Laravel上一个和下一个记录