Pig如何实例化UDF对象
Posted
技术标签:
【中文标题】Pig如何实例化UDF对象【英文标题】:How does Pig instantiate UDF objects 【发布时间】:2016-05-04 13:06:52 【问题描述】:谁能告诉我 Pig 如何实例化 UDF 对象?我使用 Pig 构建了一个管道来处理一些数据。我在多节点Hadoop
集群中部署了管道并且我想保存管道中每个步骤之后产生的所有中间结果。所以我用Java写了一个UDF,它将在初始化时打开一个HTTP连接并在exec
中传输数据。另外,我将关闭对象的finalize
中的连接。
我的脚本可以简化如下:
REGISTER MyPackage.jar;
DEFINE InterStore test.InterStore('localhost', '58888');
DEFINE Clean test.Clean();
raw = LOAD 'mydata';
cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES '');
cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*));
named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount;
named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount);
grp = GROUP named BY LocationID;
grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:(LocationID, AccessCount));
sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses;
sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses);
ordered = ORDER sum BY TotalAccesses DESC;
STORE ordered INTO 'result';
InterStore 的代码可以简化如下:
class InterStore extends EvalFunc<Tuple>
HttpURLConnection con; //Avoid redundant connection establishment in exec
public InterStore(String ip, String port) throws IOException
URL url = new URL("http://" + ip + ':' + port);
con = (HttpURLConnection)url.openConnection();
con.setRequestMethod("PUT");
con.setDoOutput(true);
con.setDoInput(true);
public Tuple exec(Tuple input) throws IOException
con.getOutputStream().write((input.toDelimitedString(",")+'\n').getBytes());
return input;
@Override
protected void finalize() throws Throwable
con.getOutputStream().close();
int respcode = con.getResponseCode();
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine());
in.close();
但是,我发现 HTTP 连接无法像在本地模式下那样成功传输数据。怎么处理?
【问题讨论】:
嗨,如果您可以分享您尝试使用 UDF 的脚本,那将有所帮助。 我添加了一些代码示例。谢谢~ 【参考方案1】:是否有服务在“localhost”、“58888”上侦听?
请注意,本地主机因每个执行节点而异,您可能需要这样做:
%default LHOST `localhost`
并使用这个变量作为参数
DEFINE InterStore test.InterStore('$LHOST', '58888');
一般情况下,我会在 UDF 中进行一些打印输出并仔细检查传递给它的参数,然后测试连接(例如 ping 并检查端口是否可以从 hadoop 节点访问)
【讨论】:
以上是关于Pig如何实例化UDF对象的主要内容,如果未能解决你的问题,请参考以下文章
(JAVA)啥是实例化如何实现类的实例化(用类或方法创建一个该类的实例)?