如何更新火花流中的广播变量?

Posted

技术标签:

【中文标题】如何更新火花流中的广播变量?【英文标题】:How can I update a broadcast variable in spark streaming? 【发布时间】:2016-01-27 02:49:14 【问题描述】:

我相信,我有一个相对常见的火花流用例:

我有一个对象流,我想根据一些参考数据进行过滤

最初,我认为使用广播变量来实现这将是一件非常简单的事情:

public void startSparkEngine 
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> 
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    

    filteredStream.foreachRDD(rdd -> 
        rdd.foreach(obj -> 
            // Final processing of filtered objects
        );
        return null;
    );

但是,尽管很少,我的参考数据会定期更改

我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播到每个工人,但是 Broadcast 对象不是 Serializable并且需要是final

我有什么选择?我能想到的三个解决方案是:

    将参考数据查找移动到forEachPartitionforEachRdd,使其完全驻留在工作人员身上。然而,参考数据存在于 REST API 中,因此我还需要以某种方式存储一个计时器/计数器,以停止远程访问流中的每个元素。

    每次 refdata 更改时都使用新的广播变量重新启动 Spark 上下文。

    将参考数据转换为RDD,然后join 以我现在正在流式传输Pair&lt;MyObject, RefData&gt; 的方式,尽管这会将参考数据与每个对象一起传送.

【问题讨论】:

【参考方案1】:

通过@Rohan Aletty 扩展答案。这是一个基于 ttl 刷新广播变量的 BroadcastWrapper 的示例代码

public class BroadcastWrapper 

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper()

    public static BroadcastWrapper getInstance() 
        return obj;
    

    public JavaSparkContext getSparkContext(SparkContext sc) 
       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
       return jsc;
    

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext)
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime()-lastUpdatedAt.getTime();
        if (var == null || diff > 60000)  //Lets say we want to refresh every 1 min = 60000 ms
            if (var != null)
               var.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            //Your logic to refresh
            ReferenceData data = getRefData();

            var = getSparkContext(sparkContext).broadcast(data);
       
       return var;
   

您的代码如下所示:

public void startSparkEngine() 

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> 
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());

        stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    );

    filteredStream.foreachRDD(rdd -> 
        rdd.foreach(obj -> 
        // Final processing of filtered objects
        );
        return null;
    );

这对我也适用于多集群。 希望这会有所帮助

【讨论】:

感谢您的解决方案。不知道updateAndGet是在Driver节点上执行还是在Worker节点上执行?包装器本身似乎没有被广播,所以我认为它在每个 Worker 节点上都不可用。如果它在 Driver 节点上执行,这是否意味着每个 Worker 每次尝试访问该值时都必须询问 Driver ? (这与首先使用广播变量的想法相矛盾) 这个函数返回广播类型对象的引用。广播类型对象将具有广播变量的标识符和块数。当调用 refdataBroadcast.getValue() 时,如果广播标识符存在于执行程序内存中,则不会重新计算。所有这些都发生在执行程序上,但是当调用 sparkContext.broadcast 时,驱动程序就会出现。所以updateAndGet只有​​在变量被刷新和重新广播时才会在驱动节点上执行(只有驱动可以处理)。 知道在新的结构化流 API 中如何实现相同的功能吗? 由于我面临类似的问题,我想知道是否有人对上面的代码进行了 Python 实现?我认为这可能是克服我目前遇到的一些困难的好方法。感谢所有帮助,谢谢。 @Aastha 转换函数将在执行程序上运行,并且无法访问那里的 sparkContext。我的理解是广播变量的更新只能从foreachRDDforaeachBatch 函数中发生,并且对广播的更新引用仅适用于这些函数的范围。这种理解正确吗?【参考方案2】:

最近遇到了这个问题。认为它可能对 scala 用户有帮助..

Scala 处理BroadCastWrapper 的方式如下例所示。

import java.io. ObjectInputStream, ObjectOutputStream 
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

