Spring Boot—20Zookeeper
Posted by ParamousGIS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot—20Zookeeper相关的知识,希望对你有一定的参考价值。
pom.xml
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.11</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery-server</artifactId> <version>2.12.0</version> </dependency>
application.properties
#
server.address=0.0.0.0
server.port=8080
server.servlet.context-path=/test
server.session.timeout=300
server.error.path=/error
#
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.buffered=true
server.tomcat.accesslog.directory=D:/Project/JavaWeb/SpringBoot/04JPASpringBoot/logs
#
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=Asia/Shanghai
#
spring.thymeleaf.cache=true
spring.thymeleaf.enabled=true
file.upload.path=D:/Project/JavaWeb/SpringBoot/04JPASpringBoot/fileUpLoad
spring.servlet.multipart.enabled=true
spring.servlet.multipart.file-size-threshold=0
spring.servlet.multipart.location=D:/Project/JavaWeb/SpringBoot/04JPASpringBoot/temp
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.servlet.multipart.resolve-lazily=false
#
# debug=true # Enable debug logs.
# trace=true # Enable trace logs.
# LOGGING
logging.config=classpath:logback.xml
# zookeeper
zk.url=127.0.0.1:2181
zk.retry-time-init = 1000
zk.retry-count = 3
启动类
package com.smartmap.sample.test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point for the ZooKeeper sample.
 */
@SpringBootApplication
public class TestZookeeperApplication {

