CuratorBarrier
Posted 小~虎
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CuratorBarrier相关的知识,希望对你有一定的参考价值。
一、DistributedDoubleBarrier
同时开始,同时结束
1 package bjsxt.curator.barrier; 2 3 import java.util.Random; 4 5 import org.apache.curator.RetryPolicy; 6 import org.apache.curator.framework.CuratorFramework; 7 import org.apache.curator.framework.CuratorFrameworkFactory; 8 import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier; 9 import org.apache.curator.retry.ExponentialBackoffRetry; 10 11 public class CuratorBarrier1 { 12 13 /** zookeeper地址 */ 14 static final String CONNECT_ADDR = "192.168.2.2:2181"; 15 /** session超时时间 */ 16 static final int SESSION_OUTTIME = 5000;// ms 17 18 public static void main(String[] args) throws Exception { 19 20 for (int i = 0; i < 5; i++) { 21 new Thread(new Runnable() { 22 @Override 23 public void run() { 24 try { 25 RetryPolicy retryPolicy = new ExponentialBackoffRetry( 26 1000, 10); 27 CuratorFramework cf = CuratorFrameworkFactory.builder() 28 .connectString(CONNECT_ADDR) 29 .retryPolicy(retryPolicy).build(); 30 cf.start(); 31 32 DistributedDoubleBarrier barrier = new DistributedDoubleBarrier( 33 cf, "/super", 5); 34 Thread.sleep(1000 * (new Random()).nextInt(3)); 35 System.out.println(Thread.currentThread().getName() 36 + "已经准备"); 37 barrier.enter(); 38 System.out.println("同时开始运行..."); 39 Thread.sleep(1000 * (new Random()).nextInt(3)); 40 System.out.println(Thread.currentThread().getName() 41 + "运行完毕"); 42 barrier.leave(); 43 System.out.println("同时退出运行..."); 44 45 } catch (Exception e) { 46 e.printStackTrace(); 47 } 48 } 49 }, "t" + i).start(); 50 } 51 52 } 53 }
二、DistributedBarrier
吹哨,其他线程完成等待,等待障碍移除,同时运行。
package bjsxt.curator.barrier; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorBarrier2 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.2.2:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;// ms static DistributedBarrier barrier = null; public static void main(String[] args) throws Exception { for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { RetryPolicy retryPolicy = new ExponentialBackoffRetry( 1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy).build(); cf.start(); barrier = new DistributedBarrier(cf, "/super"); System.out.println(Thread.currentThread().getName() + "设置barrier!"); barrier.setBarrier(); // 设置 barrier.waitOnBarrier(); // 等待 System.out.println("---------开始执行程序----------"); } catch (Exception e) { e.printStackTrace(); } } }, "t" + i).start(); } Thread.sleep(5000); barrier.removeBarrier(); // 释放 } }
以上是关于CuratorBarrier的主要内容,如果未能解决你的问题,请参考以下文章