Spring Boot—20Zookeeper

Posted by ParamousGIS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot—20Zookeeper相关的知识,希望对你有一定的参考价值。

pom.xml

<!-- MySQL JDBC driver. NOTE(review): none of the code shown in this article uses JDBC;
     presumably carried over from an earlier sample - confirm before keeping it. -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.11</version>
</dependency>
<!-- Apache Curator client (CuratorFramework), used by the configuration and service classes. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<!-- Curator service discovery (ServiceDiscovery, ServiceInstance, JsonInstanceSerializer).
     Keep all three Curator artifacts on the same version. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>2.12.0</version>
</dependency>
<!-- Curator discovery server module. NOTE(review): no class from this artifact is imported
     by the code shown here - confirm it is actually required. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery-server</artifactId>
    <version>2.12.0</version>
</dependency>

 

application.properties

# Embedded server
server.address=0.0.0.0
server.port=8080
server.servlet.context-path=/test
# Spring Boot 2.x moved the session settings under server.servlet.*; the old
# server.session.timeout key is no longer bound. A bare number is read as seconds.
server.servlet.session.timeout=300
server.error.path=/error
# Tomcat access log
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.buffered=true
server.tomcat.accesslog.directory=D:/Project/JavaWeb/SpringBoot/04JPASpringBoot/logs
# Jackson date handling
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=Asia/Shanghai
# Thymeleaf
spring.thymeleaf.cache=true
spring.thymeleaf.enabled=true

# Application-specific upload directory
file.upload.path=D:/Project/JavaWeb/SpringBoot/04JPASpringBoot/fileUpLoad

# Multipart (file upload) handling
spring.servlet.multipart.enabled=true
spring.servlet.multipart.file-size-threshold=0
spring.servlet.multipart.location=D:/Project/JavaWeb/SpringBoot/04JPASpringBoot/temp
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.servlet.multipart.resolve-lazily=false

# Diagnostics (uncomment when needed)
# debug=true # Enable debug logs.
# trace=true # Enable trace logs.

# LOGGING
logging.config=classpath:logback.xml

# ZooKeeper connection, read by ZookeeperConfiguration via @Value:
#   zk.url             - connect string passed to CuratorFrameworkFactory.newClient
#   zk.retry-time-init - first argument of ExponentialBackoffRetry
#   zk.retry-count     - second argument of ExponentialBackoffRetry
zk.url=127.0.0.1:2181
zk.retry-time-init = 1000
zk.retry-count = 3

 

启动类

package com.smartmap.sample.test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstrap class of the ZooKeeper sample application.
 */
@SpringBootApplication
public class TestZookeeperApplication {

