PIG FILTER 关系与下一行相同的关系

Posted

技术标签:

【中文标题】PIG FILTER 关系与下一行相同的关系【英文标题】:PIG FILTER relation with next row the same relation 【发布时间】:2014-02-19 14:36:14 【问题描述】:

我现在正在寻找很长时间来解决我的问题,但几乎没有发现任何有用的东西。 希望你们中的一些人能给我一个提示。

我有一个关系 A,格式如下:用户名、时间戳、ip

例如:

Harald 2014-02-18T16:14:49.503Z 123.123.123.123
Harald 2014-02-18T16:14:51.503Z 123.123.123.123
Harald 2014-02-18T16:14:55.503Z 321.321.321.321

我想知道,谁在不到 5 秒的时间内更改了他的 IP 地址。所以第二排和第三排应该很有趣。

我想按用户名对关系进行分组,并且想将实际行的时间戳与下一行进行比较。如果 ip 地址不相同并且时间戳小于 5 秒,则应该在输出中。

有人可以帮我解决这个问题吗?

问候。


首先我要感谢您的宝贵时间。

但我实际上停留在 Sessionize 部分。

这是我进来的数据:

aoebcu  2014-02-19T14:23:17.503Z    220.61.65.25
aoebcu  2014-02-19T14:23:14.503Z    222.117.144.19
aoebcu  2014-02-19T14:23:14.503Z    222.117.144.19
jekgru  2014-02-19T14:23:14.503Z    213.56.157.109
zmembx  2014-02-19T14:23:12.503Z    199.188.198.91
qhixcg  2014-02-19T14:23:11.503Z    203.40.104.119

到目前为止,我的代码如下所示:

hijack_Reduced = FOREACH finalLogs GENERATE ClientUserName, timestamp, OriginalClientIP;
hijack_Filtered = FILTER hijack_Reduced BY OriginalClientIP != '-';

hijack_Sessionized = FOREACH (GROUP hijack_Filtered BY ClientUserName) 
  views = ORDER hijack_Filtered BY timestamp;
  GENERATE FLATTEN(Sessionize(views)) AS (ClientUserName,timestamp,OriginalClientIP,session_id);

但是当我运行这个脚本时,我收到了以下错误消息:

15:36:22 错误 - org.apache.pig.tools.pigstats.SimplePigStats.setBackendException(542) |错误 0:执行 [POUserFunc 时出现异常(名称: POUserFunc(datafu.pig.sessions.Sessionize)[bag] - scope-199 Operator 键:范围 199)孩子:在 []] 处为空: java.lang.IllegalArgumentException:无效格式:“aoebcu”

我已经尝试了很多,但没有任何效果。 你有什么想法吗?

问候

【问题讨论】:

您需要编写一个 UDF。 你可以考虑使用 esper 【参考方案1】:

虽然您可以为此编写 UDF,但您实际上可以利用 Apache DataFu 中已有的 UDF 来解决此问题。

我的解决方案涉及对数据应用会话化。基本上,您查看连续事件并为每个事件分配一个会话 ID。如果两个事件之间经过的时间超过了指定的时间量,在您的情况下为 5 秒,那么下一个事件将获得一个新的会话 ID。否则,连续事件获得相同的会话 ID。一旦为每个事件分配了其会话 ID,剩下的就很容易了。我们按会话 ID 分组并查找具有多个不同 IP 地址的会话。

我将介绍我的解决方案。

假设您有以下输入数据。 Harold 和 Kumar 都更改了他们的 IP 地址。但是 Harold 在 5 秒内完成了,而 Kumar 没有。所以我们脚本的输出应该只是简单的“Harold”。

  Harold,2014-02-18T16:14:49.503Z,123.123.123.123
  Harold,2014-02-18T16:14:51.503Z,123.123.123.123
  Harold,2014-02-18T16:14:55.503Z,321.321.321.321
  Kumar,2014-02-18T16:14:49.503Z,123.123.123.123
  Kumar,2014-02-18T16:14:55.503Z,123.123.123.123
  Kumar,2014-02-18T16:15:05.503Z,321.321.321.321

加载数据

data = LOAD 'input' using PigStorage(',') 
       AS (user:chararray,time:chararray,ip:chararray);

现在从 DataFu 定义几个 UDF。 Sessionize UDF 执行我之前描述的会话。 DistinctBy UDF 将用于在每个会话中查找不同的 IP 地址。

define Sessionize datafu.pig.sessions.Sessionize('5s');

