Curator场景应用

Posted 小~虎

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Curator场景应用相关的知识,希望对你有一定的参考价值。

分布式锁功能:

在分布式场景中,我们为了保证数据的一致性,经常在程序运行的某一个点,需要进行同步操作,(java提供synchronized或者Reentrantlock实现),

使用curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,这里推荐使用Curator框架的

InterProcessMutex来实现。 

 1 package bjsxt.curator.lock;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.concurrent.CountDownLatch;
 6 
 7 import org.apache.curator.RetryPolicy;
 8 import org.apache.curator.framework.CuratorFramework;
 9 import org.apache.curator.framework.CuratorFrameworkFactory;
10 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
11 import org.apache.curator.retry.ExponentialBackoffRetry;
12 
13 public class Lock2 {
14 
15     /** zookeeper地址 */
16     static final String CONNECT_ADDR = "192.168.2.2:2181";
17     /** session超时时间 */
18     static final int SESSION_OUTTIME = 5000;// ms
19 
20     static int count = 10;
21 
22     public static void genarNo() {
23         try {
24             count--;
25             System.out.println(count);
26         } finally {
27 
28         }
29     }
30 
31     public static void main(String[] args) throws Exception {
32 
33         // 1 重试策略:初试时间为1s 重试10次
34         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
35         // 2 通过工厂创建连接
36         CuratorFramework cf = CuratorFrameworkFactory.builder()
37                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
38                 .retryPolicy(retryPolicy)
39                 // .namespace("super")
40                 .build();
41         // 3 开启连接
42         cf.start();
43 
44         // 4 分布式锁
45         final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
46         // final ReentrantLock reentrantLock = new ReentrantLock();
47         final CountDownLatch countdown = new CountDownLatch(1);
48 
49         for (int i = 0; i < 10; i++) {
50             new Thread(new Runnable() {
51                 @Override
52                 public void run() {
53                     try {
54                         countdown.await();
55                         // 加锁
56                         lock.acquire();
57                         // reentrantLock.lock();
58                         // -------------业务处理开始
59                         // genarNo();
60                         SimpleDateFormat sdf = new SimpleDateFormat(
61                                 "HH:mm:ss|SSS");
62                         System.out.println(sdf.format(new Date()));
63                         // System.out.println(System.currentTimeMillis());
64                         // -------------业务处理结束
65                     } catch (Exception e) {
66                         e.printStackTrace();
67                     } finally {
68                         try {
69                             // 释放
70                             lock.release();
71                             // reentrantLock.unlock();
72                         } catch (Exception e) {
73                             e.printStackTrace();
74                         }
75                     }
76                 }
77             }, "t" + i).start();
78         }
79         Thread.sleep(100);
80         countdown.countDown();
81 
82     }
83 }

 

分布式计数器功能

分布式计数器,在单JVM中,我们可以通过AtomicInteger这种经典的方式实现,但是在分布式的场景下,就需要利用Curator框架的DistributedAtomicInteger来实现

 1 package bjsxt.curator.atomicinteger;
 2 
 3 import org.apache.curator.RetryPolicy;
 4 import org.apache.curator.framework.CuratorFramework;
 5 import org.apache.curator.framework.CuratorFrameworkFactory;
 6 import org.apache.curator.framework.recipes.atomic.AtomicValue;
 7 import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
 8 import org.apache.curator.retry.ExponentialBackoffRetry;
 9 import org.apache.curator.retry.RetryNTimes;
10 
11 public class CuratorAtomicInteger {
12 
13     /** zookeeper地址 */
14     static final String CONNECT_ADDR = "192.168.2.2:2181";
15     /** session超时时间 */
16     static final int SESSION_OUTTIME = 5000;// ms
17 
18     public static void main(String[] args) throws Exception {
19 
20         // 1 重试策略:初试时间为1s 重试10次
21         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
22         // 2 通过工厂创建连接
23         CuratorFramework cf = CuratorFrameworkFactory.builder()
24                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
25                 .retryPolicy(retryPolicy).build();
26         // 3 开启连接
27         cf.start();
28         // cf.delete().forPath("/super");
29 
30         // 4 使用DistributedAtomicInteger
31         DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(
32                 cf, "/super", new RetryNTimes(3, 1000));
33 
34         AtomicValue<Integer> value = atomicIntger.add(1);
35         System.out.println(value.succeeded());
36         System.out.println(value.postValue()); // 最新值
37         System.out.println(value.preValue()); // 原始值
38 
39     }
40 }

Curator框架,让一些很困难的问题,简单化了

 

以上是关于Curator场景应用的主要内容,如果未能解决你的问题,请参考以下文章

Curator使用

curator

Zookeeper客户端之 Curator

Curator的使用

七curator recipes之阻塞队列SimpleDistributedQueue

Apache Curator 实现分布式锁