如何使用火花流计算流中的新元素

Posted

技术标签:

【中文标题】如何使用火花流计算流中的新元素【英文标题】:How to count new element from stream by using spark-streaming 【发布时间】:2016-01-14 09:40:49 【问题描述】:

我已经完成了日常计算的实现。这是一些伪代码。 "newUser" 可能会调用第一个激活的用户。

// Get today log from hbase or somewhere else
val log = getRddFromHbase(todayDate)
// Compute active user
val activeUser = log.map(line => ((line.uid, line.appId), line).reduceByKey(distinctStrategyMethod)
// Get history user from hdfs
val historyUser = loadFromHdfs(path + yesterdayDate)
// Compute new user from active user and historyUser
val newUser = activeUser.subtractByKey(historyUser)
// Get new history user
val newHistoryUser = historyUser.union(newUser)
// Save today history user
saveToHdfs(path + todayDate)

“activeUser”的计算可以很容易地转换为火花流。这是一些代码:

    val transformedLog = sdkLogDs.map(sdkLog => 
      val time = System.currentTimeMillis()
      val timeToday = ((time - (time + 3600000 * 8) % 86400000) / 1000).toInt
      ((sdkLog.appid, sdkLog.bcode, sdkLog.uid), (sdkLog.channel_no, sdkLog.ctime.toInt, timeToday))
    )
    val activeUser = transformedLog.groupByKeyAndWindow(Seconds(86400), Seconds(60)).mapValues(x => 
      var firstLine = x.head
      x.foreach(line => 
        if (line._2 < firstLine._2) firstLine = line
      )
      firstLine
    )

但是“newUser”和“historyUser”的做法让我很困惑。 我认为我的问题可以概括为“如何从流中计算新元素”。正如我上面的伪代码,“newUser”是“activeUser”的一部分。而且我必须维护一组“historyUser”才能知道哪个部分是“newUser”。

我考虑了一种方法,但我认为它可能无法正常工作: 将历史用户加载为 RDD。 Foreach“activeUser”的DStream并找到“historyUser”中不存在的元素。 这里的一个问题是我应该什么时候更新这个“historyUser”的RDD,以确保我可以获得正确的窗口“newUser”。 更新“historyUser”RDD 意味着向它添加“newUser”。就像我在上面的伪代码中所做的一样。该代码中的“historyUser”每天更新一次。 另一个问题是如何从 DStream 执行此更新 RDD 操作。 我认为在窗口滑动时更新“historyUser”是正确的。但我还没有找到合适的 API 来执行此操作。 那么解决这个问题的最佳实践是什么。

【问题讨论】:

实现细节很好,一些想法也可能有所帮助。谢谢! 【参考方案1】:

updateStateByKey 在这里会有所帮助,因为它允许您设置初始状态(您的历史用户),然后在您的主流的每个时间间隔更新它。我把一些代码放在一起来解释这个概念

val historyUsers = loadFromHdfs(path + yesterdayDate).map(UserData(...))

case class UserStatusState(isNew: Boolean, values: UserData)

// this will prepare the RDD of already known historical users
// to pass into updateStateByKey as initial state
val initialStateRDD = historyUsers.map(user => UserStatusState(false, user))

// stateful stream
val trackUsers = sdkLogDs.updateStateByKey(updateState, new HashPartitioner(sdkLogDs.ssc.sparkContext.defaultParallelism), true, initialStateRDD)
// only new users
val newUsersStream = trackUsers.filter(_._2.isNew)


def updateState(newValues: Seq[UserData], prevState: Option[UserStatusState]): Option[UserStatusState] = 
  // Group all values for specific user as needed
  val groupedUserData: UserData = newValues.reduce(...)

  // prevState is defined only for users previously seen in the stream
  // or loaded as initial state from historyUsers RDD
  // For new users it is None
  val isNewUser = !prevState.isDefined
  // as you return state here for the user - prevState won't be None on next iterations
  Some(UserStatusState(isNewUser, groupedUserData))


【讨论】:

我认为这种方法将获得最后一次迭代的新用户。但是如何获得更长时间窗口的新用户呢?我试过在“trackUsers”上使用“window”方法,它的行为很奇怪。所有元素似乎在每次迭代时都被复制,出现了很多元素。当我在“newUsersStream”上使用“window”方法时,它似乎运行良好。 您的窗口设置如何?如果您有stream.window(Seconds(10), Seconds(1)),您将“看到”每个元素 10 次,因为它是重叠窗口。如果是stream.window(Seconds(10), Seconds(10)),您将只能看到每个元素一次,因为它不重叠。要在更长的窗口中跟踪新用户,您可以执行非重叠窗口(这很简单),或者您可以进行重叠并将计数器添加到您的状态,并在每个新用户的每次迭代中增加它,并且只有在计数器之后才认为它不是新的增加到一定限度。 IE。间隔 = 1 秒,限制 120 秒将保留用户 2 分钟 我正在使用重叠窗口。 Window 方法实际上如您所描述的那样工作。而且我发现只需在您的代码中的“newUsersStream”上使用窗口方法就可以了。它将获得新用户并忽略状态。感谢您的帮助。

以上是关于如何使用火花流计算流中的新元素的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花流中刷新加载的数据帧内容?

火花流中的广播变量空指针异常

如何在火花结构化流式读取流中倒带 Kafka 偏移

如何在火花流中添加 2 行具有相同键(列值)的行?

即使有 0 条消息,火花流中的转换也需要更多时间

每个微批次火花流中处理的总记录