微服务架构下的数据一致性

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务架构下的数据一致性相关的知识,希望对你有一定的参考价值。

参考技术A 微服务架构的流行源于它能够带来更快的变化响应能力,比如独立部署,每个服务的能力职责是独立的,可以按需独立发布;再比如每个服务可以由不同的开发团队负责,每个服务的技术栈也可以不同,可以选择更快捷合理的方式实现不同的服务。

然而,微服务架构作为分布式架构,躲不开的一个问题就是数据一致性的问题,特别是在技术异构和数据源类型不同的情况下,传统的分布式事务(2PC或3PC)也很难解决微服务架构下的一致性问题。

在微服务架构下,多个服务之间通常会定义明确上下游关系,下游系统可以依赖上游系统,下游系统可以通过API查询或修改上游系统的数据;反过来则不然,上游系统不应该知道下游系统的存在,也就是说上游系统不能依赖下游系统,上游系统的变化只能通过异步事件的方式发出,下游系统监听事件并基于事件做对应的数据状态变化。

在基于上面原则的微服务架构下(见上面图示,本文不考虑服务间循环依赖的场景),在上下游服务间的数据通信(图示中的每个箭头表示一种数据通信)一旦发生问题,都会产生数据不一致的场景,下面我们逐一说明:

举个例子,订单服务是下游服务,库存服务是上游服务,在订单确认时要锁定库存,实现上订单服务在状态变化同时通过同步API修改库存的状态,为了保证数据一致性,在调用库存服务API异常后订单服务会回滚当前的数据状态变更。

在这个场景下,同一个业务流程,需要同时修改两个服务的数据,在以下两种情况下会发生数据不一致的问题:

上游服务每个关键状态变更都可能触发下游服务的一些逻辑链,因此上游服务发布的事件对于下游服务是非常重要的,但这些事件并不影响上游服务自身逻辑,也不影响自身数据状态的变化,因此通常不会设计成阻碍业务流程,那么在事件服务或事件载体(通常是消息队列)与上游服务之间的通信异常,就会导致上游服务的事件发布失败。

这种场景下,上游服务的业务流程已经成功,不可能有再次触发事件的场景,这个事件就丢失了,下游服务因为没有收到上游服务的事件,数据没有做对应的变化而导致数据一致性问题。

同样,下游服务在消费事件时也很有可能因为一些原因,导致事件的消费失败,这些原因可能包括:

上游服务并不关心下游的消费者,所以对于发布出去的事件,上游系统也不关心下游服务是否消费成功,更不会有因某个下游服务消费失败而重发事件的逻辑,这同样会导致类似于场景二的数据一致性问题。

根据CAP理论,分区容错性、可用性和一致性里面必须要牺牲掉一个,而在实际实现过程中,分区容错性和可用性是很难舍弃的,所以通常会舍弃一致性,取而代之会用最终一致性保证数据在可容忍的时长内达到最终一致。

微服务架构也不例外,在服务内部,可以通过本地事务保证数据的强一致性;而当业务发生在多个服务中,我们追求最终一致性。那么都有哪些措施可以保证跨服务的最终一致性呢?

这是个业务问题,在微服务的架构下,每个服务都是独立的,如果有一个业务功能需要同时修改两个服务的数据,往往这个业务可以拆分成两个步骤,比如场景一种提到的订单和库存的例子,如果我们可以先锁定库存,然后再确认订单看上去这个问题就迎刃而解了。

因此在业务中发现一个功能需要同时修改两个服务的数据,我们首先可以来讨论这个业务设计是否合理;如果业务上很多场景都要求两个服务的数据保持强一致,那可能我们需要看看微服务的划分是否合理。

为了解决场景二和场景三的不一致性问题,需要上游服务和下游服务的共同努力:
上游服务需要尽可能将事件发送出去,比如:先同步发送,如果失败改为异步重试,重试多次仍然失败可以先持久化,通过定时任务来重发或者人工干预重发。
下游服务也要尽可能的把事件处理掉,收到事件后可以考虑先将事件持久化,消费成功后标记事件,如果消费失败可以通过定时任务重试消费。

当我们提到重试,就不得不考虑幂等性的问题,这里的幂等性包括以下两个场景:

