Curator场景应用
Posted 小~虎
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Curator场景应用相关的知识,希望对你有一定的参考价值。
分布式锁功能:
在分布式场景中,我们为了保证数据的一致性,经常在程序运行的某一个点,需要进行同步操作,(java提供synchronized或者Reentrantlock实现),
使用curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,这里推荐使用Curator框架的
InterProcessMutex来实现。
1 package bjsxt.curator.lock; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 import java.util.concurrent.CountDownLatch; 6 7 import org.apache.curator.RetryPolicy; 8 import org.apache.curator.framework.CuratorFramework; 9 import org.apache.curator.framework.CuratorFrameworkFactory; 10 import org.apache.curator.framework.recipes.locks.InterProcessMutex; 11 import org.apache.curator.retry.ExponentialBackoffRetry; 12 13 public class Lock2 { 14 15 /** zookeeper地址 */ 16 static final String CONNECT_ADDR = "192.168.2.2:2181"; 17 /** session超时时间 */ 18 static final int SESSION_OUTTIME = 5000;// ms 19 20 static int count = 10; 21 22 public static void genarNo() { 23 try { 24 count--; 25 System.out.println(count); 26 } finally { 27 28 } 29 } 30 31 public static void main(String[] args) throws Exception { 32 33 // 1 重试策略:初试时间为1s 重试10次 34 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 35 // 2 通过工厂创建连接 36 CuratorFramework cf = CuratorFrameworkFactory.builder() 37 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME) 38 .retryPolicy(retryPolicy) 39 // .namespace("super") 40 .build(); 41 // 3 开启连接 42 cf.start(); 43 44 // 4 分布式锁 45 final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); 46 // final ReentrantLock reentrantLock = new ReentrantLock(); 47 final CountDownLatch countdown = new CountDownLatch(1); 48 49 for (int i = 0; i < 10; i++) { 50 new Thread(new Runnable() { 51 @Override 52 public void run() { 53 try { 54 countdown.await(); 55 // 加锁 56 lock.acquire(); 57 // reentrantLock.lock(); 58 // -------------业务处理开始 59 // genarNo(); 60 SimpleDateFormat sdf = new SimpleDateFormat( 61 "HH:mm:ss|SSS"); 62 System.out.println(sdf.format(new Date())); 63 // System.out.println(System.currentTimeMillis()); 64 // -------------业务处理结束 65 } catch (Exception e) { 66 e.printStackTrace(); 67 } finally { 68 try { 69 // 释放 70 lock.release(); 71 // reentrantLock.unlock(); 72 } catch (Exception e) { 73 e.printStackTrace(); 74 } 75 } 76 } 77 }, "t" + i).start(); 78 } 79 Thread.sleep(100); 80 countdown.countDown(); 81 82 } 83 }
分布式计数器功能
分布式计数器,在单JVM中,我们可以通过AtomicInteger这种经典的方式实现,但是在分布式的场景下,就需要利用Curator框架的DistributedAtomicInteger来实现
1 package bjsxt.curator.atomicinteger; 2 3 import org.apache.curator.RetryPolicy; 4 import org.apache.curator.framework.CuratorFramework; 5 import org.apache.curator.framework.CuratorFrameworkFactory; 6 import org.apache.curator.framework.recipes.atomic.AtomicValue; 7 import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; 8 import org.apache.curator.retry.ExponentialBackoffRetry; 9 import org.apache.curator.retry.RetryNTimes; 10 11 public class CuratorAtomicInteger { 12 13 /** zookeeper地址 */ 14 static final String CONNECT_ADDR = "192.168.2.2:2181"; 15 /** session超时时间 */ 16 static final int SESSION_OUTTIME = 5000;// ms 17 18 public static void main(String[] args) throws Exception { 19 20 // 1 重试策略:初试时间为1s 重试10次 21 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 22 // 2 通过工厂创建连接 23 CuratorFramework cf = CuratorFrameworkFactory.builder() 24 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME) 25 .retryPolicy(retryPolicy).build(); 26 // 3 开启连接 27 cf.start(); 28 // cf.delete().forPath("/super"); 29 30 // 4 使用DistributedAtomicInteger 31 DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger( 32 cf, "/super", new RetryNTimes(3, 1000)); 33 34 AtomicValue<Integer> value = atomicIntger.add(1); 35 System.out.println(value.succeeded()); 36 System.out.println(value.postValue()); // 最新值 37 System.out.println(value.preValue()); // 原始值 38 39 } 40 }
Curator框架,让一些很困难的问题,简单化了
以上是关于Curator场景应用的主要内容,如果未能解决你的问题,请参考以下文章