火花中的共享状态?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了火花中的共享状态?相关的知识,希望对你有一定的参考价值。

我正在阅读防火墙日志数据,其中包括以下内容:

(UniqueID,start_or_stop,timestamp)在某些时候,每个“start”后面都是stop,当它出现时我想输出(UniqueID,start_time,stop_time)

这意味着通常跟踪状态,当行阅读器看到开始然后在字典中记录时,当它看到停止时它然后从字典中删除开始并发出输出。

我的问题是:如何使用apache spark跟踪这种共享状态?

值得指出的是,一旦达到停止,就可以重复使用UniqueID - 它来自sourceIP-sourcePort-destIP-destPort,可以重复使用。

答案

假设约束指示stop的行将始终跟随指示给定startUniqueID的行,请考虑以下输入(0表示开始,1表示停止事件):

UniqueID,start_or_stop,timestamp
u1,0,2018-01-22 13:04:32
u2,0,2018-01-22 13:04:35
u2,1,2018-01-25 18:55:08
u3,0,2018-01-25 18:56:17
u1,1,2018-01-25 20:51:43
u2,0,2018-01-31 07:48:43
u3,1,2018-01-31 07:48:48
u1,0,2018-02-02 09:40:58
u2,1,2018-02-02 09:41:01
u1,1,2018-02-05 14:03:27

然后,我们可以应用以下转换来获得您想要的内容。代码在scala中,但python中提供了相同的函数(因此,我认为可以很容易地推断和移植):

//Define the window specification, after partition and sort, select 
//the 2 rows in the window group that will contain the start/stop time
val wSpec = Window.partitionBy('UniqueID).orderBy('timestamp).rowsBetween(0, 1)

//Assume df is the DataFrame loaded with above data
df.withColumn("Last", last('timestamp) over wSpec). //add a new col having stop time
    where("start_or_stop = 0").  //Just need the alternate rows
    drop("start_or_stop"). //Drop column
    withColumnRenamed("timestamp", "start_time"). //Rename to start
    withColumnRenamed("Last", "stop_time").  //Rename to stop
    show(false)

这提供了以下输出:

+--------+---------------------+---------------------+
|UniqueID|start_time           |stop_time            |
+--------+---------------------+---------------------+
|u3      |2018-01-25 18:56:17.0|2018-01-31 07:48:48.0|
|u1      |2018-01-22 13:04:32.0|2018-01-25 20:51:43.0|
|u1      |2018-02-02 09:40:58.0|2018-02-05 14:03:27.0|
|u2      |2018-01-22 13:04:35.0|2018-01-25 18:55:08.0|
|u2      |2018-01-31 07:48:43.0|2018-02-02 09:41:01.0|
+--------+---------------------+---------------------+

以上是关于火花中的共享状态?的主要内容,如果未能解决你的问题,请参考以下文章

以有状态的方式处理火花中的网络数据包

共享元素返回过渡不适用于片段中的 recyclerview 和 cardview

共享元素转换:活动到嵌套在另一个活动中的片段

如何在火花数据框中进行采样?

后堆栈中的配置更改片段现在正在共享 FrameLayout?

使用 savedInstanceState 保存片段状态