For each tuple in the DataBag, execution starts from the try block again and again

Posted: 2017-03-14 12:42:17

Question:

The code is as follows:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class databag extends EvalFunc<DataBag> {
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    BagFactory mBagFactory = BagFactory.getInstance();

    private DataBag result;
    private String delimiterType = ": Src / dest :";

    public DataBag exec(Tuple input) throws IOException {
        try {
            result = mBagFactory.newDefaultBag(); // change here
            result.add(input);

            getLogger().info("::::::: Entered try block ::::::::::::");

            // indexes for source and destination: value -> list of row positions
            ConcurrentHashMap<Object, ArrayList<Integer>> srcIndexMap = new ConcurrentHashMap<Object, ArrayList<Integer>>();
            ConcurrentHashMap<Object, ArrayList<Integer>> destIndexMap = new ConcurrentHashMap<Object, ArrayList<Integer>>();

            // store the rows as Object[] collections after conversion
            ArrayList<Object[]> source = new ArrayList<Object[]>();
            ArrayList<Object[]> destination = new ArrayList<Object[]>();

            int srcCounter = 0;
            int destCounter = 0;

            ArrayList<Integer> Sourcearray = new ArrayList<Integer>();
            ArrayList<Integer> Destinationarray = new ArrayList<Integer>();

            for (Iterator<Tuple> iter = result.iterator(); iter.hasNext();) {
                // some code here

I am trying to iterate over the tuples in the DataBag with a for loop, but for every tuple all of the collections get re-initialized; in other words, for every tuple the execution starts again from the try block.
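To make the observed behavior concrete: Pig calls exec() once for every input row, so the whole method body, try block included, runs anew on each call. Here is a minimal, hypothetical sketch that reproduces this pattern (the class name and log message are mine, not from the original code):

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Hypothetical demo UDF: logs how many times exec() has been invoked.
// Each input row triggers one complete call, so per-call state is rebuilt every time.
public class CallCounter extends EvalFunc<Integer> {
    private int calls = 0; // instance fields survive across calls within one task

    public Integer exec(Tuple input) throws IOException {
        calls++; // everything from here down re-runs for every row
        getLogger().info("exec() invocation #" + calls);
        return calls;
    }
}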

Output:

INFO  PigUDFpck.databag - ::::::: Entered try block ::::::::::::
PigUDFpck.databag - srcIndexMap=
PigUDFpck.databag - inside main if loop skey=4
PigUDFpck.databag - destIndexMap.contains(skey)=false
PigUDFpck.databag - into else loop of main method
PigUDFpck.databag - ::::::: Entered try block ::::::::::::
PigUDFpck.databag - srcIndexMap=
PigUDFpck.databag - inside main if loop skey=4
PigUDFpck.databag - destIndexMap.contains(skey)=false
PigUDFpck.databag - into else loop of main method

Update

Pig script:

REGISTER /usr/local/pig/UDF/UDFBAG.jar;

sourcenew = LOAD 'hdfs://HADOOPMASTER:54310/DVTTest/Source1.txt' USING PigStorage(',') AS (ID:int, Name:chararray, FirstName:chararray, LastName:chararray, Vertical_Name:chararray, Vertical_ID:chararray, Gender:chararray, DOB:chararray, Degree_Percentage:chararray, Salary:chararray, StateName:chararray);

destnew = LOAD 'hdfs://HADOOPMASTER:54310/DVTTest/Destination1.txt' USING PigStorage(',') AS (ID:int, Name:chararray, FirstName:chararray, LastName:chararray, Vertical_Name:chararray, Vertical_ID:chararray, Gender:chararray, DOB:chararray, Degree_Percentage:chararray, Salary:chararray, StateName:chararray);

cogroupnew = COGROUP sourcenew BY ID inner, destnew BY ID inner;

diff_data = FOREACH cogroupnew GENERATE DIFF(sourcenew,destnew);

ids = FOREACH diff_data GENERATE FLATTEN($0);

id1 = DISTINCT( FOREACH ids GENERATE $0);

src = FILTER sourcenew BY ID == id1.$0;

finalsrc = FOREACH src GENERATE *, 'Source' as Source:chararray;

dest = FILTER destnew BY ID == id1.$0;

finaldest = FOREACH dest GENERATE *, 'Destination' as Destination:chararray;

final = UNION finalsrc, finaldest;

A = FOREACH final GENERATE PigUDFpck.databag(*);

DUMP A;

The input to the UDF looks like this:

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,ArkansasSrc1,Source)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,ArkansaSrc2,Source)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,Arkansasdest1,Destination)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,Arkanssdest2,Destination)

Any help is much appreciated! Thanks in advance!


Answer 1:

Please understand that Pig is a DAG generator: it produces Map Reduce jobs based on that DAG.

Higher-level Pig constructs such as LOAD, FOREACH, and JOIN boil down to lower-level MR constructs:

> LOAD      => a Mapper in MR
> GENERATE  => a function call in the Mapper or Reducer
> JOIN      => a SHUFFLE (join in Map Reduce)
> FILTER    => a filter function in the Map or Reduce phase

The databag function is therefore not called once but many times, because it is a function call executed inside the Mapper or the Reducer.

For every input row, databag will be executed (whether that happens in the mapper or the reducer depends on where the databag UDF ends up in the plan).
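If some state genuinely has to survive across rows within a single map or reduce task, the usual pattern is to hoist it into instance fields and initialize it lazily, instead of rebuilding it inside exec(). A minimal sketch under that assumption (the class and field names are illustrative, not from the question's code):

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Illustrative sketch: state held in instance fields survives across
// exec() calls within the same task; only the method body re-runs per row.
public class StatefulUdf extends EvalFunc<String> {
    private ConcurrentHashMap<Object, ArrayList<Integer>> srcIndexMap;

    public String exec(Tuple input) throws IOException {
        if (srcIndexMap == null) {
            // runs once per task, not once per row
            srcIndexMap = new ConcurrentHashMap<Object, ArrayList<Integer>>();
        }
        // ... accumulate into srcIndexMap across calls here ...
        return input == null ? null : input.toString();
    }
}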

Run the EXPLAIN command in Pig (for example, EXPLAIN A; in the Grunt shell); it shows how the Pig script is translated into the underlying MR jobs.

For more details, see:

    http://bytepadding.com/big-data/map-reduce/pig-to-map-and-reduce/

    http://bytepadding.com/big-data/map-reduce/understanding-map-reduce-the-missing-guide/

Discussion:

Thanks @KrazyGautam. I understand it will execute for every tuple in the databag, but is it possible to split the tuples into two arrays and compare them to get the best match between the source and destination arrays?

@Vickyster why not do the grouping on the Pig script side and receive a tuple of databags in the UDF?

Hi @Alexey, I could group it, but unfortunately I appended 'Source' and 'Destination' to the tuples to make it easy to identify which bag a tuple comes from, so that in the UDF I can iterate over the bag and separate the source and destination tuples into two arrays. Please see the updated Pig script and the UDF input. Thanks.

Answer 2:

OK, this got a bit too big for a comment.

...
src = FILTER sourcenew BY ID == id1.$0;

finalsrc = FOREACH src GENERATE *, 'Source' as Source:chararray;

dest = FILTER destnew BY ID == id1.$0;

-- note: the flag column is deliberately named Source in both relations,
-- so the nested FILTERs below can use a single column name
finaldest = FOREACH dest GENERATE *, 'Destination' as Source:chararray;

final = UNION finalsrc, finaldest;

A = FOREACH (GROUP final BY ID) {
    src = FILTER final BY Source == 'Source';
    dest = FILTER final BY Source == 'Destination';
    GENERATE FLATTEN(PigUDFpck.databag(src, dest));
};

In this case the UDF will receive a tuple containing two bags of tuples, which you can then compare. Also, I'm pretty sure this can be simplified (I mean, you could do this union and grouping right after loading; just generate a flag for each relation telling you whether it is source or destination).
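For reference, a sketch of how the UDF could unpack such an input; everything beyond the Pig API calls is illustrative, and the comparison logic is left as a stub:

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Sketch: with the grouped script above, exec() is called once per group
// and receives the source bag at field 0 and the destination bag at field 1.
public class databag extends EvalFunc<DataBag> {
    private final BagFactory mBagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        DataBag result = mBagFactory.newDefaultBag();
        if (input == null || input.size() < 2) {
            return result;
        }
        DataBag sourceBag = (DataBag) input.get(0);
        DataBag destBag = (DataBag) input.get(1);

        // both bags are available within a single call, so they can be
        // indexed and compared here without any cross-call state
        for (Iterator<Tuple> it = sourceBag.iterator(); it.hasNext();) {
            Tuple srcTuple = it.next();
            // ... compare srcTuple against destBag, e.g. via an index map ...
            result.add(srcTuple);
        }
        return result;
    }
}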

Discussion:

I'm getting an error: ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1200: Pig script failed to parse: expression is not a project expression: (Name: ScalarExpression) Type: null Uid: null) while executing A = FOREACH (group final by ID) { src = filter cogroupnew by Source == 'Source'; dest = filter cogroupnew by Source == 'Destination'; GENERATE flatten(PigUDFpck.databag(src, dest)); } Maybe the way I receive the databags in the UDF is wrong, please tell me; this is what I use: public class databag extends EvalFunc<DataBag>

@Vickyster edited. Just get the idea and see if it works for you.

OK @Alexey, thank you, I'll give it a try!
