通过python UDF将文本文件导入pig

Posted

技术标签:

【中文标题】通过python UDF将文本文件导入pig【英文标题】:Import text files to pig through python UDF 【发布时间】:2015-05-27 03:57:14 【问题描述】:

我正在尝试在使用 python udf 时将文件加载到 pig,我尝试了两种方法:

• (myudf1, sample1.pig):尝试从 python 读取文件,该文件位于我的客户端服务器上。

• (myudf2, sample2.pig):首先从 hdfs 加载文件到 grunt shell,然后将其作为参数传递给 python udf。

myudf1.py

from __future__ import with_statement
def get_words(dir):
    stopwords=set()
    with open(dir) as f1:
        for line1 in f1:
            stopwords.update([line1.decode('ascii','ignore').split("\n")[0]])
    return stopwords

stopwords=get_words("/home/zhge/uwc/mappings/english_stop.txt")

@outputSchema("findit: int")
def findit(stp):
    stp=str(stp)
    if stp in stopwords:
        return 1
    else:
        return 0

sample1.pig:

REGISTER '/home/zhge/uwc/scripts/myudf1.py' USING jython as pyudf;
item_title = load '/user/zhge/data/item_title_sample/000000_0' USING PigStorage(',')  AS (title:chararray);

T = limit item_title 1;
S = FOREACH T GENERATE pyudf.findit(title);
DUMP S

我得到:IOError: (2, 'No such file or directory', '/home/zhge/uwc/mappings/english_stop.txt')

对于解决方案 2:

myudf2:

def get_wordlists(wordbag):
    stopwords=set()
    for t in wordbag:
        stopwords.update(t.decode('ascii','ignore'))
    return stopwords


@outputSchema("findit: int")
def findit(stopwordbag, stp):
    stopwords=get_wordlists(stopwordbag)
    stp=str(stp)
    if stp in stopwords:
        return 1
    else:
        return 0

Sample2.pig

REGISTER '/home/zhge/uwc/scripts/myudf2.py' USING jython as pyudf;

stops = load '/user/zhge/uwc/mappings/stopwords.txt' AS (stop_w:chararray);
-- this step works fine and i can see the "stops" obejct is loaded to pig 
item_title = load '/user/zhge/data/item_title_sample/000000_0' USING PigStorage(',')  AS (title:chararray);
T = limit item_title 1;
S = FOREACH T GENERATE pyudf.findit(stops.stop_w, title);
DUMP S;

然后我得到: ERROR org.apache.pig.tools.grunt.Grunt -ERROR 1066: Unable to open iterator for alias S. Backend error : Scalar 在输出中有不止一行。第一个:(a),第二个:(作为

【问题讨论】:

对于在寻找ERROR 1066: Unable to open iterator for alias 时发现此帖子的人,这里是generic solution。 【参考方案1】:

您的第二个示例应该可以工作。虽然你 LIMITed 错误的表达 - 它应该在 stops 关系上。因此应该是:

stops = LOAD '/user/zhge/uwc/mappings/stopwords.txt' AS (stop_w:chararray);

item_title = LOAD '/user/zhge/data/item_title_sample/000000_0' USING PigStorage(',') AS (title:chararray);
T = LIMIT stops 1;
S = FOREACH item_title GENERATE pyudf.findit(T.stop_w, title);

但是,由于您似乎需要先处理所有停用词,这还不够。您需要执行GROUP ALL,然后将结果传递给您的get_wordlist 函数:

stops = LOAD '/user/zhge/uwc/mappings/stopwords.txt' AS (stop_w:chararray);

item_title = LOAD '/user/zhge/data/item_title_sample/000000_0' USING PigStorage(',') AS (title:chararray);
T = FOREACH (GROUP stops ALL) GENERATE pyudf.get_wordlists(stops) AS ready;
S = FOREACH item_title GENERATE pyudf.findit(T.ready, title);

您必须更新您的 UDF 以接受字典列表才能使此方法起作用。

【讨论】:

以上是关于通过python UDF将文本文件导入pig的主要内容,如果未能解决你的问题,请参考以下文章

Apache Pig - 如何维护一个分布式查找表以供我的 python UDF 访问?

在 PIg 脚本中对 Avro 文件使用 UDF

PIG UDF 错误 - 可以使用导入解决

在 Pig 中只执行一次 UDF

使用 Pig 通过 Java 运行字符串

PIG UDF 加载 .gz 文件失败