每个微批次火花流中处理的总记录
Posted
技术标签:
【中文标题】每个微批次火花流中处理的总记录【英文标题】:Total records processed in each micro batch spark streaming 【发布时间】:2021-06-28 13:27:05 【问题描述】:有没有办法可以找到每个微批次处理到下游增量表中的记录数。我有流式作业,它使用 trigger.once() 和 附加模式 每小时运行一次。出于审计目的,我想知道每个微批次处理了多少条记录。我试过下面的代码来打印处理的记录数(显示在第二行)。
ss_count=0
def write_to_managed_table(micro_batch_df, batchId):
#print(f"inside foreachBatch for batch_id:batchId, rows in passed dataframe: micro_batch_df.count()")
ss_count = micro_batch_df.count()
saveloc = "TABLE_PATH"
df_final.writeStream.trigger(once=True).foreachBatch(write_to_managed_table).option('checkpointLocation', f"saveloc/_checkpoint").start(saveloc)
print(ss_count)
流式作业将毫无问题地运行,但 micro_batch_df.count() 不会打印任何计数。
任何指针将不胜感激。
【问题讨论】:
打印的打印语句是否没有计数值或整个打印语句丢失? 缺少完整的打印语句@FelixKJose 您是否尝试了函数“write_to_managed_table”之外的 println?我相信当它在函数内部时,它将被打印在工作节点而不是驱动节点上。 @puligun,对上面的代码进行了相应的编辑,得到函数外的计数,微批处理后ss_count的输出显示为0。 @chaitrak 如果以下答案有帮助,请告诉我 【参考方案1】:这是您正在寻找的工作示例(structured_steaming_example.py):
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StructuredStreamTesting") \
.getOrCreate()
# Create DataFrame representing the stream of input
df = spark.read.parquet("data/")
lines = spark.readStream.schema(df.schema).parquet("data/")
def batch_write(output_df, batch_id):
print("inside foreachBatch for batch_id:0, rows in passed dataframe: 1".format(batch_id, output_df.count()))
save_loc = "/tmp/example"
query = (lines.writeStream.trigger(once=True)
.foreachBatch(batch_write)
.option('checkpointLocation', save_loc + "/_checkpoint")
.start(save_loc)
)
query.awaitTermination()
附上示例镶木地板文件。请将其放入数据文件夹并使用 spark-submit 执行代码
spark-submit --master local structured_steaming_example.py
请将任何样例 parquet 文件放在 data 文件夹下进行测试。
【讨论】:
非常感谢@felixkjose,我已经接受了答案。以上是关于每个微批次火花流中处理的总记录的主要内容,如果未能解决你的问题,请参考以下文章