带有自定义接收器的 Spark 结构化流中的输入行数
Posted
技术标签:
【中文标题】带有自定义接收器的 Spark 结构化流中的输入行数【英文标题】:Number of input rows in spark structured streaming with custom sink 【发布时间】:2018-01-26 16:59:00 【问题描述】:我在结构化流 (spark 2.2.0) 中使用自定义接收器,并注意到 spark 生成的输入行数指标不正确 - 它始终为零。
我的流构建:
StreamingQuery writeStream = session
.readStream()
.schema(RecordSchema.fromClass(TestRecord.class))
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv(s3Path.toString())
.as(Encoders.bean(TestRecord.class))
.flatMap(
((FlatMapFunction<TestRecord, TestOutputRecord>) (u) ->
List<TestOutputRecord> list = new ArrayList<>();
try
TestOutputRecord result = transformer.convert(u);
list.add(result);
catch (Throwable t)
System.err.println("Failed to convert a record");
t.printStackTrace();
return list.iterator();
),
Encoders.bean(TestOutputRecord.class))
.map(new DataReinforcementMapFunction<>(), Encoders.bean(TestOutputRecord.clazz))
.writeStream()
.trigger(Trigger.ProcessingTime(WRITE_FREQUENCY, TimeUnit.SECONDS))
.format(MY_WRITER_FORMAT)
.outputMode(OutputMode.Append())
.queryName("custom-sink-stream")
.start();
writeStream.processAllAvailable();
writeStream.stop();
日志:
Streaming query made progress:
"id" : "a8a7fbc2-0f06-4197-a99a-114abae24964",
"runId" : "bebc8a0c-d3b2-4fd6-8710-78223a88edc7",
"name" : "custom-sink-stream",
"timestamp" : "2018-01-25T18:39:52.949Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" :
"getOffset" : 781,
"triggerExecution" : 781
,
"stateOperators" : [ ],
"sources" : [
"description" : "FileStreamSource[s3n://test-bucket/test]",
"startOffset" :
"logOffset" : 0
,
"endOffset" :
"logOffset" : 0
,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
],
"sink" :
"description" : "com.mycompany.spark.MySink@f82a99"
我是否必须在自定义接收器中填充任何指标才能跟踪进度?还是从 s3 存储桶读取时 FileStreamSource 出现问题?
【问题讨论】:
我也面临着类似的问题。标准报告会出现这个问题吗? “标准报告”是什么意思?如果你在结构化流中实现自定义接收器,那么你不应该通过 queryExecution 使用 rdd 而不是直接来自数据集 ***.com/questions/52483576/… 请看这个 【参考方案1】:问题与在我的自定义接收器中使用 dataset.rdd
有关,该接收器创建了一个新计划,因此 StreamExecution 不知道它,因此无法获取指标。
将data.rdd.mapPartitions
替换为data.queryExecution.toRdd.mapPartitions
可以解决此问题。
【讨论】:
以上是关于带有自定义接收器的 Spark 结构化流中的输入行数的主要内容,如果未能解决你的问题,请参考以下文章
使用 ForeachWriter 在 Spark 流中实现 Cassandra 接收器
SPARK 结构化流中的 StructField 是不是存在错误