define DistinctBy datafu.pig.bags.DistinctBy('1');

按用户分组数据,按时间排序,并应用 Sessonize UDF。请注意,时间戳必须是第一个字段,因为这是 Sessionize 所期望的。此 UDF 将会话 ID 附加到每个元组。

data = FOREACH data GENERATE time,user,ip;

data_sessionized = FOREACH (GROUP data BY user) 
  views = ORDER data BY time;
  GENERATE flatten(Sessionize(views)) as (time,user,ip,session_id);

现在数据是会话化的,我们可以按用户和会话分组。我也按用户分组,因为我想把这个值吐出来。我们将事件包传递给 DistinctBy UDF。查看此 UDF 的文档以获取更详细的说明。但本质上,我们将获得与每个会话不同的 IP 地址一样多的元组。请注意,我已经从下面的关系中删除了时间。这是因为 1) 不需要,2) DataFu 1.2.0 中的 DistinctBy 在处理包含破折号的字段时存在错误,就像时间字段一样。

data_sessionized = FOREACH data_sessionized GENERATE user,ip,session_id;

data_sessionized = FOREACH (GROUP data_sessionized BY (user, session_id)) GENERATE
  group.user as user,
  SIZE(DistinctBy(data_sessionized)) as distinctIpCount;

现在选择具有多个不同 IP 地址的所有会话并返回这些会话的不同用户。

data_sessionized = FILTER data_sessionized BY distinctIpCount > 1;

data_sessionized = FOREACH data_sessionized GENERATE user;

data_sessionized = DISTINCT data_sessionized;

这很简单:

Harold

这是完整的源代码,您应该可以直接粘贴到 DataFu 单元测试中并运行:

  /**
  define Sessionize datafu.pig.sessions.Sessionize('5s');

  define DistinctBy datafu.pig.bags.DistinctBy('1'); -- distinct by ip

  data = LOAD 'input' using PigStorage(',') AS (user:chararray,time:chararray,ip:chararray);

  data = FOREACH data GENERATE time,user,ip;

  data_sessionized = FOREACH (GROUP data BY user) 
    views = ORDER data BY time;
    GENERATE flatten(Sessionize(views)) as (time,user,ip,session_id);
  

  data_sessionized = FOREACH data_sessionized GENERATE user,ip,session_id;

  data_sessionized = FOREACH (GROUP data_sessionized BY (user, session_id)) GENERATE
    group.user as user,
    SIZE(DistinctBy(data_sessionized)) as distinctIpCount;

  data_sessionized = FILTER data_sessionized BY distinctIpCount > 1;

  data_sessionized = FOREACH data_sessionized GENERATE user;

  data_sessionized = DISTINCT data_sessionized;

  STORE data_sessionized INTO 'output';
   */
  @Multiline private String sessionizeUserIpTest;

  private String[] sessionizeUserIpTestData = new String[] 
      "Harold,2014-02-18T16:14:49.503Z,123.123.123.123",
      "Harold,2014-02-18T16:14:51.503Z,123.123.123.123",
      "Harold,2014-02-18T16:14:55.503Z,321.321.321.321",
      "Kumar,2014-02-18T16:14:49.503Z,123.123.123.123",
      "Kumar,2014-02-18T16:14:55.503Z,123.123.123.123",
      "Kumar,2014-02-18T16:15:05.503Z,321.321.321.321"
  ;

  @Test
  public void sessionizeUserIpTest() throws Exception
  
    PigTest test = createPigTestFromString(sessionizeUserIpTest);

    this.writeLinesToFile("input", 
        sessionizeUserIpTestData);

    List<Tuple> result = this.getLinesForAlias(test, "data_sessionized");

    assertEquals(result.size(),1);
    assertEquals(result.get(0).get(0),"Harold");
  

【讨论】:

您好,matterhayes,您能看看我由 Sessionize 引起的错误消息吗?谢谢! 当然,您需要将时间戳设置为第一个字段。我将使用该注释更新此答案。看看我在其他评论中描述的 FOREACH。

以上是关于PIG FILTER 关系与下一行相同的关系的主要内容,如果未能解决你的问题,请参考以下文章

Pig Latin 在一个 FILTER 语句中使用两个数据源

将多个参数传递给 Pig Filter UDF

使用Python Dataframe将行与下一行进行比较并从结果中创建一个新列?

pig中“无法打开别名的迭代器”是啥意思?

题解 P1019 单词接龙

数据库复习⑤