sybase中checkpoint干吗用的,如果在mysql中checkpoint表示啥?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sybase中checkpoint干吗用的,如果在mysql中checkpoint表示啥?相关的知识,希望对你有一定的参考价值。

参考技术A checkpoint干的事情:

将缓冲池中的脏页刷新回磁盘,不同之处在于每次从哪里取多少脏页刷新到磁盘,以及什么时候触发checkpoint。

checkpoint解决的问题:

1.缩短数据库的恢复时间(数据库宕机时,不需要重做所有的日志,因checkpoint之前的页都已经刷新回磁盘啦)

2.缓冲池不够用时,将脏页刷新到磁盘(缓冲池不够用时,根据LRU算会溢出最近最少使用的页,若此页为脏页,需要强制执行checkpoint将脏也刷回磁盘)

3.重做日志不可用时,刷新脏页(采用循环使用的,并不是无限增大。当重用时,此时的重做日志还需要使用,就必须强制执行checkpoint将脏页刷回磁盘)

066 基于checkpoint的HA机制实现

1.说明

  针对需要恢复的应用场景,提供了HA的的机制

  内部实现原理:基于checkpoint的

  当程序被kill的时候,下次恢复的时候,会从checkpoint对用的文件中进行数据的恢复

 

2.注意点

  SparkStreaming 的HA和updateStateByKey来记录历史数据的API不能一起使用

 

二:程序

1.程序

 1 package com.stream.it
 2 
 3 import kafka.serializer.StringDecoder
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.kafka.KafkaUtils
 6 import org.apache.spark.streaming.{Seconds, StreamingContext}
 7 import org.apache.spark.{SparkConf, SparkContext}
 8 
 9 object HAKafkaWordcount {
10   def main(args: Array[String]): Unit = {
11     val conf=new SparkConf()
12         .setAppName("spark-streaming-wordcount")
13           .setMaster("local[*]")
14     val sc=SparkContext.getOrCreate(conf)
15     val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir02"
16 
17 
18     /**
19       * 构造StreamingContext对象
20       *
21       * @return
22       */
23     def createStreamingContextFunc(): StreamingContext = {
24       val ssc = new StreamingContext(sc, Seconds(5))
25       ssc.checkpoint(checkpointDir)
26       val kafkaParams=Map("group.id"->"stream-sparking-0",
27         "zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka",
28         "auto.offset.reset"->"smallest"
29       )
30       val topics=Map("beifeng"->1)
31       val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
32         ssc,             //给定sparkStreaming的上下文
33         kafkaParams,     //kafka的参数信息,通过kafka HightLevelComsumerApi连接
34         topics,          //给定读取对应的topic的名称以及读取数据的线程数量
35         StorageLevel.MEMORY_AND_DISK_2     //数据接收器接收到kafka的数据后的保存级别
36       ).map(_._2)
37 
38 
39       val resultWordcount=dStream
40         .filter(line=>line.nonEmpty)
41         .flatMap(line=>line.split(" ").map((_,1)))
42         .reduceByKey(_+_)
43       resultWordcount.foreachRDD(rdd=>{
44         rdd.foreachPartition(iter=>iter.foreach(println))
45       })
46       ssc
47     }
48 
49     val ssc = StreamingContext.getOrCreate(
50       checkpointPath = checkpointDir,
51       creatingFunc = createStreamingContextFunc
52     )
53 
54     //启动
55     ssc.start()
56     //等到
57     ssc.awaitTermination()
58   }
59 }

 

2.注意点

  HA第一次执行后,以后如果代码进行改动(创建StreamingContext的代码改动),不会得到反应(会直接从checkpoint中读取数据进行StreamingContext的恢复) ===> 解决SparkStreaming和Kafka集成的时候offset偏移量管理的问题

以上是关于sybase中checkpoint干吗用的,如果在mysql中checkpoint表示啥?的主要内容,如果未能解决你的问题,请参考以下文章

MFC中DrawItem()函数是干吗用的?

数据库中job干吗用的?

sybase 如何像SQL SERVER一样建立作业,让他能定时运行存储过程

linux下如何启动和停止sybase的服务

COCOMO模型可以用来干吗?

intelij idea连接sybase数据库,数据插入问题