    /**
     * Hands control to {@link SpringApplication}, using this class as the
     * primary configuration source.
     *
     * @param args command-line arguments, forwarded unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = TestZookeeperApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
Zookeeper配置类
package com.smartmap.sample.test.conf; import java.util.ArrayList; import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceInstanceBuilder; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ZookeeperConfiguration { Log log = LogFactory.getLog(ZookeeperConfiguration.class); @Value("${zk.url}") private String zkUrl; @Value("${zk.retry-time-init}") private int retryTimeInit; @Value("${zk.retry-count}") private int retryCount; @Bean public CuratorFramework getCuratorFramework() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(retryTimeInit, retryCount); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zkUrl, retryPolicy); addListener(curatorFramework); curatorFramework.start(); registerService(curatorFramework); ServiceInstance<ServerPayload> service = findService(curatorFramework, "book"); log.info(service); return curatorFramework; } /** * 添加监听 * @param curatorFramework */ private void addListener(CuratorFramework curatorFramework) { 
curatorFramework.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { CuratorEventType type = curatorEvent.getType(); if(type==CuratorEventType.WATCHED) { WatchedEvent we = curatorEvent.getWatchedEvent(); EventType et = we.getType(); if(we.getPath() != null) { log.info(et + ":" + we.getPath());
// 重新注册监听 curatorFramework.checkExists().watched().forPath(we.getPath()); } } } }); } /** * 服务注册 * @param client * @throws Exception */ protected void registerService(CuratorFramework client) throws Exception { // 构造一个服务描述 ServiceInstanceBuilder<ServerPayload> serviceInstanceBuilder = ServiceInstance.builder(); serviceInstanceBuilder.address("127.0.0.1"); serviceInstanceBuilder.port(8080); serviceInstanceBuilder.name("book"); ServerPayload serverPayload = new ServerPayload(); serverPayload.setUrl("/api/v1.1/system/book"); serviceInstanceBuilder.payload(serverPayload); ServiceInstance<ServerPayload> serviceInstance = serviceInstanceBuilder.build(); // 服务注册 /*JsonInstanceSerializer<ServerPayload> jsonInstanceSerializer = new JsonInstanceSerializer(ServerPayload.class); ServiceDiscovery<ServerPayload> serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerPayload.class).client(client) .serializer(jsonInstanceSerializer).basePath("/service").build(); serviceDiscovery.registerService(serviceInstance); serviceDiscovery.start();*/ ServiceDiscovery<ServerPayload> serviceDiscovery = getServiceDiscovery(client); serviceDiscovery.registerService(serviceInstance); } /** * 服务发现 * @param client * @param serviceName * @return * @throws Exception */ protected ServiceInstance<ServerPayload> findService(CuratorFramework client, String serviceName) throws Exception { /*JsonInstanceSerializer<ServerPayload> jsonInstanceSerializer = new JsonInstanceSerializer(ServerPayload.class); ServiceDiscovery<ServerPayload> serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerPayload.class).client(client) .serializer(jsonInstanceSerializer).basePath("/service").build(); serviceDiscovery.start();*/ ServiceDiscovery<ServerPayload> serviceDiscovery = getServiceDiscovery(client); // 查找服务 Collection<ServiceInstance<ServerPayload>> all = serviceDiscovery.queryForInstances(serviceName); if(all.size() == 0) { return null; } else { // 取第一个服务 ServiceInstance<ServerPayload> service = new 
ArrayList<ServiceInstance<ServerPayload>>(all).get(0); log.info(service.getAddress()); log.info(service.getPayload()); return service; } } public ServiceDiscovery<ServerPayload> getServiceDiscovery(CuratorFramework client) throws Exception { JsonInstanceSerializer<ServerPayload> jsonInstanceSerializer = new JsonInstanceSerializer<ServerPayload>(ServerPayload.class); ServiceDiscovery<ServerPayload> serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerPayload.class).client(client) .serializer(jsonInstanceSerializer).basePath("/service").build(); serviceDiscovery.start(); return serviceDiscovery; } public static class ServerPayload { private String url; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } } }
CuratorFramework调用Zookeeper
package com.smartmap.sample.test.service.impl; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.smartmap.sample.test.service.OrderService; @Service public class OrderServiceImpl implements OrderService { final Log log = LogFactory.getLog(OrderServiceImpl.class); final String lockPath = "/lock/order"; @Autowired CuratorFramework curatorFramework; /** * 创建节点 */ public String createPath(String path) throws Exception { curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(path, new byte[0]); return "create " + path; } /** * 删除节点 */ public String delete(String path) throws Exception { curatorFramework.delete().forPath(path); return "delete " + path; } /** * 获取节点数据 */ public String getData(String path) throws Exception { byte[] bs = curatorFramework.getData().forPath(path); String result = new String(bs); return "get data: " + result; } /** * 设置节点数据 */ public String setData(String path, String data) throws Exception { curatorFramework.setData().forPath(path, data.getBytes()); return "set data " + data; } /** * 检查节点是否存在 */ public String check(String path) throws Exception { Stat stat = curatorFramework.checkExists().forPath(path); return "stat " + stat; } /** * 获取子节点 */ public String children(String path) throws Exception { List<String> children = curatorFramework.getChildren().forPath(path); return "children " + children; } /** * 监控节点 */ public String watch(String path) throws Exception { Stat stat = curatorFramework.checkExists().watched().forPath(path); return "watch: < " + path + " > stat: " + stat; } /** * 分布式锁 */ public void 
makeOrderType(String type) { String path = lockPath + "/" + type; log.info("try do job for " + type); try { InterProcessMutex lock = new InterProcessMutex(curatorFramework, path); if (lock.acquire(10, TimeUnit.HOURS)) { try { Thread.sleep(1000 * 5); log.info("do job " + type + " done"); } finally { lock.release(); } } } catch (Exception ex) { ex.printStackTrace(); } } }
Controller类
package com.smartmap.sample.test.controller.rest; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.smartmap.sample.test.service.OrderService; @RestController @RequestMapping("/api/v1.1/system/book") public class ZookeeperTestCrontroller { @Autowired OrderService orderService; /** * 创建节点 * * curl -XPOST ‘http://127.0.0.1:8080/test/api/v1.1/system/book/create?path=/testNode‘ * @param path * @return * @throws Exception */ @PostMapping("/create") public String create(@RequestParam("path") String path) throws Exception { return orderService.createPath(path); } /** * 删除节点 * curl -XPOST ‘http://127.0.0.1:8080/test/api/v1.1/system/book/delete?path=/testNode‘ * @param path * @return * @throws Exception */ @PostMapping("/delete") public String delete(@RequestParam("path") String path) throws Exception { return orderService.delete(path); } /** * 获取节点数据 * curl -XGET ‘http://127.0.0.1:8080/test/api/v1.1/system/book/data?path=/testNode‘ * @param path * @return * @throws Exception */ @GetMapping("/data") public String getData(@RequestParam("path") String path) throws Exception { return orderService.getData(path); } /** * 设置节点数据 * curl -XPOST ‘http://127.0.0.1:8080/test/api/v1.1/system/book/data?path=/testNode&data=987654321‘ * @param path * @param data * @return * @throws Exception */ @PostMapping("/data") public String setData(@RequestParam("path") String path, @RequestParam("data") String data) throws Exception { return orderService.setData(path, data); } /** * 检查节点是否存在 * curl -XPOST ‘http://127.0.0.1:8080/test/api/v1.1/system/book/check?path=/testNode‘ * @param path * @return * @throws 
Exception */ @PostMapping("/check") public String check(@RequestParam("path") String path) throws Exception { return orderService.check(path); } /** * 获取子节点 * curl -XPOST ‘http://127.0.0.1:8080/test/api/v1.1/system/book/children?path=/testNode‘ * @param path * @return * @throws Exception */ @PostMapping("/children") public String getChildren(@RequestParam("path") String path) throws Exception { return orderService.children(path); } /** * 监控节点 * * curl -XPOST ‘http://127.0.0.1:8080/test/api/v1.1/system/book/watch?path=/testNode‘ * @param path * @return * @throws Exception */ @PostMapping("/watch") public String watch(@RequestParam("path") String path) throws Exception { return orderService.watch(path); } /** * 订单--分布式锁 * curl -XPOST ‘http://127.0.0.1:8080/test/api/v1.1/system/book/order?type=orderType‘ * @param type * @return * @throws Exception */ @PostMapping("/order") public String makeOrder(@RequestParam("type") String type) throws Exception { orderService.makeOrderType(type); return "success"; } }
以上是关于Spring Boot—20Zookeeper的主要内容,如果未能解决你的问题,请参考以下文章: