Go语言基于Etcd实现的定时任务

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言基于Etcd实现的定时任务相关的知识,希望对你有一定的参考价值。

参考技术A 利用 Etcd 的Lease租约特性来实现定时功能,同时通过Watch机制来实现多节点情况下只有一个节点执行该任务。通过定时任务库 Cron 的时间字符串解析器Parser来解析任务执行时间。

Etcd
Cron
源码链接

基于ETCD实现分布式锁&实战:控制多个应用仅一台执行任务

  我们知道,分布式锁有好几种方案:基于Redis、基于数据库如MySQL、基于注册中心如Zookeeper等;而 K8S 体系中基于 Go 语言编写的的 ETCD 则对于分布式锁有着更强大的支持。
  ETCD 有一个租约机制,客户端跟 ETCD 服务端订立一个“租约”后,需要在租约到期之前进行续约,否则会在到期后被自动解除租约,而租约可以挂载多个 key-value,当租约过期时,挂载在上面的 key-value 也会跟着被删除。既有类似 Redis / Zookeeper 的 key-value 机制能够实现分布式锁,同时租约机制,又能实现某个客户端宕机后,服务端自动检测锁超时并自动释放锁。
  储备阅读:
基于 Etcd 的分布式锁实现原理及方案

基于 etcd 实现分布式锁

还有比Redis更骚的分布式锁的实现方式吗?有,etcd!

  这里,使用 ETCD 封装好的 lockClient 来实现分布式锁,例子如下:
maven依赖:

io.etcd:jetcd-core:jar:0.5.3:compile
io.grpc:grpc-core:jar:1.31.1:compile

# 有用到dubbo的话直接引入一个依赖即可:
org.apache.dubbo:dubbo-remoting-etcd3:jar:2.7.11:compile

DistributedLock.java

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.grpc.stub.StreamObserver;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;

/**
 * 基于 ETCD 的分布式锁
 *
 * @author lvlang
 * @date 2022/2/24
 */
public class DistributedLock 

    private static DistributedLock lockProvider = null;
    private static final Object MUTEX = new Object();
    private Client client;
    private Lock lockClient;
    private Lease leaseClient;

    private DistributedLock() 
        super();
        // 创建Etcd客户端,本例中Etcd集群只有一个节点
        this.client = Client.builder().endpoints("http://localhost:2379").build();
        this.lockClient = client.getLockClient();
        this.leaseClient = client.getLeaseClient();
    

    public static DistributedLock getInstance() 
        synchronized (MUTEX) 
            if (null == lockProvider) 
                lockProvider = new DistributedLock();
            
        
        return lockProvider;
    

    /**
     * 加锁操作
     *
     * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名
     * @param ttl:      Time To Live(单位秒),租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁
     * @return LockResult
     */
    public LockResult getLock(String lockName, long ttl) 
        LockResult lockResult = new LockResult();
        /*1.准备阶段*/
        // 初始化返回值lockResult
        lockResult.setIsLockSuccess(false);

        // 记录租约ID,初始值设为 0L
        long leaseId = 0;

        /*2.创建租约并设置自动续约*/
        // 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定。
        try 
            leaseId = leaseClient.grant(ttl).get().getID();
            lockResult.setLeaseId(leaseId);
         catch (InterruptedException | ExecutionException e) 
            return lockResult;
        

        StreamObserver<LeaseKeepAliveResponse> observer = new StreamObserver<LeaseKeepAliveResponse>() 
            @Override
            public void onNext(LeaseKeepAliveResponse arg0) 
                // 自动续约心跳
            

            @Override
            public void onError(Throwable arg0) 
            

            @Override
            public void onCompleted() 
            
        ;

        // 设置自动续约
        leaseClient.keepAlive(leaseId, observer);

        /*3.加锁操作*/
        // 执行加锁操作,并为锁对应的Key绑定租约
        try 
            // 阻塞式获取锁
            lockClient.lock(ByteSequence.from(lockName, StandardCharsets.UTF_8), leaseId).get();
         catch (InterruptedException | ExecutionException e1) 
            // 解除租约
            leaseClient.revoke(leaseId);
            return lockResult;
        

        lockResult.setIsLockSuccess(true);
        return lockResult;
    
	public static class LockResult 
		// 在下面补充
	

  租约这块,可以参考基于 Etcd 的分布式锁实现原理及方案 的路线一那样,自定义一个ScheduledExecutorService定时任务服务,定期对租约进行续期。而 ETCD 的 API 封装也支持通过观察器实现自动续约:Lease.keepAlive(long leaseId, StreamObserver<LeaseKeepAliveResponse> observer);
  创建好租约并设置了自动续约后,那后面就是把 key-value 绑定到租约上,上面例子中lockClient.lock()做的事情,其实就是多个客户端竞争lockName这个 key,谁竞争到了这个 key 则将该 key 绑定到它的租约上,也就实现了由该线程获得了锁。
  具体竞争机制参见基于 Etcd 的分布式锁实现原理及方案,基本流程就是所有要竞争锁的客户端,都生成一个 /lockName/revisionId的 key-value,然后把所有 /locakName前缀的键值对都取回来,然后看自己拿到的这个revisionId是不是里边最小的,最小的说明就是最早获得了这个 key prefix 的线程。而 前面提到,ETCD 服务端会自动解除租约并删除上面的 key-value,所以当持有锁的客户端宕机被解除租约后,它的/lockName/revisionId就会从键值对列表中自动删除,从而使得新的最小的revisionId的那个线程获得锁。