    /**
     * Hands control to Spring Boot, using this class as the primary source.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> primarySource = TestZookeeperApplication.class;
        SpringApplication.run(primarySource, args);
    }
}

 

 

Zookeeper配置类

package com.smartmap.sample.test.conf;

import java.util.ArrayList;
import java.util.Collection;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceInstanceBuilder;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZookeeperConfiguration {
    Log log = LogFactory.getLog(ZookeeperConfiguration.class);
    
    @Value("${zk.url}")
    private String zkUrl;
    
    @Value("${zk.retry-time-init}")
    private int retryTimeInit;
    
    @Value("${zk.retry-count}")
    private int retryCount;
    
    @Bean
    public CuratorFramework getCuratorFramework() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(retryTimeInit, retryCount);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zkUrl, retryPolicy);
        addListener(curatorFramework);
        curatorFramework.start();
        registerService(curatorFramework);
        ServiceInstance<ServerPayload> service = findService(curatorFramework, "book");
        log.info(service);
        return curatorFramework;        
    }
    
    /**
     * 添加监听
     * @param curatorFramework
     */
    private void addListener(CuratorFramework curatorFramework) {
        curatorFramework.getCuratorListenable().addListener(new CuratorListener() {

            @Override
            public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                CuratorEventType type = curatorEvent.getType();
                if(type==CuratorEventType.WATCHED) {
                    WatchedEvent we = curatorEvent.getWatchedEvent();
                    EventType et = we.getType();
                    if(we.getPath() != null) {
                        log.info(et + ":" + we.getPath());
// 重新注册监听 curatorFramework.checkExists().watched().forPath(we.getPath()); } } } }); }
/** * 服务注册 * @param client * @throws Exception */ protected void registerService(CuratorFramework client) throws Exception { // 构造一个服务描述 ServiceInstanceBuilder<ServerPayload> serviceInstanceBuilder = ServiceInstance.builder(); serviceInstanceBuilder.address("127.0.0.1"); serviceInstanceBuilder.port(8080); serviceInstanceBuilder.name("book"); ServerPayload serverPayload = new ServerPayload(); serverPayload.setUrl("/api/v1.1/system/book"); serviceInstanceBuilder.payload(serverPayload); ServiceInstance<ServerPayload> serviceInstance = serviceInstanceBuilder.build(); // 服务注册 /*JsonInstanceSerializer<ServerPayload> jsonInstanceSerializer = new JsonInstanceSerializer(ServerPayload.class); ServiceDiscovery<ServerPayload> serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerPayload.class).client(client) .serializer(jsonInstanceSerializer).basePath("/service").build(); serviceDiscovery.registerService(serviceInstance); serviceDiscovery.start();*/ ServiceDiscovery<ServerPayload> serviceDiscovery = getServiceDiscovery(client); serviceDiscovery.registerService(serviceInstance); } /** * 服务发现 * @param client * @param serviceName * @return * @throws Exception */ protected ServiceInstance<ServerPayload> findService(CuratorFramework client, String serviceName) throws Exception { /*JsonInstanceSerializer<ServerPayload> jsonInstanceSerializer = new JsonInstanceSerializer(ServerPayload.class); ServiceDiscovery<ServerPayload> serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerPayload.class).client(client) .serializer(jsonInstanceSerializer).basePath("/service").build(); serviceDiscovery.start();*/ ServiceDiscovery<ServerPayload> serviceDiscovery = getServiceDiscovery(client); // 查找服务 Collection<ServiceInstance<ServerPayload>> all = serviceDiscovery.queryForInstances(serviceName); if(all.size() == 0) { return null; } else { // 取第一个服务 ServiceInstance<ServerPayload> service = new ArrayList<ServiceInstance<ServerPayload>>(all).get(0); log.info(service.getAddress()); 
log.info(service.getPayload()); return service; } } public ServiceDiscovery<ServerPayload> getServiceDiscovery(CuratorFramework client) throws Exception { JsonInstanceSerializer<ServerPayload> jsonInstanceSerializer = new JsonInstanceSerializer<ServerPayload>(ServerPayload.class); ServiceDiscovery<ServerPayload> serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerPayload.class).client(client) .serializer(jsonInstanceSerializer).basePath("/service").build(); serviceDiscovery.start(); return serviceDiscovery; } public static class ServerPayload { private String url; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } } }

 

CuratorFramework 调用 ZooKeeper

package com.smartmap.sample.test.service.impl;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.smartmap.sample.test.service.OrderService;

@Service
public class OrderServiceImpl implements OrderService {

    final Log log = LogFactory.getLog(OrderServiceImpl.class);
    final String lockPath = "/lock/order";

    @Autowired
    CuratorFramework curatorFramework;

    /**
     * 创建节点
     */
    public String createPath(String path) throws Exception {
        curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(path, new byte[0]);
        return "create " + path;
    }

    /**
     * 删除节点
     */
    public String delete(String path) throws Exception {
        curatorFramework.delete().forPath(path);
        return "delete " + path;
    }

    /**
     * 获取节点数据
     */
    public String getData(String path) throws Exception {
        byte[] bs = curatorFramework.getData().forPath(path);
        String result = new String(bs);
        return "get data: " + result;
    }

    /**
     * 设置节点数据
     */
    public String setData(String path, String data) throws Exception {
        curatorFramework.setData().forPath(path, data.getBytes());
        return "set data " + data;
    }

    /**
     * 检查节点是否存在
     */
    public String check(String path) throws Exception {
        Stat stat = curatorFramework.checkExists().forPath(path);
        return "stat " + stat;
    }

    /**
     * 获取子节点
     */
    public String children(String path) throws Exception {
        List<String> children = curatorFramework.getChildren().forPath(path);
        return "children " + children;
    }

    /**
     * 监控节点
     */
    public String watch(String path) throws Exception {
        Stat stat = curatorFramework.checkExists().watched().forPath(path);
        return "watch: < " + path + " >    stat: " + stat;
    }

    /**
     * 分布式锁
     */
    public void makeOrderType(String type) {
        String path = lockPath + "/" + type;
        log.info("try do job for " + type);
        try {
            InterProcessMutex lock = new InterProcessMutex(curatorFramework, path);
            if (lock.acquire(10, TimeUnit.HOURS)) {
                try {
                    Thread.sleep(1000 * 5);
                    log.info("do job " + type + " done");
                } finally {
                    lock.release();
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}

 

Controller类

package com.smartmap.sample.test.controller.rest;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.smartmap.sample.test.service.OrderService;

/**
 * REST endpoints that expose the {@link OrderService} operations for manual testing.
 * Every handler forwards its request parameters to the service and returns the service's
 * text result unchanged.
 */
@RestController
@RequestMapping("/api/v1.1/system/book")
public class ZookeeperTestCrontroller {

    @Autowired
    OrderService orderService;

    /**
     * Creates a node.
     *
     * <pre>curl -XPOST 'http://127.0.0.1:8080/test/api/v1.1/system/book/create?path=/testNode'</pre>
     *
     * @param path node path
     * @return result text from {@code OrderService#createPath}
     * @throws Exception if the service call fails
     */
    @PostMapping("/create")
    public String create(@RequestParam("path") String path) throws Exception {
        final String outcome = orderService.createPath(path);
        return outcome;
    }

    /**
     * Deletes a node.
     *
     * <pre>curl -XPOST 'http://127.0.0.1:8080/test/api/v1.1/system/book/delete?path=/testNode'</pre>
     *
     * @param path node path
     * @return result text from {@code OrderService#delete}
     * @throws Exception if the service call fails
     */
    @PostMapping("/delete")
    public String delete(@RequestParam("path") String path) throws Exception {
        final String outcome = orderService.delete(path);
        return outcome;
    }

    /**
     * Reads the data of a node.
     *
     * <pre>curl -XGET 'http://127.0.0.1:8080/test/api/v1.1/system/book/data?path=/testNode'</pre>
     *
     * @param path node path
     * @return result text from {@code OrderService#getData}
     * @throws Exception if the service call fails
     */
    @GetMapping("/data")
    public String getData(@RequestParam("path") String path) throws Exception {
        final String outcome = orderService.getData(path);
        return outcome;
    }

    /**
     * Writes the data of a node.
     *
     * <pre>curl -XPOST 'http://127.0.0.1:8080/test/api/v1.1/system/book/data?path=/testNode&amp;data=987654321'</pre>
     *
     * @param path node path
     * @param data text to store
     * @return result text from {@code OrderService#setData}
     * @throws Exception if the service call fails
     */
    @PostMapping("/data")
    public String setData(@RequestParam("path") String path, @RequestParam("data") String data) throws Exception {
        final String outcome = orderService.setData(path, data);
        return outcome;
    }

    /**
     * Checks whether a node exists.
     *
     * <pre>curl -XPOST 'http://127.0.0.1:8080/test/api/v1.1/system/book/check?path=/testNode'</pre>
     *
     * @param path node path
     * @return result text from {@code OrderService#check}
     * @throws Exception if the service call fails
     */
    @PostMapping("/check")
    public String check(@RequestParam("path") String path) throws Exception {
        final String outcome = orderService.check(path);
        return outcome;
    }

    /**
     * Lists the children of a node.
     *
     * <pre>curl -XPOST 'http://127.0.0.1:8080/test/api/v1.1/system/book/children?path=/testNode'</pre>
     *
     * @param path node path
     * @return result text from {@code OrderService#children}
     * @throws Exception if the service call fails
     */
    @PostMapping("/children")
    public String getChildren(@RequestParam("path") String path) throws Exception {
        final String outcome = orderService.children(path);
        return outcome;
    }

    /**
     * Sets a watch on a node.
     *
     * <pre>curl -XPOST 'http://127.0.0.1:8080/test/api/v1.1/system/book/watch?path=/testNode'</pre>
     *
     * @param path node path
     * @return result text from {@code OrderService#watch}
     * @throws Exception if the service call fails
     */
    @PostMapping("/watch")
    public String watch(@RequestParam("path") String path) throws Exception {
        final String outcome = orderService.watch(path);
        return outcome;
    }

    /**
     * Runs the lock-protected order job for an order type.
     *
     * <pre>curl -XPOST 'http://127.0.0.1:8080/test/api/v1.1/system/book/order?type=orderType'</pre>
     *
     * @param type order type
     * @return the fixed text {@code "success"} once the service call has returned
     * @throws Exception if the service call fails
     */
    @PostMapping("/order")
    public String makeOrder(@RequestParam("type") String type) throws Exception {
        orderService.makeOrderType(type);
        final String outcome = "success";
        return outcome;
    }

}

以上是关于Spring Boot—20Zookeeper的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot引入Lombok

Spring Boot—15SpringJPA

Spring Boot—14JdbcTemplate

Spring Boot—19Cache

Spring Boot—21Actuator--监控

Spring Boot—11控制器Controller