结构化流式处理:由于检查点数据而重新启动时出现流式处理异常
Posted
技术标签:
【中文标题】结构化流式处理:由于检查点数据而重新启动时出现流式处理异常【英文标题】:Structured Streaming: Streaming Exception on restarts due to checkpoint data 【发布时间】:2017-06-07 17:57:01 【问题描述】:我有一个读取流来使用来自 Kafka 主题的数据,并且基于每个传入消息中的属性值,我必须将数据写入 S3 中的 2 个不同位置中的任何一个(如果 value1 写入 location1,否则位置2)。 在下面的高层次上是我这样做的,
Dataset<Row> kafkaStreamSet = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrap)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", false)
.option("maxOffsetsPerTrigger", offsetsPerTrigger)
.load();
//raw message to ClickStream
Dataset<ClickStream> ds1 = kafkaStreamSet.mapPartitions(processClickStreamMessages, Encoders.bean(ClickStream.class));
ClickStream.java 中有 2 个子对象,一次只会填充其中一个,具体取决于消息属性值是 value1 还是 value2,
1) BookingRequest.java 如果值为 1, 2) PropertyPageView.java if value2 ,
然后我将其从点击流中分离出来,以写入 S3 中的 2 个差异位置,
//fetch BookingRequests in the ClickStream
Dataset<BookingRequest> ds2 = ds1.map(filterBookingRequests,Encoders.bean(BookingRequest.class));
//fetch PropertyPageViews in the ClickStream
Dataset<PropertyPageView> ds3 = ds1.map(filterPropertyPageViews,Encoders.bean(PropertyPageView.class));
最后 ds2 和 ds3 被写入 2 个不同的位置,
StreamingQuery bookingRequestsParquetStreamWriter = ds2.writeStream().outputMode("append")
.format("parquet")
.trigger(ProcessingTime.create(bookingRequestProcessingTime, TimeUnit.MILLISECONDS))
.option("checkpointLocation", "s3://" + s3Bucket+ "/checkpoint/bookingRequests")
.partitionBy("eventDate")
.start("s3://" + s3Bucket+ "/" + bookingRequestPath);
StreamingQuery PageViewsParquetStreamWriter = ds3.writeStream().outputMode("append")
.format("parquet")
.trigger(ProcessingTime.create(pageViewProcessingTime, TimeUnit.MILLISECONDS))
.option("checkpointLocation", "s3://" + s3Bucket+ "/checkpoint/PageViews")
.partitionBy("eventDate")
.start("s3://" + s3Bucket+ "/" + pageViewPath);
bookingRequestsParquetStreamWriter.awaitTermination();
PageViewsParquetStreamWriter.awaitTermination();
它似乎工作正常,并且在部署应用程序时,我看到数据写入了不同的路径。但是,每当作业在失败或手动停止和启动时重新启动时,它都会失败并出现以下异常(其中 userSessionEventJoin.global 是我的主题名称),
原因:org.apache.spark.sql.streaming.StreamingQueryException:预期例如"topicA":"0":23,"1":-1,"topicB":"0":-2,得到 "userSessionEventJoin.global":"92":154362528," 101 org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:74) org.apache.spark.sql.kafka010.KafkaSourceOffset$.apply(KafkaSourceOffset.scala:59)
如果我删除了所有的检查点信息,那么它会再次开始并在给定的 2 个位置开始新的检查点,但这意味着我必须再次从最新的偏移量开始处理并丢失所有以前的偏移量。 spark版本是2.1,这个topic有100+个partition。 我只用一个写入流(一个检查点位置)进行了测试,重启时会发生同样的异常。
请提出任何解决方案,谢谢。
【问题讨论】:
【参考方案1】:您的代码似乎是一个简单的点击流作业。在您的示例中,您创建了一个火花流会话并使用检查点目录间歇性地存储检查点数据。
但是,您的代码不知道如何从检查点补水。
到本段末尾会很清楚。
这是生产级流式作业的步骤。
1) Use getOrCreate API to create your spark streaming session.
a) getOrCreate takes two parameters. A function "(....) => sparkSession" and the checkpoint directory.
2) When program starts for the first time, it uses the checkpoint directory to store it's inner details. (Among other uses)
3) When program crashes/"stops and restarted", the spark Streaming session is created from the checkpoint hence giving you what you want.
由于在堆栈溢出中不鼓励引用链接,我将把示例代码放在下面。但本质上取自Spark Github example
/**
* Counts words in text encoded with UTF8 received from the network every second. This example also
* shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
* they can be registered on driver failures.
*
* Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
* data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
* <output-file> file to which the word counts will be appended
*
* <checkpoint-directory> and <output-file> must be absolute paths
*
* To run this on your local machine, you need to first run a Netcat server
*
* `$ nc -lk 9999`
*
* and run the example as
*
* `$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
* localhost 9999 ~/checkpoint/ ~/out`
*
* If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
* a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
* checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
* the checkpoint data.
*
* Refer to the online documentation for more details.
*/
public final class JavaRecoverableNetworkWordCount
private static final Pattern SPACE = Pattern.compile(" ");
private static JavaStreamingContext createContext(String ip,
int port,
String checkpointDirectory,
String outputPath)
// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
System.out.println("Creating new context");
File outputFile = new File(outputPath);
if (outputFile.exists())
outputFile.delete();
SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint(checkpointDirectory);
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.foreachRDD((rdd, time) ->
// Get or register the blacklist Broadcast
Broadcast<List<String>> blacklist =
JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
LongAccumulator droppedWordsCounter =
JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use blacklist to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(wordCount ->
if (blacklist.value().contains(wordCount._1()))
droppedWordsCounter.add(wordCount._2());
return false;
else
return true;
).collect().toString();
String output = "Counts at time " + time + " " + counts;
System.out.println(output);
System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
System.out.println("Appending to " + outputFile.getAbsolutePath());
Files.append(output + "\n", outputFile, Charset.defaultCharset());
);
return ssc;
public static void main(String[] args) throws Exception
if (args.length != 4)
System.err.println("You arguments were " + Arrays.asList(args));
System.err.println(
"Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
" <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
" Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
" HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
" the word counts will be appended\n" +
"\n" +
"In local mode, <master> should be 'local[n]' with n > 1\n" +
"Both <checkpoint-directory> and <output-file> must be absolute paths");
System.exit(1);
String ip = args[0];
int port = Integer.parseInt(args[1]);
String checkpointDirectory = args[2];
String outputPath = args[3];
// Function to create JavaStreamingContext without any output operations
// (used to detect the new context)
Function0<JavaStreamingContext> createContextFunc =
() -> createContext(ip, port, checkpointDirectory, outputPath);
JavaStreamingContext ssc =
JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
ssc.start();
ssc.awaitTermination();
【讨论】:
【参考方案2】:这是 spark 版本 2.1.0 中的一个错误,使用该版本运行时似乎已在 spark 2.1.1 中修复。
【讨论】:
这是特定于新的 spark 结构化流,而不是 spark 流 (dtsreams)。 该错误与处理具有大量分区的主题时长检查点被截断的位置有关,在我的 120 中。以上是关于结构化流式处理:由于检查点数据而重新启动时出现流式处理异常的主要内容,如果未能解决你的问题,请参考以下文章
尝试使用 Python Jupyter Notebook 将带有地理标记的推文流式传输到 PostgreSQL 时出现问题