如何使用火花流计算流中的新元素
Posted
技术标签:
【中文标题】如何使用火花流计算流中的新元素【英文标题】:How to count new element from stream by using spark-streaming 【发布时间】:2016-01-14 09:40:49 【问题描述】:我已经完成了日常计算的实现。这是一些伪代码。 "newUser" 可能会调用第一个激活的用户。
// Get today log from hbase or somewhere else
val log = getRddFromHbase(todayDate)
// Compute active user
val activeUser = log.map(line => ((line.uid, line.appId), line).reduceByKey(distinctStrategyMethod)
// Get history user from hdfs
val historyUser = loadFromHdfs(path + yesterdayDate)
// Compute new user from active user and historyUser
val newUser = activeUser.subtractByKey(historyUser)
// Get new history user
val newHistoryUser = historyUser.union(newUser)
// Save today history user
saveToHdfs(path + todayDate)
“activeUser”的计算可以很容易地转换为火花流。这是一些代码:
val transformedLog = sdkLogDs.map(sdkLog =>
val time = System.currentTimeMillis()
val timeToday = ((time - (time + 3600000 * 8) % 86400000) / 1000).toInt
((sdkLog.appid, sdkLog.bcode, sdkLog.uid), (sdkLog.channel_no, sdkLog.ctime.toInt, timeToday))
)
val activeUser = transformedLog.groupByKeyAndWindow(Seconds(86400), Seconds(60)).mapValues(x =>
var firstLine = x.head
x.foreach(line =>
if (line._2 < firstLine._2) firstLine = line
)
firstLine
)
但是“newUser”和“historyUser”的做法让我很困惑。 我认为我的问题可以概括为“如何从流中计算新元素”。正如我上面的伪代码,“newUser”是“activeUser”的一部分。而且我必须维护一组“historyUser”才能知道哪个部分是“newUser”。
我考虑了一种方法,但我认为它可能无法正常工作: 将历史用户加载为 RDD。 Foreach“activeUser”的DStream并找到“historyUser”中不存在的元素。 这里的一个问题是我应该什么时候更新这个“historyUser”的RDD,以确保我可以获得正确的窗口“newUser”。 更新“historyUser”RDD 意味着向它添加“newUser”。就像我在上面的伪代码中所做的一样。该代码中的“historyUser”每天更新一次。 另一个问题是如何从 DStream 执行此更新 RDD 操作。 我认为在窗口滑动时更新“historyUser”是正确的。但我还没有找到合适的 API 来执行此操作。 那么解决这个问题的最佳实践是什么。
【问题讨论】:
实现细节很好,一些想法也可能有所帮助。谢谢! 【参考方案1】:updateStateByKey
在这里会有所帮助,因为它允许您设置初始状态(您的历史用户),然后在您的主流的每个时间间隔更新它。我把一些代码放在一起来解释这个概念
val historyUsers = loadFromHdfs(path + yesterdayDate).map(UserData(...))
case class UserStatusState(isNew: Boolean, values: UserData)
// this will prepare the RDD of already known historical users
// to pass into updateStateByKey as initial state
val initialStateRDD = historyUsers.map(user => UserStatusState(false, user))
// stateful stream
val trackUsers = sdkLogDs.updateStateByKey(updateState, new HashPartitioner(sdkLogDs.ssc.sparkContext.defaultParallelism), true, initialStateRDD)
// only new users
val newUsersStream = trackUsers.filter(_._2.isNew)
def updateState(newValues: Seq[UserData], prevState: Option[UserStatusState]): Option[UserStatusState] =
// Group all values for specific user as needed
val groupedUserData: UserData = newValues.reduce(...)
// prevState is defined only for users previously seen in the stream
// or loaded as initial state from historyUsers RDD
// For new users it is None
val isNewUser = !prevState.isDefined
// as you return state here for the user - prevState won't be None on next iterations
Some(UserStatusState(isNewUser, groupedUserData))
【讨论】:
我认为这种方法将获得最后一次迭代的新用户。但是如何获得更长时间窗口的新用户呢?我试过在“trackUsers”上使用“window”方法,它的行为很奇怪。所有元素似乎在每次迭代时都被复制,出现了很多元素。当我在“newUsersStream”上使用“window”方法时,它似乎运行良好。 您的窗口设置如何?如果您有stream.window(Seconds(10), Seconds(1))
,您将“看到”每个元素 10 次,因为它是重叠窗口。如果是stream.window(Seconds(10), Seconds(10))
,您将只能看到每个元素一次,因为它不重叠。要在更长的窗口中跟踪新用户,您可以执行非重叠窗口(这很简单),或者您可以进行重叠并将计数器添加到您的状态,并在每个新用户的每次迭代中增加它,并且只有在计数器之后才认为它不是新的增加到一定限度。 IE。间隔 = 1 秒,限制 120 秒将保留用户 2 分钟
我正在使用重叠窗口。 Window 方法实际上如您所描述的那样工作。而且我发现只需在您的代码中的“newUsersStream”上使用窗口方法就可以了。它将获得新用户并忽略状态。感谢您的帮助。以上是关于如何使用火花流计算流中的新元素的主要内容,如果未能解决你的问题,请参考以下文章