Spark Streaming checkpoints reading after failure
Posted 2017-12-18 13:49:49

I am trying to implement a fault-tolerant Spark Streaming application that consumes from Kafka. When I restart the application, it reads again the messages that were already read before the restart, and my computations go wrong as a result. Please help me solve this issue.
Here is the code, written in Java.
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public static JavaStreamingContext createContextFunc() {
    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints();

    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    JavaStreamingContext streamingContext = app.getStreamingContext(checkpointDir);
    // The Kafka stream (and any computation on it) must be defined inside this
    // factory so that it can be recovered from the checkpoint on restart.
    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext);

    return streamingContext;
}

public static void main(String[] args) throws InterruptedException {
    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    // Recreate the context from checkpoint data if it exists,
    // otherwise build a fresh one through the factory function.
    Function0<JavaStreamingContext> createContextFunc = () -> createContextFunc();
    JavaStreamingContext streamingContext =
            JavaStreamingContext.getOrCreate(checkpointDir, createContextFunc);

    streamingContext.start();
    streamingContext.awaitTermination();
}

public JavaStreamingContext getStreamingContext(String checkpointDir) {
    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String master = conf.getMaster();
    int duration = conf.getDuration();

    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration));
    streamingContext.checkpoint(checkpointDir);
    return streamingContext;
}

public SparkSession getSession() {
    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String hiveConf = conf.getHiveConf();
    String thriftConf = conf.getThriftConf();
    int shufflePartitions = conf.getShuffle();

    SparkSession spark = SparkSession
            .builder()
            .appName(appName)
            .config("spark.sql.warehouse.dir", hiveConf)
            .config("hive.metastore.uris", thriftConf)
            .enableHiveSupport()
            .getOrCreate();

    spark.conf().set("spark.sql.shuffle.partitions", shufflePartitions);
    return spark;
}

public JavaDStream<String> getKafkaInputStream(JavaStreamingContext streamingContext) {
    KafkaConfig kafkaConfig = new KafkaConfig();
    Set<String> topicsSet = kafkaConfig.getTopicSet();
    Map<String, Object> kafkaParams = kafkaConfig.getKafkaParams();

    // Create a direct Kafka stream for the configured brokers and topics.
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    JavaDStream<String> logdata = messages.map(ConsumerRecord::value);
    return logdata;
}
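As an aside, the Spark Kafka 0.10 direct stream also lets the application commit offsets back to Kafka itself, instead of relying solely on checkpoint recovery, which avoids re-reading already-processed messages after a restart. A minimal sketch, assuming it is placed inside getKafkaInputStream where messages is in scope, and that enable.auto.commit is set to false in kafkaParams:

import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

// Record each batch's offset ranges and commit them to Kafka only after
// the batch has been processed, so a restart resumes from the last
// committed position instead of re-reading old messages.
messages.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ... process the batch here ...

    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});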
Here is the link to the GitHub project: https://github.com/ThisaST/Spark-Fault-Tolerance
Answer 1: I solved this issue by adding the following configuration to the code:
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
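For context, a minimal sketch of where that flag fits, based on the getStreamingContext method above. With it set, a JVM shutdown (e.g. SIGTERM) stops the StreamingContext gracefully, so in-flight batches finish and the checkpoint reflects everything that was actually processed:

SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
// Stop the StreamingContext gracefully on JVM shutdown instead of
// killing it mid-batch, so the checkpoint data stays consistent.
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");

JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration));
streamingContext.checkpoint(checkpointDir);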