带有自定义接收器的 Spark 结构化流中的输入行数

Posted

技术标签:

【中文标题】带有自定义接收器的 Spark 结构化流中的输入行数【英文标题】:Number of input rows in spark structured streaming with custom sink 【发布时间】:2018-01-26 16:59:00 【问题描述】:

我在结构化流 (spark 2.2.0) 中使用自定义接收器,并注意到 spark 生成的输入行数指标不正确 - 它始终为零。

我的流构建:

StreamingQuery writeStream = session
            .readStream()
            .schema(RecordSchema.fromClass(TestRecord.class))
            .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
            .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
            .csv(s3Path.toString())
            .as(Encoders.bean(TestRecord.class))
            .flatMap(
                ((FlatMapFunction<TestRecord, TestOutputRecord>) (u) -> 
                    List<TestOutputRecord> list = new ArrayList<>();
                    try 
                        TestOutputRecord result = transformer.convert(u);
                        list.add(result);
                     catch (Throwable t) 
                        System.err.println("Failed to convert a record");
                        t.printStackTrace();
                    

                    return list.iterator();
                ),
                Encoders.bean(TestOutputRecord.class))
        .map(new DataReinforcementMapFunction<>(), Encoders.bean(TestOutputRecord.clazz))
        .writeStream()
        .trigger(Trigger.ProcessingTime(WRITE_FREQUENCY, TimeUnit.SECONDS))
        .format(MY_WRITER_FORMAT)
        .outputMode(OutputMode.Append())
        .queryName("custom-sink-stream")
        .start();

        writeStream.processAllAvailable();
        writeStream.stop();

日志:

Streaming query made progress: 
  "id" : "a8a7fbc2-0f06-4197-a99a-114abae24964",
  "runId" : "bebc8a0c-d3b2-4fd6-8710-78223a88edc7",
  "name" : "custom-sink-stream",
  "timestamp" : "2018-01-25T18:39:52.949Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : 
    "getOffset" : 781,
    "triggerExecution" : 781
  ,
  "stateOperators" : [ ],
  "sources" : [ 
    "description" : "FileStreamSource[s3n://test-bucket/test]",
    "startOffset" : 
      "logOffset" : 0
    ,
    "endOffset" : 
      "logOffset" : 0
    ,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
   ],
  "sink" : 
    "description" : "com.mycompany.spark.MySink@f82a99"
  

我是否必须在自定义接收器中填充任何指标才能跟踪进度?还是从 s3 存储桶读取时 FileStreamSource 出现问题?

【问题讨论】:

我也面临着类似的问题。标准报告会出现这个问题吗? “标准报告”是什么意思?如果你在结构化流中实现自定义接收器,那么你不应该通过 queryExecution 使用 rdd 而不是直接来自数据集 ***.com/questions/52483576/… 请看这个 【参考方案1】:

问题与在我的自定义接收器中使用 dataset.rdd 有关,该接收器创建了一个新计划,因此 StreamExecution 不知道它,因此无法获取指标。

data.rdd.mapPartitions 替换为data.queryExecution.toRdd.mapPartitions 可以解决此问题。

【讨论】:

以上是关于带有自定义接收器的 Spark 结构化流中的输入行数的主要内容,如果未能解决你的问题,请参考以下文章

使用 ForeachWriter 在 Spark 流中实现 Cassandra 接收器

Spark:火花流中的接收器是瓶颈吗?

SPARK 结构化流中的 StructField 是不是存在错误

Spark 结构化流中的外部连接

有没有办法将生成的 groupby 流加入到 kafka-spark 结构化流中的原始流?

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?