模拟实现配置中心配置发生变化时秒级推送至客户端代码思路

Posted jun1019

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了模拟实现配置中心配置发生变化时秒级推送至客户端代码思路相关的知识,希望对你有一定的参考价值。

import com.alibaba.fastjson.JSON;
import com.xuebusi.spring.study.http.BasicHttpUtil;
import com.xuebusi.spring.study.model.ConfData;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 模拟实现配置中心配置发生变化时秒级推送至客户端
 * 操作步骤:
 * 1.请求http://host:port/conf/add?key=&value=接口添加数据
 * 2.请求http://host:port/conf/get?key=接口查看数据
 * 3.请求http://host:port/conf/set?key=&value=接口修改数据
 * 4.观察控制台日志会看到配置变化的通知
 * <p>
 * 原理:
 * 1.项目启动时,模拟一个客户端调用 /conf/monitor 接口请求数据,创建DeferredResult对象,
 * 以客户端ID为key,以DeferredResult对象为value放入 deferredResultMap;
 * 2.当客户端调用修改接口时,就将修改后的数据放入 changedDataMap(这里用一个map存,实际应该存数据库);
 * 3.通过另一个线程定时(间隔1s)查看 changedDataMap 中变化的数据;
 * 4.如果数据有变化就遍历deferredResultMap通过 deferredResult.setResult() 通知给所有的客户端;
 *
 * @author syj
 */
@RestController
@RequestMapping
public class ConfController implements InitializingBean {

    private static final int BEAT_TIME_OUT = 30;

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    private static Map<String, ConfData> changedDataMap = new ConcurrentHashMap<>();

    private Map<String, DeferredResult> deferredResultMap = new ConcurrentHashMap();

    // 模拟数据库存储配置
    public static Map<String, ConfData> confDataMap = new ConcurrentHashMap<>();

    /**
     * 获取最新配置
     * 使用 DeferredResult 返回异步结果
     *
     * @param clientId
     * @return
     */
    @GetMapping("/conf/monitor")
    public DeferredResult<String> monitor(String clientId) {
        DeferredResult<String> result = new DeferredResult<>(BEAT_TIME_OUT * 1000L, "30秒超时啦");
        deferredResultMap.put(clientId, result);
        return result;
    }

    /**
     * 查询所有配置
     *
     * @return
     */
    @GetMapping("/conf/list")
    public Result<Map<String, ConfData>> list() {
        return new Result<>(confDataMap);
    }

    /**
     * 查询配置
     *
     * @param key
     * @return
     */
    @GetMapping("/conf/get")
    public Result<ConfData> get(String key) {
        ConfData confData = confDataMap.get(key);
        if (confData == null) {
            return new Result<ConfData>(Result.FAIL_CODE, "key不存在");
        } else {
            return new Result<ConfData>(confData);
        }
    }

    /**
     * 修改配置
     *
     * @param key
     * @param value
     * @return
     */
    @GetMapping("/conf/set")
    public Result<String> set(String key, String value) {
        ConfData confData = confDataMap.get(key);
        if (confData == null) {
            return new Result<String>(Result.FAIL_CODE, "key不存在");
        } else {
            if (!confData.getValue().equals(value)) {
                confData.setValue(value);
                confDataMap.put(key, confData);
                // 同时放到 changedDataMap 中一份最新数据
                changedDataMap.put(key, confData);
            }
            return Result.SUCCESS;
        }
    }

    /**
     * 新增配置
     *
     * @param key
     * @param value
     * @return
     */
    @GetMapping("/conf/add")
    public Result<String> add(String key, String value) {
        ConfData confData = confDataMap.get(key);
        if (confData != null) {
            return new Result<String>(Result.FAIL_CODE, "key已经存在了");
        }
        confDataMap.put(key, new ConfData(key, value));
        return Result.SUCCESS;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        // 配置中心启动线程监控配置变化
        executorService.execute(() -> {
            while (true) {
                Collection<ConfData> values = changedDataMap.values();
                if (values != null && values.size() > 0) {
                    for (String key : changedDataMap.keySet()) {
                        for (String clientId : deferredResultMap.keySet()) {
                            DeferredResult deferredResult = deferredResultMap.get(clientId);
                            String msg = "通知客户端" + clientId + "配置" + key + "发生变化:" + JSON.toJSONString(changedDataMap.get(key));
                            deferredResult.setResult(msg);

                            // 移除已经通知过的客户端
                            deferredResultMap.remove(clientId);

                            // 移除已经通知过的数据
                            changedDataMap.remove(key);
                        }
                    }
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 模拟客户端 http请求最新配置
        executorService.execute(() -> {
            String clientId = UUID.randomUUID().toString().replace("-", "");
            while (true) {
                String url = "http://localhost:8888/conf/monitor?clientId=" + clientId;
                String result = BasicHttpUtil.get(url, BEAT_TIME_OUT + 30);
                System.out.println(Thread.currentThread().getName() + "----http调用结果:" + result);
            }
        });
    }

    /**
     * 返回结果
     *
     * @param <T>
     */
    public static class Result<T> {

        public static int SUCCESS_CODE = 200;
        public static int FAIL_CODE = 500;
        public static final Result<String> SUCCESS = new Result<>(SUCCESS_CODE, "操作成功");
        public static final Result<String> FAIL = new Result<>(FAIL_CODE, "操作失败");

        private int code;
        private String msg;
        private T data;

        public Result(T data) {
            this.data = data;
        }

        public Result(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }

        public Result(int code, String msg, T data) {
            this.code = code;
            this.msg = msg;
            this.data = data;
        }

        public int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }

        public T getData() {
            return data;
        }

        public void setData(T data) {
            this.data = data;
        }
    }
}

 

以上是关于模拟实现配置中心配置发生变化时秒级推送至客户端代码思路的主要内容,如果未能解决你的问题,请参考以下文章

配置中心Config:环境搭建

使用Nacos存储Sentinel的限流规则

Aliware推出应用配置管理大杀器,分布式架构下配置推送秒级生效

springboot项目接入配置中心,实现@ConfigurationProperties的bean属性刷新方案

Apollo配置中心 原理分析

3配置中心