即便我们做了很多我们认为万全的准备,在分布式系统的执行链路上,每个节点都有可能失败,加上业务的复杂度,数据不一致的情景也很难彻底解决,而对于那些小概率发生但技术解决起来成本昂贵的问题,我们可以尝试通过对业务的深刻理解设计一些后台的数据维护功能,保证在核心业务数据异常时,可以在一定的规则内进行修复,从而保证业务的顺利进行

数据一致性问题首先是个业务问题,其次才是个技术问题。在微服务架构下,我们期望每个服务职责单一,这种职责单一体现的是业务价值,如果微服务的拆分过小而导致业务难以实现,那这种拆分是不合理的,业务专家们非常有必要了解系统,从业务侧给出服务拆分的建议。
在数据一致性问题上,我们首先要思考业务设计的合理性,其次是当前架构设计的合理性,然后在一定的约束下,通过最终一致性保证业务价值,除非迫不得已,不建议引入分布式事务框架,一方面成本较高,另一方面也会引入性能等新的问题。

微服务架构下的session一致性

本文由宜信-高级架构师-梁鑫投稿,之前在社区分享过两篇文章,分别介绍了一下在公司项目中搭建springcloud框架的经验和我们自己研发的几个微服务组件。在这个过程中,我们还需要解决微服务架构中特别需要注意的一个问题————session一致性。在此,抱着学习的态度把我的解决方案跟大家再次分享一下。

一.背景.绕不开的session一致性

采用微服务架构以后,把原先单一的节点拆解成了多个微服务节点。在采用微服务架构之前,我们的项目普遍采用的都是分布式集群架构,多数的公司项目都采用IP哈希的方式进行session的跟踪,这样做非常简单,只需要在nginx简单配置即可,但我们采用springcloud微服务架构之后,session一致性保持就成了我们必须要解决的问题。

二.Session一致性的常见方式

简单说一下session和session一致性。服务为访问他的用户构造了一组信息,称之为会话(session),当该用户在限定时间内每次发起http访问时,服务端能自动感知到是该用户在发起访问,称之为会话保持(session一致性)

2.1、IP哈希

2.2、Session复制

把每个用户的session都同步复制到集群中的每一个服务节点,这样无论用户访问哪个服务节点,都能获取到自己的session信息。

2.3、Session客户端存储

把session信息保存到客户端的cookie中。

2.4、Session分布式存储

把session信息保存到后端的其它存储中,例如mysql,redis,memcached等。

三.微服务架构下的session一致性

3.1 原理图

