apache pig Java UDF - 更改属性中的值似乎并没有坚持

Posted

技术标签:

【中文标题】apache pig Java UDF - 更改属性中的值似乎并没有坚持【英文标题】:apache pig Java UDF - changing values in attributes doesn't seem to stick 【发布时间】:2012-04-04 22:52:56 【问题描述】:

我正在尝试编写一个 Java UDF,它将使用 Java UDF 对包中的元组进行排名。 元组有一个作为排名标准的值列和一个最初设置为 0 的排名列。 元组根据值列进行排序。 所有的元组都放在一个袋子里,而那个袋子又放在一个新的元组中,这个新元组会传递给 UDF。

但是,UDF 正在修改排名列 - 一旦方法退出,值都再次变为 0。我不确定如何将值设置为“Stick”。

任何帮助将不胜感激。

这是我的java类

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.logicalLayer.FrontendException;
import java.util.Iterator;
import org.apache.pig.PigWarning;

/**
 *
 * @author Winter
 */
public class Ranker  extends EvalFunc<String>
    @Override
    public String exec(Tuple tuple) throws IOException 
        if (tuple == null || tuple.size() == 0) 
            return null;
        


        List<Object> list = tuple.getAll();
        DataBag db = (DataBag) list.get(0);
        Integer num = (Integer)list.get(1);

        Iterator<Tuple>itr = db.iterator();
        boolean containsNonNull = false;
        int i = 1;
        double previous=0;
        while (itr.hasNext()) 

            Tuple t= itr.next();
            double d = (Double)t.get(num.intValue());
            int rankCol = t.size()-1;
            Integer rankVal = (Integer)t.get(rankCol);
            if(i == 0)    
                System.out.println("i==0");
                previous = d;
                t.set(rankCol, i);
             else 
                if(d == previous)
                    t.set(rankCol, i);
                else
                    System.out.print("d!==previous|" + d + "|"+ previous+"|"+rankVal);
                    t.set(rankCol, ++i);
                    rankVal = (Integer)t.get(rankCol);
                     System.out.println("|now rank val" + rankVal);
                    previous = d;
                
            
        


        return "Y";
    

这就是我如何在 Pig 中调用所有内容 -

REGISTER /myJar.jar;
A = LOAD '/Users/Winter/milk-tea-coffee.tsv'  as (year:chararray, milk:double);
B = foreach A generate year, milk, 0 as rank;
C = order B by milk asc; 
D = group C by rank order C by milk;
E = foreach D generate D.C.year,D.C.milk,D.C.rank,  piglet3.evalFunctions.Ranker(D.C,1);
dump E;

由于 UDF 中的打印语句,我可以知道它在 UDF 中的工作 - d!==previous|21.2|0.0|0|now rank val2 d!==previous|21.6|21.2|0|now rank val3 d!==previous|21.9|21.6|0|now rank val4 d!==previous|22.0|21.9|0|now rank val5 d!==previous|22.5|22.0|0|now rank val6 d!==previous|22.9|22.5|0|now rank val7 d!==previous|23.0|22.9|0|now rank val8 d!==previous|23.4|23.0|0|now rank val9 d!==previous|23.8|23.4|0|now rank val10 d!==previous|23.9|23.8|0|now rank val11

但是当我转出 E 或 D 或 C 时,排名列仅包含 0。

【问题讨论】:

【参考方案1】:

exec 函数必须从 UDF 返回您想要的输出。您当前正在修改传递给 exec 函数的元组,然后返回字符串“Y”——Pig 从 UDF 中看到的所有输出都是“Y”。在这种情况下,您应该返回元组而不是“Y”。

我认为以下代码接近您的意图,但我不太清楚您要做什么:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.logicalLayer.FrontendException;
import java.util.Iterator;
import org.apache.pig.PigWarning;

/**
 *
 * @author Winter
 */
public class Ranker  extends EvalFunc<Tuple>
    @Override
    public Tuple exec(Tuple tuple) throws IOException 
        if (tuple == null || tuple.size() == 0) 
            return null;
        


        List<Object> list = tuple.getAll();
        DataBag db = (DataBag) list.get(0);
        Integer num = (Integer)list.get(1);

        Iterator<Tuple>itr = db.iterator();
        boolean containsNonNull = false;
        int i = 1;
        double previous=0;
        while (itr.hasNext()) 

            Tuple t= itr.next();
            double d = (Double)t.get(num.intValue());
            int rankCol = t.size()-1;
            Integer rankVal = (Integer)t.get(rankCol);
            if(i == 0)    
                System.out.println("i==0");
                previous = d;
                t.set(rankCol, i);
             else 
                if(d == previous)
                    t.set(rankCol, i);
                else
                    System.out.print("d!==previous|" + d + "|"+ previous+"|"+rankVal);
                    t.set(rankCol, ++i);
                    rankVal = (Integer)t.get(rankCol);
                     System.out.println("|now rank val" + rankVal);
                    previous = d;
                
            
        


        return tuple;
    

【讨论】:

我认为我的部分问题是要对元组进行排名,您必须将其与上面的元组进行比较,因此您必须处理整个袋子。 例如,一个元组不知道它是否在第二位,除非它可以看到它上面的第一位元组。这就是为什么我把所有的元组放在一个包里,然后把那个包放在一个新的元组里。但可能有一个更好的方法来完成整个事情。

以上是关于apache pig Java UDF - 更改属性中的值似乎并没有坚持的主要内容,如果未能解决你的问题,请参考以下文章

使用 java.lang.NoClassDefFoundError 在 AWS EMR 上运行 Pig UDF:org/apache/pig/LoadFunc

无法解决这些错误 Java (Pig UDF) 添加库、org.apache

在 Apache Pig 中运行 UDF

Apache Pig - 在脚本中多次调用 Java UDF ToJSON

Pig的Python UDF:数据类型转换错误

用于添加列的 java udf