通过在 PIG 中的同一块内计算的条件值在 FOREACH 块内进行过滤

Posted

技术标签:

【中文标题】通过在 PIG 中的同一块内计算的条件值在 FOREACH 块内进行过滤【英文标题】:Filtering inside a FOREACH block by a condition value calculated inside the same block in PIG 【发布时间】:2014-03-05 11:14:19 【问题描述】:

我有一个日志数据集,我需要过滤掉设备发生故障后的所有日志条目(Action = 2)。

在这个例子中:

EquipId, ScvId, Action, TimeStamp
Ag,01,1,14-01-01 0:00:01
Ag,01,1,14-01-02 0:00:01
Ag,01,2,14-01-03 0:00:01
Ag,01,1,14-01-04 0:00:01
Ag,01,1,14-01-05 0:00:01
Ag,01,2,14-01-06 0:00:01
Ag,01,1,14-01-07 0:00:01
Ra,01,1,14-01-01 0:00:01
Ra,01,1,14-01-02 0:00:01
Ra,01,1,14-01-03 0:00:01
Ra,01,2,14-01-04 0:00:01
Fe,01,2,14-01-03 0:00:01
Fe,01,1,14-01-03 0:00:02
Fe,01,1,14-01-04 0:00:01
Lu,01,1,14-01-05 0:00:01
Lu,01,1,14-01-04 0:00:01
Lu,01,1,14-01-05 0:00:01

预期的输出是

Ag,01,1,14-01-01 0:00:01
Ag,01,1,14-01-02 0:00:01
Ag,01,2,14-01-03 0:00:01
Ra,01,1,14-01-01 0:00:01
Ra,01,1,14-01-02 0:00:01
Ra,01,1,14-01-03 0:00:01
Ra,01,2,14-01-04 0:00:01
Fe,01,2,14-01-03 0:00:01
Lu,01,1,14-01-05 0:00:01
Lu,01,1,14-01-04 0:00:01
Lu,01,1,14-01-05 0:00:01

我试图在这样的单个 FOREACH 块中对其进行编程:

rawData = LOAD './test.csv'  USING PigStorage(',') AS (equipId:chararray, svcId:chararray, action:chararray, date:chararray);

equipDataGrp = GROUP rawData BY equipId;

minFail = FOREACH equipDataGrp 

    actionFail = FILTER rawData BY action == '2';
    minFailDate = MIN(actionFail.date);
    prevActionsFail = FILTER rawData BY date <= minFailDate;


    GENERATE group as equipId, FLATTEN(prevActionsFail.date);

;

我收到以下错误:

2014-03-05 11:08:11,720 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1000: 
<line 36, column 28> Invalid field reference. Referenced field [date] does not exist in schema: .

如果我将日期硬编码为:

minFail = FOREACH equipDataGrp 

    actionFail = FILTER rawData BY action == '2';
    minFailDate = MIN(actionFail.date);
    prevActionsFail = FILTER rawData BY date == '14-01-03 0:00:01';


    GENERATE group as equipId, FLATTEN(prevActionsFail.date);

;

我得到回应:

(Ag,14-01-03 0:00:01)
(Fe,14-01-03 0:00:01)
(Ra,14-01-03 0:00:01)

有什么建议吗?

提前致谢!

【问题讨论】:

【参考方案1】:

您需要计算故障时间并将其分配给设备 ID 的所有记录。然后您可以过滤时间戳晚于该时间的记录:

rawData = LOAD './test.csv'  USING PigStorage(',') AS (equipId:chararray, svcId:chararray, action:chararray, date:chararray);

equipDataGrp = GROUP rawData BY equipId;

/* Expand out into all records again, appending the earliest failure time */
minFail = FOREACH equipDataGrp 
    actionFail = FILTER rawData BY action == '2';
    GENERATE FLATTEN(rawData), MIN(actionFail.date) AS failTime;
;

notYetFailed = FOREACH (FILTER minFail BY date <= failTime) GENERATE equipId .. date;

【讨论】:

谢谢!我们是这样实现的

以上是关于通过在 PIG 中的同一块内计算的条件值在 FOREACH 块内进行过滤的主要内容,如果未能解决你的问题,请参考以下文章

通过过滤度量值在 MDX 中定义计算成员

Pandas HTML 输出条件格式 - 如果值在范围内,则突出显示单元格

在 Pig 中使用多个条件过滤列

根据 APACHE PIG 中的条件加载数据

PIG:使用条件

如何计算并计算 PIG 中的总平均值