3.2 基于shiro的session

  • 我们采用了shiro作为权限控制组件;



 
   
   
 
  1. ...

  2. @Bean

  3. public SessionManager sessionManager() {

  4.    DefaultWebSessionManager sessionManager = new DefaultWebSessionManager();

  5.    sessionManager.setCacheManager(redisCacheManager());

  6.    sessionManager.setGlobalSessionTimeout(60000);

  7.    sessionManager.setDeleteInvalidSessions(true);

  8.    sessionManager.setSessionValidationSchedulerEnabled(true);

  9.    sessionManager.setDeleteInvalidSessions(true);

  10.    sessionManager.setSessionIdCookie(simpleCookie());

  11.    sessionManager.setSessionDAO(sessionDAO);

  12.    return sessionManager;

  13. }

  14. @Bean

  15. public DefaultWebSecurityManager securityManager() {

  16.    DefaultWebSecurityManager securityManager = new DefaultWebSecurityManager();

  17.    securityManager.setSessionManager(sessionManager());

  18.    securityManager.setCacheManager(redisCacheManager());

  19.    securityManager.setRealm(getShiroRealm());

  20.    return securityManager;

  21. }

  22. @Bean("shiroFilter")

  23. public ShiroFilterFactoryBean shiroFilterFactoryBean() {

  24.    ShiroFilterFactoryBean shiroFilter = new ShiroFilterFactoryBean();

  25.    Map<String, String> chains = new LinkedHashMap<String, String>();

  26.    chains.put("/autoconfig", "anon");

  27.    chains.put("/beans", "anon");

  28.    chains.put("/configprops", "anon");

  29.    chains.put("/dump", "anon");

  30.    chains.put("/env", "anon");

  31.    chains.put("/health", "anon");

  32.    chains.put("/info", "anon");

  33.    chains.put("/metrics", "anon");

  34.    chains.put("/mappings", "anon");

  35.    chains.put("/shutdown", "anon");

  36.    chains.put("/trace", "anon");

  37.    shiroFilter.setFilterChainDefinitionMap(chains);

  38.    return shiroFilter;

  39. }

  40. ...

  • 构造ShiroUser作为session的主要保存信息,用户的ID,账号,名称等;

 
   
   
 
  1. public class ShiroUser implements Serializable {

  2.    private static final long serialVersionUID = -4661753370573516137L;

  3.    private Integer id;          // 主键ID

  4.    private String username;     // 账号

  5.    private String name;         // 姓名

  6.    public Integer getId() {

  7.        return id;

  8.    }

  9.    public void setId(Integer id) {

  10.        this.id = id;

  11.    }

  12.    public String getUsername() {

  13.        return username;

  14.    }

  15.    public void setUsername(String username) {

  16.        this.username = username;

  17.    }

  18.    public String getName() {

  19.        return name;

  20.    }

  21.    public void setName(String name) {

  22.        this.name = name;

  23.    }

  24. }

  • 集成公司ldap;

 
   
   
 
  1. /**

  2. * 建立连接

  3. */

  4. public void connect() {

  5.    Hashtable<String, String> env = new Hashtable<String, String>();

  6.    env.put(Context.INITIAL_CONTEXT_FACTORY, FACTORY);

  7.    env.put(Context.PROVIDER_URL, URL + BASEDN);

  8.    env.put(Context.SECURITY_AUTHENTICATION, "simple");

  9.    env.put(Context.SECURITY_PRINCIPAL, ldapUserName); // 管理员

  10.    env.put(Context.SECURITY_CREDENTIALS, ldapPassword); // 管理员密码

  11.    try {

  12.        ctx = new InitialLdapContext(env, connCtls);

  13.        logger.info("连接成功");

  14.    } catch (AuthenticationException e) {

  15.        logger.error("连接失败", e);

  16.    }

  17. }

  18. /**

  19. * 用户验证

  20. */

  21. public boolean validate(String userName, String password) {

  22.    try {

  23.        ctx.addToEnvironment(Context.SECURITY_PRINCIPAL, userName);

  24.        ctx.addToEnvironment(Context.SECURITY_CREDENTIALS, password);

  25.        ctx.reconnect(null);

  26.        return true;

  27.    } catch (AuthenticationException e) {

  28.        logger.error("连接失败", e);

  29.    }

  30.    return false;

  31. }

3.3 采取redis存储session

  • 加入redis存储,构造RedisCacheManager;

 
   
   
 
  1. public class RedisCacheManager implements CacheManager {

  2.    @Resource

  3.    private RedisTemplate<String, Object> redisTemplate;

  4.    @Override

  5.    public <K, V> Cache<K, V> getCache(String name) throws CacheException {

  6.        return new GantryCache<K, V>(name, redisTemplate);

  7.    }

  8.    ...

  9. }

  • 构造RedisSessionDAO,通过dao对象最终对redis进行操作,需要包含redisTemplate对象;

 
   
   
 
  1. @Component

  2. public class RedisSessionDAO extends EnterpriseCacheSessionDAO {

  3.    private static int expireTime = 1800;

  4.    private static String prefix = "gantry-session:";

  5.    @Resource

  6.    private RedisTemplate<String, Object> redisTemplate;

  7.    /**

  8.     * 创建session,保存到数据库

  9.     */

  10.    @Override

  11.    protected Serializable create(Session session) {

  12.        redisTemplate.opsForValue().set(prefix + sessionId.toString(), session);

  13.        return sessionId;

  14.    }

  15.    /**

  16.     * 获取session

  17.     */

  18.    @Override

  19.    protected Session query(Serializable sessionId) {

  20.        if (session == null) {

  21.            session = (Session) redisTemplate.opsForValue().get(prefix + sessionId.toString());

  22.        }

  23.        return session;

  24.    }

  25.    /**

  26.     * 更新session的最后一次访问时间

  27.     */

  28.    @Override

  29.    protected void uppdate(Session session) {

  30.        String key = prefix + session.getId().toString();

  31.        if (!redisTemplate.hasKey(key)) {

  32.            redisTemplate.opsForValue().set(key, session);

  33.        }

  34.        redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);

  35.    }

  36.    /**

  37.     * 删除session

  38.     */

  39.    @Override

  40.    protected void delete(Session session) {

  41.        redisTemplate.delete(prefix + session.getId().toString());

  42.    }

  43. }

  • Shiro的SessionManager对象支持注入RedisSessionDAO对象,从而使用redis存储session;

 
   
   
 
  1. ...

  2. @Resource

  3. private RedisSessionDAO sessionDAO;

  4. @Bean

  5. public SessionManager sessionManager() {

  6.    DefaultWebSessionManager sessionManager = new DefaultWebSessionManager();

  7.    ...

  8.    sessionManager.setSessionDAO(sessionDAO);

  9.    return sessionManager;

  10. }

