每个微批次火花流中处理的总记录

Posted

技术标签:

【中文标题】每个微批次火花流中处理的总记录【英文标题】:Total records processed in each micro batch spark streaming 【发布时间】:2021-06-28 13:27:05 【问题描述】:

有没有办法可以找到每个微批次处理到下游增量表中的记录数。我有流式作业,它使用 trigger.once()附加模式 每小时运行一次。出于审计目的,我想知道每个微批次处理了多少条记录。我试过下面的代码来打印处理的记录数(显示在第二行)。

ss_count=0 

def write_to_managed_table(micro_batch_df, batchId):
#print(f"inside foreachBatch for batch_id:batchId, rows in passed dataframe: micro_batch_df.count()")

ss_count = micro_batch_df.count()

saveloc = "TABLE_PATH"
df_final.writeStream.trigger(once=True).foreachBatch(write_to_managed_table).option('checkpointLocation', f"saveloc/_checkpoint").start(saveloc)

print(ss_count)

流式作业将毫无问题地运行,但 micro_batch_df.count() 不会打印任何计数。

任何指针将不胜感激。

【问题讨论】:

打印的打印语句是否没有计数值或整个打印语句丢失? 缺少完整的打印语句@FelixKJose 您是否尝试了函数“write_to_managed_table”之外的 println?我相信当它在函数内部时,它将被打印在工作节点而不是驱动节点上。 @puligun,对上面的代码进行了相应的编辑,得到函数外的计数,微批处理后ss_count的输出显示为0。 @chaitrak 如果以下答案有帮助,请告诉我 【参考方案1】:

这是您正在寻找的工作示例(structured_steaming_example.py):

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredStreamTesting") \
    .getOrCreate()

# Create DataFrame representing the stream of input
df = spark.read.parquet("data/")
lines = spark.readStream.schema(df.schema).parquet("data/")


def batch_write(output_df, batch_id):
    print("inside foreachBatch for batch_id:0, rows in passed dataframe: 1".format(batch_id, output_df.count()))


save_loc = "/tmp/example"
query = (lines.writeStream.trigger(once=True)
         .foreachBatch(batch_write)
         .option('checkpointLocation', save_loc + "/_checkpoint")
         .start(save_loc)
         )
query.awaitTermination()

附上示例镶木地板文件。请将其放入数据文件夹并使用 spark-submit 执行代码

spark-submit --master local structured_steaming_example.py

请将任何样例 parquet 文件放在 data 文件夹下进行测试。

【讨论】:

非常感谢@felixkjose,我已经接受了答案。

以上是关于每个微批次火花流中处理的总记录的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming:微批处理并行执行

火花。将 RDD 拆分为批次

即使有 0 条消息,火花流中的转换也需要更多时间

有没有办法在火花流中展平嵌套的 JSON?

使用 Apache Beam 按键处理事件的总排序

如何在火花流中刷新加载的数据帧内容?