当您每天收到包含所有数据(新旧)的 XML 或 CSV 文件时,如何仅处理新数据

Posted

技术标签:

【中文标题】当您每天收到包含所有数据(新旧)的 XML 或 CSV 文件时,如何仅处理新数据【英文标题】:How to handle only new data when you receive XML or CSV file which contains all data (old & new) daily 【发布时间】:2020-05-11 04:54:03 【问题描述】:

我每天都会收到一个 XML 或 CSV 文件,其中包含所有数据(旧的和新的)。例如,如果 Yesterday.xml 包含 3 条记录,Today.xml 则包含 4 条记录(3 旧和 1 新)。

我只关心最后一个(新行),因为我只想处理新数据,旧数据每天都在处理。

使用 Spark 和 Kafka 实现这一点的最佳方法是什么? 数据示例:

OpportunityNo, OpprotunityTitle,Field
--- yesterday data----
Row1:1,OppTit1,IT
Row2:2,OppTit2,HEALTH
Row3:3,OppTit3,Finance
-------today data---------
Row4:4,OppTit4,Engineering

补充说明:

    该文件非常大。即处理文件的成本很高,尤其是它可能在 today.xml 中并更新与昨天的数据相关的第二行,但是由于它发生了更新,因此应该将其视为今天的更新。 我需要将新的(和更新的)数据写入数据库(作为接收器)。 所有这些的目的是向具有相同专业的人推荐新记录。即如果我的专业是IT,那么登录时,我会得到推荐的OppTit1,一旦我打开这个OppTitle1,我将被移动到历史推荐表中,就像这样,任何新数据一旦打开就会插入到推荐表中移到历史版本,用户可以回来查看。

如果除了处理 XML 之外别无他法,我想知道您对如何设计它的建议。

我的代码在下面,但是它将每个新文件的所有数据都放到目录中,我只想获取新数据而不是所有数据。

import org.apache.log4j._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types. StructType, StructField, IntegerType, DoubleType, StringType, TimestampType, DateType 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger

object Demo 
    def main(args: Array[String]) 
            Logger.getLogger("org").setLevel(Level.ERROR)

            val conf = new SparkConf()
            conf.set("spark.app.name", "GrantAnalytics")
            conf.set("spark.master", "local")
            val sc = new SparkContext(conf)

            val spark = SparkSession.builder().appName("GrantAnalytics").master("local[*]").getOrCreate()

            spark.conf.set("spark.sql.shuffle.partitions", 5)
            val schema = new StructType(Array(
              new StructField("OpportunityID", IntegerType, true),
              new StructField("OpportunityTitle", StringType, true),
              new StructField("OpportunityNumber", StringType, true),
              new StructField("CFDANumbers", DoubleType, true),
              new StructField("CategoryOfFundingActivity", StringType, true)))

            val streamingDF = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).option("header", "true").format("csv").load("C:/datasets/output/*.csv")

            val query = streamingDF.select(concat(col("OpportunityID"), lit("~"), col("OpportunityTitle"), lit("~"), col("OpportunityNumber"), lit("~"), col("CFDANumbers"), lit("~"), col("CategoryOfFundingActivity")).alias("value")).writeStream.format("kafka").outputMode(OutputMode.Update()).option("kafka.bootstrap.servers", "localhost:9092").option("topic", "grants").option("checkpointLocation", "C:/deleteme/kafka/").start()
            query.awaitTermination()
  

【问题讨论】:

【参考方案1】:

如果您想使用 Kafka,最好在设计应用程序时考虑单独和独立的事件。

这意味着,您应该将 XML 拆分为行并通过数据/时间戳对其进行过滤,以将每条消息发送到 Kafka。在 Kafka 中,每条消息都包含示例数据中的一行。最后,在第二天之后,Kafka 应该包含以下四个消息:

1,OppTit1,IT
2,OppTit2,HEALTH
3,OppTit3,Finance
4,OppTit4,Engineering

另外,请确保为 Kafka 消息应用有用的密钥。

只要数据作为单个事件在 Kafka 中可用,您就可以使用 Spark 仅消费和处理引用当前日期的新消息,因为以前消费的消息不会再次被消费。

【讨论】:

给出的 xml 包含不包含日期和时间的行,如果我添加了该列,它将添加到所有行,否则,我将针对之前的 xml 处理这个 xml(今天与昨天相比),这是昂贵的。 Kafka 或 spark 是否可以确定该记录已被摄取(即 kafka 或 spark 可以确定该记录已被读取和处理,然后它将跳过它)。

以上是关于当您每天收到包含所有数据(新旧)的 XML 或 CSV 文件时,如何仅处理新数据的主要内容,如果未能解决你的问题,请参考以下文章

C#将多个XMLNode或XMLNodeList从一个XMLDocument复制到另一个XMLDocument

每天一个Linux命令(04):tcpdump命令

当您知道内容类型时如何获得正确的文件扩展名

一个diff工具,用于判断两个目录下所有的改动(比较新旧版本文件夹)

当您收到带有 onesignal 的推送通知时自动打开应用程序

从站点链接或包含ram文件的xml链接下载所有*.rm文件