3.4 构建cookie

  • 在服务端创建cookie,注意cookie的httpOnly属性,只有当httpOnly属性设置为false时,才能通过javascript获取cookie值;

 
   
   
 
  1. ...

  2. @Bean

  3. public SimpleCookie simpleCookie() {

  4.    SimpleCookie simpleCookie = new SimpleCookie("sid");

  5.    simpleCookie.setHttpOnly(false);

  6.    simpleCookie.setMaxAge(-1);

  7.    simpleCookie.setName("GANTRYSESSIONID");

  8.    return simpleCookie;

  9. }

  10. ...

  • 在javascript中获取cookie;

 
   
   
 
  1. ...

  2. function getCookie(cookieName) {

  3.    var strCookie = document.cookie;

  4.    var arrCookie = strCookie.split("; ");

  5.    for (var i = 0; i < arrCookie.length; i++) {

  6.        var arr = arrCookie[i].split("=");

  7.        if (cookieName == arr[0]) {

  8.            return arr[1];

  9.        }

  10.    }

  11.    return "";

  12. }

  13. ...

3.5 微服务节点间sessionid的传递

  • 在cookie中获取sessionid,在url中拼接jsessionid;

 
   
   
 
  1. ...

  2. function openNewService(url) {

  3.    var id = getCookie("GANTRYSESSIONID")

  4.    window.open(url + ";jsessionid=" + id);

  5. }

  6. ...

3.6 在微服务节点B接收sessionid并在redis中获取session

  • 获取并保存sessionid;

 
   
   
 
  1. ...

  2. @RequestMapping("/")

  3. public String index(HttpServletRequest request) {

  4.    String sessionId = request.getRequestedSessionId();

  5.    request.getSession().setAttribute("SHIROSESSIONID", sessionId);

  6.    return "index.html";

  7. }

  8. ...

  • 通过sessionid从redis中获取session;

 
   
   
 
  1. ...

  2. @Autowired

  3. private RedisCacheManager redisCacheManager;

  4. @RequestMapping(value = "/someMethod", produces = "application/json;charset=UTF-8")

  5. @ResponseBody

  6. public String someMethod(HttpServletRequest request) throws IOException {

  7.    String SHIROSESSIONID = (String) request.getSession().getAttribute("SHIROSESSIONID");

  8.    String sessionId = request.getRequestedSessionId();

  9.    if (redisCacheManager == null) {

  10.        ...

  11.    }

  12.    Object shiroCacheObject = redisCacheManager.getCache("*");

  13.    if (shiroCacheObject == null) {

  14.        ...

  15.    }

  16.    ShiroCache shiroCache = (ShiroCache) shiroCacheObject;

  17.    Object sessionObject = shiroCache.get(SHIROSESSIONID);

  18.    if (sessionObject == null) {

  19.        ...

  20.    }

  21.    Session session = (Session) sessionObject;

  22.    ShiroUser user = (ShiroUser) session.getAttribute("user");

  23.    ...

  24. }

  25. ...


以上是关于微服务架构下的数据一致性的主要内容,如果未能解决你的问题,请参考以下文章

如何保障微服务架构下的数据一致性

从 0 开始的微服务架构:如何保障微服务架构下的数据一致性

Re:从 0 开始的微服务架构--如何保障微服务架构下的数据一致性--转

(转)如何保障微服务架构下的数据一致性?

Spring Cloud微服务系统下的数据一致性探讨

分布式事务微服务架构下的分布式事务问题