会话化网络日志,获取上一个和下一个域
Posted
技术标签:
【中文标题】会话化网络日志,获取上一个和下一个域【英文标题】:Sessionized web logs, get previous and next domain 【发布时间】:2012-10-26 20:58:31 【问题描述】:我们有一大堆网络日志数据。我们需要对它进行会话化,并为每个会话生成上一个域和下一个域。我正在通过 AWS EMR 上的交互式作业流程进行测试。
现在我可以在此处使用以下代码获取会话化数据:http://goo.gl/L52Wf。熟悉编译和使用 UDF 需要一些工作,但我已经做到了。
这是输入文件的标题行和第一行(制表符分隔):
ID Date Rule code Project UID respondent_uid Type Tab ID URL domain URL path Duration Exit cause Details
11111111 2012-09-25T11:21:20.000Z 20120914_START_USTEST 20120914_TESTSITE_US_TR test6_EN_9 PAGE_VIEWED FF1348568479042 http://www.google.fr 11 OTHER
这是来自SESSIONS
关系的元组(获取关系的步骤如下所示):
(2012-09-27 04:42:20.000,11999603,20120914_URL_ALL,20120914_TESTSITE_US_TR,2082810875_US_9,PAGE_VIEWED,CH17,http://hotmail.com,_news/2012/09/26/14113684,28,WINDOW_DEACTIVATED,,3019222a-5c4d-4767-a82e-2b4df5d9db6d)
这大致就是我现在正在运行的测试数据会话:
register s3://TestBucket/Sessionize.jar
define Sessionize datafu.pig.sessions.Sessionize('30m');
A = load 's3://TestBucket/party2.gz' USING PigStorage() as (id: chararray, data_date: chararray, rule_code: chararray, project_uid: chararray, respondent_uid: chararray, type: chararray, tab_id: chararray, url_domain: chararray, url_path: chararray, duration: chararray, exit_cause: chararray, details: chararray);
B = foreach A generate $1, $0, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11;
C = filter B by id neq 'ID';
VIEWS = group C by (respondent_uid, url_domain);
SESSIONS = foreach VIEWS VISITS = order C by data_date; generate FLATTEN(Sessionize(VISITS)) as (data_date: chararray, id: chararray, rule_code: chararray, project_uid: chararray, respondent_uid: chararray, type: chararray, tab_id: chararray, url_domain: chararray, url_path: chararray, duration: chararray, exit_cause: chararray, details: chararray, session_id);
(B处的步骤是将日期移到第一位。C处的步骤是过滤掉文件头)
我迷失了从这里开始的正确方向。
我可以遍历我的SESSIONS
与foreach
的关系并从猪脚本中获取下一个和上一个域吗?编写自定义 UDF 并传递 SESSIONS
关系会更好吗? (编写我自己的 UDF 将是一次冒险!..)
任何建议将不胜感激。即使有人可以推荐 不 做什么,也可能同样有帮助,所以我不会浪费时间研究垃圾方法。我对 Hadoop 和 pig 脚本还很陌生,所以这绝对不是我的强项之一(但..)。
【问题讨论】:
能否提供一些输入输出数据? @alexeipab 我添加了输入数据的示例,以及来自SESSIONS
关系的元组示例。输出还不存在,我需要respondent_uid
、session_id
以及下一个域和上一个域。我可以将SESSIONS
关系分组,但我不确定如何获得下一个和上一个。如果我可以提供任何其他有用的信息,请告诉我
只是一个更新。我现在正在尝试走 UDF 路线。随着进展和失败的发生,我会继续更新。
我有一个不那么漂亮的 UDF 解决方案在工作!一旦我清理它并进行一些调整,我会将它添加为其他遇到此类问题的人的答案。
【参考方案1】:
如果有人可以改进下面的解决方案,我一点也不感到惊讶,但是,它适用于我的情况。我使用 sessionize UDF(在我的问题中提到)作为编写以下 UDF 的参考。
import java.io.IOException;
import java.util.ArrayList;
import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
public class PreviousNext extends EvalFunc<DataBag> implements Accumulator<DataBag>
private DataBag outputBag;
private String previous;
private String next;
public PreviousNext()
cleanup();
@Override
public DataBag exec(Tuple input) throws IOException
accumulate(input);
DataBag outputBag = getValue();
cleanup();
return outputBag;
@Override
public void accumulate(Tuple input) throws IOException
ArrayList<String> domains = new ArrayList<String>();
DataBag d = (DataBag)input.get(0);
//put all domains into ArrayList to allow for
//accessing specific indexes
for(Tuple t : d)
domains.add((String)t.get(2));
//add empty string for "next domain" value for last iteration
domains.add("");
int i = 0;
previous = "";
for(Tuple t : d)
next = domains.get(i+1);
Tuple t_new = TupleFactory.getInstance().newTuple(t.getAll());
t_new.append(previous);
t_new.append(next);
outputBag.add(t_new);
//current domain is previous for next iteration
previous = domains.get(i);
i++;
@Override
public void cleanup()
this.outputBag = BagFactory.getInstance().newDefaultBag();
@Override
public DataBag getValue()
return outputBag;
@Override
public Schema outputSchema(Schema input)
try
Schema.FieldSchema inputFieldSchema = input.getField(0);
if (inputFieldSchema.type != DataType.BAG)
throw new RuntimeException("Expected a BAG as input");
Schema inputBagSchema = inputFieldSchema.schema;
if (inputBagSchema.getField(0).type != DataType.TUPLE)
throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s", DataType.findTypeName(inputBagSchema.getField(0).type)));
Schema inputTupleSchema = inputBagSchema.getField(0).schema;
Schema outputTupleSchema = inputTupleSchema.clone();
outputTupleSchema.add(new Schema.FieldSchema("previous_domain", DataType.CHARARRAY));
outputTupleSchema.add(new Schema.FieldSchema("next_domain", DataType.CHARARRAY));
return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),outputTupleSchema,DataType.BAG));
catch (CloneNotSupportedException e)
throw new RuntimeException(e);
catch (FrontendException e)
throw new RuntimeException(e);
【讨论】:
以上是关于会话化网络日志,获取上一个和下一个域的主要内容,如果未能解决你的问题,请参考以下文章