如何在 Pig 中转换参数?

Posted

技术标签:

【中文标题】如何在 Pig 中转换参数?【英文标题】:How do I transform a parameter in Pig? 【发布时间】:2014-09-10 15:08:29 【问题描述】:

我需要在 Pig 中处理一个数据集,该数据集每天午夜可用一次。因此,我有一个 Oozie 协调员负责安排日程,并在每天 00:00 产生一个工作流。 文件名遵循 URI 方案

hdfs://$dataRoot/input/raw$YEAR$MONTH$DAY$HOUR.avro

其中 $HOUR 始终为“00”。

数据集中的每个条目都包含一个 UNIX 时间戳,我想过滤掉那些时间戳在晚上 11:45 (23:45) 之前的条目。由于我需要在过去的数据集上运行,定义阈值的时间戳值需要根据当前处理的日期动态设置。例如,处理2013年12月12日的数据集,需要阈值1418337900。因此,设置阈值必须由协调器完成。

据我所知,在 EL 中不可能将格式化的日期转换为 UNIX 时间戳。我想出了一个非常hacky的解决方案: 协调器将阈值的日期和时间传递给启动 Pig 脚本的参数化实例的相应工作流。

coordinator.xml 的摘录:

<property>
    <name>threshold</name>
    <value>$coord:formatTime(coord:dateOffset(coord:nominalTime(), -15, 'MINUTE'), 'yyyyMMddHHmm')</value>
</property>

workflow.xml 的摘录:

<action name="foo">
    <pig>
        <job-tracker>$jobTracker</job-tracker>
        <name-node>$nameNode</name-node>
        <script>$applicationPath/flights.pig</script>
        <param>jobInput=$jobInput</param>
        <param>jobOutput=$jobOutput</param>
        <param>threshold=$threshold</param>
    </pig>
    <ok to="end"/>
    <error to="error"/>
</action>

Pig 脚本需要将此格式化的日期时间转换为 UNIX 时间戳。为此,我写了一个UDF:

public class UnixTime extends EvalFunc<Long> 

    private long myTimestamp = 0L;

    private static long convertDateTime(String dt, String format)
            throws IOException 
        DateFormat formatter;
        Date date = null;
        formatter = new SimpleDateFormat(format);
        try 
            date = formatter.parse(dt);
         catch (ParseException ex) 
            throw new IOException("Illegal Date: " + dt + " format: " + format);
        
        return date.getTime() / 1000L;
    

    public UnixTime(String dt, String format) throws IOException 
        myTimestamp = convertDateTime(dt, format);
    

    @Override
    public Long exec(Tuple input) throws IOException 
        return myTimestamp;
    


在 Pig 脚本中,创建了一个宏,使用协调器/工作流的输入初始化 UDF。然后,您可以过滤时间戳。

DEFINE THRESH mystuff.pig.UnixTime('$threshold', 'yyyyMMddHHmm');
d = LOAD '$jobInput' USING PigStorage(',') AS (time: long, value: chararray);
f = FILTER d BY d <= THRESH();
...

我遇到的问题引出了一个更普遍的问题,是否可以在 Pig 中转换输入参数并再次将其用作某种常量。 有没有更好的方法来解决这个问题,还是我的方法不必要地复杂?

编辑:TL;DR

经过更多搜索,我发现有人遇到同样的问题: http://grokbase.com/t/pig/user/125gszzxnx/survey-where-are-all-the-udfs-and-macros

感谢 Gaurav 在 piggybank 中推荐 UDF。 如果不使用 declare 和 shell 脚本,似乎没有高效的解决方案。

【问题讨论】:

您是否尝试过使用 UDF CustomFormatToISO (pig.apache.org/docs/r0.11.0/api/org/apache/pig/piggybank/…) 将给定日期(任何格式)转换为日期对象,然后使用 ISOToUnix (pig.apache.org/docs/r0.11.0/api/org/apache/pig/piggybank/…) 将其转换为纪元? 【参考方案1】:

您可以将 Pig 脚本放入 Python 脚本并传递值。

#!/usr/bin/python
import sys
import time

from org.apache.pig.scripting import Pig



P = Pig.compile("""d = LOAD '$jobInput' USING PigStorage(',') AS (time: long, value: chararray);
f = FILTER d BY d <= '$thresh';
""")

jobinput = whatever you defined
thresh = whatever you defined in the UDF
Q = P.bind('thresh':thresh,'jobinput':jobinput)
results = Q.runSingle()
if results.isSuccessful() == "FAILED":
         raise "Pig job failed"

【讨论】:

以上是关于如何在 Pig 中转换参数?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Pig 中使用 MapReduce Native 传递命令行参数

如何在 Pig 中将字段转换为行?

如何在 PIG 中将 XLSX 文件转换为 CSV 文件?

如何在 PIG 中获取当前时间戳

如何在 Pig 中使用 Avro 数据

Pig - 如何将日期时间转换为 chararray