如何仅从文件中处理新记录?

Posted

技术标签:

【中文标题】如何仅从文件中处理新记录?【英文标题】:How to process new records only from file? 【发布时间】:2017-07-13 16:36:07 【问题描述】:

我有一个场景来处理文件中的记录。文件中的数据会定期添加(每毫秒)。所以我需要读取文件并处理它,同时只处理新添加的记录。

我遇到了基于 Spark SQL 构建的 Spark 结构化流的概念。我正在做的是-

    每 1 秒触发一次文件流处理 对文件运行 Spark SQL 查询 以追加模式将查询的输出写入控制台。

下面是相同的代码-

public static class SparkStreamer implements Runnable,Serializable 
    @Override
    public void run() 
        processDataStream();

    

    private void processDataStream() 

        Dataset<Row> rowData = spark.readStream().format("Text").load("C:\\Test\\App\\");

        Dataset<String> data = rowData.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() 

            @Override
            public Iterator<String> call(String row) throws Exception 
                return Arrays.asList(row.split("\\|")).iterator();
            


        ,Encoders.STRING());

        Dataset<Row> dataCount = data.select(new Column("value"));


        StreamingQuery query = dataCount.writeStream()
                  .outputMode("append")
                  .format("console")
                  .start();
        try 
            query.awaitTermination();
         catch (StreamingQueryException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        
    

通过上述实现,查询执行了 1 次,但是如果我在文件中添加新记录,则不会触发第二批执行。

其他观察:

输出模式已完成并已更新,没有输出。只有在附加模式下,我才能得到 1 次输出。

有人可以帮助解决这个问题吗? Spark Structured Streaming 是否支持处理来自文件的数据,因为普通的 Spark Streaming 不支持。

【问题讨论】:

【参考方案1】:

Spark Structured Streaming 是否支持处理来自文件的数据

是的。

查询执行了 1 次,但是如果我在文件中添加新记录,则不会触发第二批执行。

一旦文件被标记为已被处理且不再被处理(查看负责它的FileStreamSource 以了解其在幕后如何工作),这将无法正常工作。

推荐的解决方案是将新内容写入新文件。

【讨论】:

以上是关于如何仅从文件中处理新记录?的主要内容,如果未能解决你的问题,请参考以下文章

如何保护仅从前端使用的 API(Ajax 调用)

如何仅从 URL 中获取搜索词,忽略多个分隔符

仅从表中选择最新记录并使其快速,如何?

如何使用 LINQ 仅从实体加载最后一条记录?

如何确保仅从特定表中选择第一条记录,该表可以在 DB2 中包含多个相同 ID 的记录

如何获取数据列表,特定日期仅从每个日期获取 6 条记录而不是更多