如何在火花流中刷新加载的数据帧内容?

Posted

技术标签:

【中文标题】如何在火花流中刷新加载的数据帧内容?【英文标题】:How to refresh loaded dataframe contents in spark streaming? 【发布时间】:2019-08-16 06:26:00 【问题描述】:

使用 spark-sql 2.4.1 和 kafka 进行实时流式传输。 我有以下用例

    需要从 hdfs 加载元数据以加入来自 kafka 的流式数据帧。 应在元数据数据帧特定列(col-X)数据中查找流数据记录的特定列。 如果找到选择元数据列(col-Y)数据 否则未找到,将流式记录/列数据插入元数据数据帧,即插入 hdfs。 IE。应该查一下,如果 流数据帧再次包含相同的数据。

作为在 spark 作业开始时加载的元数据,如何在流式作业中再次刷新其内容以查找并加入另一个流式数据帧?

【问题讨论】:

【参考方案1】:

我可能误解了这个问题,但刷新元数据数据框应该是开箱即用支持的功能。

你根本不需要做任何事情。

让我们看一下例子:

// a batch dataframe
val metadata = spark.read.text("metadata.txt")
scala> metadata.show
+-----+
|value|
+-----+
|hello|
+-----+

// a streaming dataframe
val stream = spark.readStream.text("so")

// join on the only value column
stream.join(metadata, "value").writeStream.format("console").start

只要so 目录中的文件内容与metadata.txt 文件匹配,您应该会在控制台上打印出一个数据框。

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|hello|
+-----+

metadata.txt 更改为world,只有来自新文件的世界才会匹配。

【讨论】:

【参考方案2】:

编辑这个解决方案更加精细,并且可以工作(适用于所有用例)。 对于将数据附加到现有文件而不更改文件或从数据库中读取的更简单情况,可以使用in the other answer 指出的更简单的解决方案。 这是因为数据帧(和底层 RDD)分区只创建一次,并且每次使用数据帧时都会读取数据。 (除非被spark缓存)


如果负担得起,您可以尝试(重新)在每个微章节中读取此 元数据数据帧

更好的方法是将 元数据数据帧 放入缓存中(不要与 spark 缓存数据帧混淆)。缓存类似于映射,不同之处在于它不会给插入的条目超过配置的生存时间。

在您的代码中,您将尝试为每个微批次从缓存中获取此元数据数据帧。如果缓存返回null。您将再次读取数据帧,放入缓存,然后使用数据帧。

Cache 类将是

import scala.collection.mutable

// cache class to store the dataframe
class Cache[K, V](timeToLive: Long) extends mutable.Map[K, V] 
  private var keyValueStore = mutable.HashMap[K, (V, Long)]()

  override def get(key: K):Option[V] = 
    keyValueStore.get(key) match 
      case Some((value, insertedAt)) if insertedAt+timeToLive > System.currentTimeMillis => Some(value)
      case _ => None
    
  

  override def iterator: Iterator[(K, V)] = keyValueStore.iterator
    .filter(
      case (key, (value, insertedAt)) => insertedAt+timeToLive > System.currentTimeMillis
    ).map(x => (x._1, x._2._1))

  override def -=(key: K): this.type = 
    keyValueStore-=key
    this
  

  override def +=(kv: (K, V)): this.type = 
    keyValueStore += ((kv._1, (kv._2, System.currentTimeMillis())))
    this
  

通过缓存访问元数据数据帧的逻辑

import org.apache.spark.sql.DataFrame

object DataFrameCache 
  lazy val cache = new Cache[String, DataFrame](600000) // ten minutes timeToLive

  def readMetaData: DataFrame = ???

  def getMetaData: DataFrame = 
    cache.get("metadataDF") match 
      case Some(df) => df
      case None => 
        val metadataDF = readMetaData
        cache.put("metadataDF", metadataDF)
        metadataDF
      
    
  

【讨论】:

Cache 类使用已更新的mutable.HashMap 如果数据帧缓存在 Spark 中,则(更新的)文件将不会再次读取。【参考方案3】:

下面是我在 spark 2.4.5 中使用流连接进行左外连接的场景。下面的过程是推动 spark 读取最新的维度数据更改。

流程用于批量维度的流连接(始终更新)

第 1 步:-

在开始 Spark 流式传输作业之前:- 确保维度批处理数据文件夹只有一个文件,并且该文件应该至少有一条记录(由于某种原因放置空文件不起作用)。

第 2 步:- 开始您的流式传输作业并在 kafka 流中添加流记录

第 3 步:- 用值覆盖暗淡数据(文件应该同名不要更改,维度文件夹应该只有一个文件) 注意:- 不要使用 spark 写入此文件夹,使用 Java 或 Scala filesystem.io 覆盖文件或 bash 删除文件并替换为同名的新数据文件。

第 4 步:- 在下一批中,spark 能够在加入 kafka 流时读取更新的维度数据......

示例代码:-

package com.broccoli.streaming.streamjoinupdate

import org.apache.log4j.Level, Logger
import org.apache.spark.sql.types.StringType, StructField, StructType, TimestampType
import org.apache.spark.sql.DataFrame, SparkSession

object BroadCastStreamJoin3 

  def main(args: Array[String]): Unit = 
    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")


    val dimDf3 = spark.read
      .schema(schemaUntyped2)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/dimension")
      .withColumnRenamed("id", "id_2")
      .withColumnRenamed("countrycode", "countrycode_2")

    import spark.implicits._

    factDf1
      .join(
        dimDf3,
        $"countrycode_2" <=> $"countrycode",
        "inner"
      )
      .writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination

  


谢谢 斯里

【讨论】:

以上是关于如何在火花流中刷新加载的数据帧内容?的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花结构化流式读取流中倒带 Kafka 偏移

如何在火花流中添加 2 行具有相同键(列值)的行?

如何更新火花流中的广播变量?

如何在火花中将数据帧转换为csv [重复]

如何在火花上将json字符串转换为数据帧

如何在火花中合并两个不同的数据帧? [复制]