*补充上述代码的内部类:LockResult *


    /**
     * 该class用于描述加锁的结果,同时携带解锁操作所需参数
     */
    public static class LockResult 
        private boolean isLockSuccess;
        private long leaseId;

        LockResult() 
            super();
        

        public void setIsLockSuccess(boolean isLockSuccess) 
            this.isLockSuccess = isLockSuccess;
        

        public void setLeaseId(long leaseId) 
            this.leaseId = leaseId;
        

        public boolean getIsLockSuccess() 
            return this.isLockSuccess;
        

        public long getLeaseId() 
            return this.leaseId;
        

        @Override
        public String toString() 
            return new StringBuilder("leaseId=").append(leaseId)
                    .append(",isLockSuccess=").append(isLockSuccess).toString();
        
    

  分布式锁具体应用示例,在我的场景中,有一个 Spring ScheduleTask,但这个 Task 在多台应用都会启用,所以有一个问题,就是多台应用同时会跑 Task,而我们希望只有一台应用在执行这批 Task,所以用到分布式锁来控制只有单台应用在执行。具体实现很简单,基于上面的分布式锁,那我们可以让这些应用各自竞争这把锁,然后竞争到锁的应用获得 Task 的执行权。具体实现如下:

MyTask.java

public class MyTask 

    private static final Executor singleThreadExecutor = Executors.newSingleThreadExecutor();

    /**
     * 是否有线程已经在尝试拿锁
     */
    private static Boolean lockTrying = false;

    private static final Object MUTEX = new Object();

    /**
     * 通过共享变量,JVM各线程复用同一把锁
     */
    private static DistributedLock.LockResult lockResultCache = null;

	private static final String LOCK_PREFIX = "/lock/schedule";
	
	private static final long LOCK_TTL_OF_SECONDS = 10L;

    public Result runTask() 
        // JVM已经拿到锁
        if (lockResultCache != null) 
	        // 执行具体业务
            return doSomething();
        
        synchronized (MUTEX) 
            // 只需一个线程进去拿锁,检查是否有线程正在tryLock或者成功拿到锁
            if (lockTrying || lockResultCache != null) 
                return null;
            
            lockTrying = true;
        
        // 等待 MUTEX 的线程得到执行后,可能 JVM 已经拿到锁,这里做下二次判断
        if (lockResultCache != null) 
            return pjp.proceed();
        
        // 开一个新线程专门负责阻塞拿锁,不阻塞业务线程
        singleThreadExecutor.execute(() -> 
            // 阻塞获取锁
            DistributedLock.LockResult lockResult = DistributedLock.getInstance().getLock(LOCK_PREFIX, LOCK_TTL_OF_SECONDS);
            if (lockResult.getIsLockSuccess()) 
                // 拿到锁之后存于JVM,让其他线程复用锁
                lockResultCache = lockResult;
            
            // 下次可以继续拿锁
            lockTrying = false;
        );
        return null;
    

  在我的场景中,还带来另一个问题,就是不仅多台应用能执行Task,并且每台应用都会有多个线程去执行 Task,而 ETCD API 这个 Lock,它是阻塞式获取锁的,会把线程夯住,直至拿到锁,或者出现异常中断。而实际上,对于一台应用来说,只需要一个线程去创建租约并阻塞拿锁就够了,只要这个线程拿到了锁,那就可以通知这台应用(JVM)上的所有线程,可以执行 Task 了,这就涉及到线程通信的问题,而线程通信,最简单的办法之一,就是共享内存。而 JVM 里边,静态变量是线程共享的,所以本例,使用了静态变量来存储获得的这把锁,只要拿锁的那个线程获得了锁,就给这个静态变量lockResult赋值,那么 JVM 里边的其他线程也就知道这台应用获得了锁,就都可以执行 Task 了。同时,针对只要一个线程去阻塞拿锁的问题,加一个 MUTEX 同步锁来简单控制。
  另外,阻塞拿锁这事相对独立,而执行 Task 的线程可能有其他用途,我们不希望拿它来阻塞拿锁,所以上述例子开了个新线程专门负责阻塞拿锁。

以上是关于Go语言基于Etcd实现的定时任务的主要内容,如果未能解决你的问题,请参考以下文章

Go cron定时任务的用法

go语言中的timer 和ticker定时任务

Golang 定时任务管理

webcron

Go语言中定时任务库Cron使用详解

Go语言中定时任务库Cron使用详解