sybase中checkpoint干吗用的,如果在mysql中checkpoint表示啥?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sybase中checkpoint干吗用的,如果在mysql中checkpoint表示啥?相关的知识,希望对你有一定的参考价值。
参考技术A checkpoint干的事情:将缓冲池中的脏页刷新回磁盘,不同之处在于每次从哪里取多少脏页刷新到磁盘,以及什么时候触发checkpoint。
checkpoint解决的问题:
1.缩短数据库的恢复时间(数据库宕机时,不需要重做所有的日志,因checkpoint之前的页都已经刷新回磁盘啦)
2.缓冲池不够用时,将脏页刷新到磁盘(缓冲池不够用时,根据LRU算会溢出最近最少使用的页,若此页为脏页,需要强制执行checkpoint将脏也刷回磁盘)
3.重做日志不可用时,刷新脏页(采用循环使用的,并不是无限增大。当重用时,此时的重做日志还需要使用,就必须强制执行checkpoint将脏页刷回磁盘)
066 基于checkpoint的HA机制实现
1.说明
针对需要恢复的应用场景,提供了HA的的机制
内部实现原理:基于checkpoint的
当程序被kill的时候,下次恢复的时候,会从checkpoint对用的文件中进行数据的恢复
2.注意点
SparkStreaming 的HA和updateStateByKey来记录历史数据的API不能一起使用
二:程序
1.程序
1 package com.stream.it 2 3 import kafka.serializer.StringDecoder 4 import org.apache.spark.storage.StorageLevel 5 import org.apache.spark.streaming.kafka.KafkaUtils 6 import org.apache.spark.streaming.{Seconds, StreamingContext} 7 import org.apache.spark.{SparkConf, SparkContext} 8 9 object HAKafkaWordcount { 10 def main(args: Array[String]): Unit = { 11 val conf=new SparkConf() 12 .setAppName("spark-streaming-wordcount") 13 .setMaster("local[*]") 14 val sc=SparkContext.getOrCreate(conf) 15 val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir02" 16 17 18 /** 19 * 构造StreamingContext对象 20 * 21 * @return 22 */ 23 def createStreamingContextFunc(): StreamingContext = { 24 val ssc = new StreamingContext(sc, Seconds(5)) 25 ssc.checkpoint(checkpointDir) 26 val kafkaParams=Map("group.id"->"stream-sparking-0", 27 "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka", 28 "auto.offset.reset"->"smallest" 29 ) 30 val topics=Map("beifeng"->1) 31 val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder]( 32 ssc, //给定sparkStreaming的上下文 33 kafkaParams, //kafka的参数信息,通过kafka HightLevelComsumerApi连接 34 topics, //给定读取对应的topic的名称以及读取数据的线程数量 35 StorageLevel.MEMORY_AND_DISK_2 //数据接收器接收到kafka的数据后的保存级别 36 ).map(_._2) 37 38 39 val resultWordcount=dStream 40 .filter(line=>line.nonEmpty) 41 .flatMap(line=>line.split(" ").map((_,1))) 42 .reduceByKey(_+_) 43 resultWordcount.foreachRDD(rdd=>{ 44 rdd.foreachPartition(iter=>iter.foreach(println)) 45 }) 46 ssc 47 } 48 49 val ssc = StreamingContext.getOrCreate( 50 checkpointPath = checkpointDir, 51 creatingFunc = createStreamingContextFunc 52 ) 53 54 //启动 55 ssc.start() 56 //等到 57 ssc.awaitTermination() 58 } 59 }
2.注意点
HA第一次执行后,以后如果代码进行改动(创建StreamingContext的代码改动),不会得到反应(会直接从checkpoint中读取数据进行StreamingContext的恢复) ===> 解决SparkStreaming和Kafka集成的时候offset偏移量管理的问题
以上是关于sybase中checkpoint干吗用的,如果在mysql中checkpoint表示啥?的主要内容,如果未能解决你的问题,请参考以下文章