/* wrapper lets us update brodcast variables within DStreams' foreachRDD
 without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
 @transient private val ssc: StreamingContext,
  @transient private val _v: T) 

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = 

    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = 
    out.writeObject(v)
  

  private def readObject(in: ObjectInputStream): Unit = 
    v = in.readObject().asInstanceOf[Broadcast[T]]
  

每次你需要调用更新函数来获取新的广播变量。

【讨论】:

对于那些想知道的人, value 公开了底层广播对象以供只读;而对于一些特殊的序列化情况,writeObject 和 readObject 是必需的。见:Java Serialization @JMess 只是想知道您是否需要在 writeObject 和 readObject 中调用 close() 方法? @PetrFedosov 否; close 没有必要,也不可取。【参考方案3】:

几乎每个处理流式应用程序的人都需要一种方法将参考数据(来自数据库、文件等)编织(过滤、查找等)到流式数据中。我们有整个两部分的部分解决方案

    查找要在流式操作中使用的参考数据

    创建具有所需缓存 TTL 的 CacheLookup 对象 将其包装在广播中 使用 CacheLookup 作为流逻辑的一部分

在大多数情况下,这都可以正常工作,但以下情况除外

    更新参考数据

    尽管这些线程中提出了建议,但没有确定的方法来实现这一点,即:杀死以前的广播变量并创建新的。多个未知数,例如这些操作之间的预期。

这是一个常见的需求,如果有一种方法可以将信息发送到广播变量通知更新,那将会有所帮助。这样,可以使“CacheLookup”中的本地缓存无效

问题的第二部分仍未解决。如果有任何可行的方法,我会很感兴趣

【讨论】:

【参考方案4】:

不确定您是否已经尝试过,但我认为可以在不关闭SparkContext 的情况下更新广播变量。通过使用unpersist() 方法,广播变量的副本在每个执行程序上被删除,并且需要重新广播才能再次访问该变量。对于您的用例,当您想更新广播时,您可以:

    等待您的执行者完成当前的一系列数据

    解除广播变量

    更新广播变量

    重播以将新的参考数据发送给执行者

我从this post 大量绘制,但最后一个回复的人声称它已在本地工作。重要的是要注意,您可能希望在 unpersist 上将阻塞设置为 true,以便您可以确保执行程序删除了旧数据(因此在下一次迭代中不会再次读取过时的值)。

【讨论】:

【参考方案5】:

我以不同的方式做到了。

我创建了一个广播变量,并每 5 分钟在驱动程序的不同线程中更新它。

  var broadcastValue: Broadcast[Set[String]] = spark.sparkContext.broadcast(calculateValue())

  def runScheduledThreadToUpdateBroadcastVariable(): Unit = 
    val updateTask = new Runnable 
      def run() = 
        broadcastValue.unpersist(blocking = false)
        broadcastValue = spark.sparkContext.broadcast(calculateValue())
      
    

    val executor = new ScheduledThreadPoolExecutor(1)
    executor.scheduleAtFixedRate(updateTask, 1, 5, TimeUnit.MINUTES)
  

【讨论】:

【参考方案6】:

实现的最简单方法,下面的代码读取每个批次的维度数据文件夹,但请记住新的维度数据值(在我的例子中为国家名称)必须是一个新文件。

package com.databroccoli.streaming.dimensionupateinstreaming

import org.apache.log4j.Level, Logger
import org.apache.spark.sql.DataFrame, ForeachWriter, Row, SparkSession
import org.apache.spark.sql.functions.broadcast, expr
import org.apache.spark.sql.types.StringType, StructField, StructType, TimestampType

object RefreshDimensionInStreaming 

  def main(args: Array[String]) = 

    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")

    var countryDf: Option[DataFrame] = None: Option[DataFrame]

    def updateDimensionDf() = 
      val dimDf2 = spark.read
        .schema(schemaUntyped2)
        .option("header", "true")
        .csv("src/main/resources/broadcasttest/dimension")

      if (countryDf != None) 
        countryDf.get.unpersist()
      

      countryDf = Some(
        dimDf2
          .withColumnRenamed("id", "id_2")
          .withColumnRenamed("countrycode", "countrycode_2"))

      countryDf.get.show()
    

    factDf1.writeStream
      .outputMode("append")
      .foreachBatch  (batchDF: DataFrame, batchId: Long) =>
        batchDF.show(10)

        updateDimensionDf()

        batchDF
          .join(
            countryDf.get,
            expr(
              """
      countrycode_2 = countrycode 
      """
            ),
            "leftOuter"
          )
          .show

      
      .start()
      .awaitTermination()

  



【讨论】:

以上是关于如何更新火花流中的广播变量?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用火花流计算流中的新元素

Spark:火花流中的接收器是瓶颈吗?

如何在三星智能电视上提取嵌入在 Icecast 音频(广播)流中的流式“正在播放”数据

即使有 0 条消息,火花流中的转换也需要更多时间

处理 flink 广播流中的大数据

每个微批次火花流中处理的总记录