Go语言基于Etcd实现的定时任务
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言基于Etcd实现的定时任务相关的知识,希望对你有一定的参考价值。
参考技术A 利用 Etcd 的Lease租约特性来实现定时功能,同时通过Watch机制来实现多节点情况下只有一个节点执行该任务。通过定时任务库 Cron 的时间字符串解析器Parser来解析任务执行时间。Etcd
Cron
源码链接
基于ETCD实现分布式锁&实战:控制多个应用仅一台执行任务
我们知道,分布式锁有好几种方案:基于Redis、基于数据库如MySQL、基于注册中心如Zookeeper等;而 K8S 体系中基于 Go 语言编写的的 ETCD 则对于分布式锁有着更强大的支持。
ETCD 有一个租约机制,客户端跟 ETCD 服务端订立一个“租约”后,需要在租约到期之前进行续约,否则会在到期后被自动解除租约,而租约可以挂载多个 key-value,当租约过期时,挂载在上面的 key-value 也会跟着被删除。既有类似 Redis / Zookeeper 的 key-value 机制能够实现分布式锁,同时租约机制,又能实现某个客户端宕机后,服务端自动检测锁超时并自动释放锁。
储备阅读:
基于 Etcd 的分布式锁实现原理及方案
这里,使用 ETCD 封装好的 lockClient 来实现分布式锁,例子如下:
maven依赖:
io.etcd:jetcd-core:jar:0.5.3:compile
io.grpc:grpc-core:jar:1.31.1:compile
# 有用到dubbo的话直接引入一个依赖即可:
org.apache.dubbo:dubbo-remoting-etcd3:jar:2.7.11:compile
DistributedLock.java
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
/**
* 基于 ETCD 的分布式锁
*
* @author lvlang
* @date 2022/2/24
*/
public class DistributedLock
private static DistributedLock lockProvider = null;
private static final Object MUTEX = new Object();
private Client client;
private Lock lockClient;
private Lease leaseClient;
private DistributedLock()
super();
// 创建Etcd客户端,本例中Etcd集群只有一个节点
this.client = Client.builder().endpoints("http://localhost:2379").build();
this.lockClient = client.getLockClient();
this.leaseClient = client.getLeaseClient();
public static DistributedLock getInstance()
synchronized (MUTEX)
if (null == lockProvider)
lockProvider = new DistributedLock();
return lockProvider;
/**
* 加锁操作
*
* @param lockName: 针对某一共享资源(数据、文件等)制定的锁名
* @param ttl: Time To Live(单位秒),租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁
* @return LockResult
*/
public LockResult getLock(String lockName, long ttl)
LockResult lockResult = new LockResult();
/*1.准备阶段*/
// 初始化返回值lockResult
lockResult.setIsLockSuccess(false);
// 记录租约ID,初始值设为 0L
long leaseId = 0;
/*2.创建租约并设置自动续约*/
// 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定。
try
leaseId = leaseClient.grant(ttl).get().getID();
lockResult.setLeaseId(leaseId);
catch (InterruptedException | ExecutionException e)
return lockResult;
StreamObserver<LeaseKeepAliveResponse> observer = new StreamObserver<LeaseKeepAliveResponse>()
@Override
public void onNext(LeaseKeepAliveResponse arg0)
// 自动续约心跳
@Override
public void onError(Throwable arg0)
@Override
public void onCompleted()
;
// 设置自动续约
leaseClient.keepAlive(leaseId, observer);
/*3.加锁操作*/
// 执行加锁操作,并为锁对应的Key绑定租约
try
// 阻塞式获取锁
lockClient.lock(ByteSequence.from(lockName, StandardCharsets.UTF_8), leaseId).get();
catch (InterruptedException | ExecutionException e1)
// 解除租约
leaseClient.revoke(leaseId);
return lockResult;
lockResult.setIsLockSuccess(true);
return lockResult;
public static class LockResult
// 在下面补充
租约这块,可以参考基于 Etcd 的分布式锁实现原理及方案 的路线一那样,自定义一个ScheduledExecutorService
定时任务服务,定期对租约进行续期。而 ETCD 的 API 封装也支持通过观察器实现自动续约:Lease.keepAlive(long leaseId, StreamObserver<LeaseKeepAliveResponse> observer);
创建好租约并设置了自动续约后,那后面就是把 key-value 绑定到租约上,上面例子中lockClient.lock()
做的事情,其实就是多个客户端竞争lockName
这个 key,谁竞争到了这个 key 则将该 key 绑定到它的租约上,也就实现了由该线程获得了锁。
具体竞争机制参见基于 Etcd 的分布式锁实现原理及方案,基本流程就是所有要竞争锁的客户端,都生成一个 /lockName/revisionId
的 key-value,然后把所有 /locakName
前缀的键值对都取回来,然后看自己拿到的这个revisionId
是不是里边最小的,最小的说明就是最早获得了这个 key prefix 的线程。而 前面提到,ETCD 服务端会自动解除租约并删除上面的 key-value,所以当持有锁的客户端宕机被解除租约后,它的/lockName/revisionId
就会从键值对列表中自动删除,从而使得新的最小的revisionId
的那个线程获得锁。
*补充上述代码的内部类:LockResult *
/**
* 该class用于描述加锁的结果,同时携带解锁操作所需参数
*/
public static class LockResult
private boolean isLockSuccess;
private long leaseId;
LockResult()
super();
public void setIsLockSuccess(boolean isLockSuccess)
this.isLockSuccess = isLockSuccess;
public void setLeaseId(long leaseId)
this.leaseId = leaseId;
public boolean getIsLockSuccess()
return this.isLockSuccess;
public long getLeaseId()
return this.leaseId;
@Override
public String toString()
return new StringBuilder("leaseId=").append(leaseId)
.append(",isLockSuccess=").append(isLockSuccess).toString();
分布式锁具体应用示例,在我的场景中,有一个 Spring ScheduleTask,但这个 Task 在多台应用都会启用,所以有一个问题,就是多台应用同时会跑 Task,而我们希望只有一台应用在执行这批 Task,所以用到分布式锁来控制只有单台应用在执行。具体实现很简单,基于上面的分布式锁,那我们可以让这些应用各自竞争这把锁,然后竞争到锁的应用获得 Task 的执行权。具体实现如下:
MyTask.java
public class MyTask
private static final Executor singleThreadExecutor = Executors.newSingleThreadExecutor();
/**
* 是否有线程已经在尝试拿锁
*/
private static Boolean lockTrying = false;
private static final Object MUTEX = new Object();
/**
* 通过共享变量,JVM各线程复用同一把锁
*/
private static DistributedLock.LockResult lockResultCache = null;
private static final String LOCK_PREFIX = "/lock/schedule";
private static final long LOCK_TTL_OF_SECONDS = 10L;
public Result runTask()
// JVM已经拿到锁
if (lockResultCache != null)
// 执行具体业务
return doSomething();
synchronized (MUTEX)
// 只需一个线程进去拿锁,检查是否有线程正在tryLock或者成功拿到锁
if (lockTrying || lockResultCache != null)
return null;
lockTrying = true;
// 等待 MUTEX 的线程得到执行后,可能 JVM 已经拿到锁,这里做下二次判断
if (lockResultCache != null)
return pjp.proceed();
// 开一个新线程专门负责阻塞拿锁,不阻塞业务线程
singleThreadExecutor.execute(() ->
// 阻塞获取锁
DistributedLock.LockResult lockResult = DistributedLock.getInstance().getLock(LOCK_PREFIX, LOCK_TTL_OF_SECONDS);
if (lockResult.getIsLockSuccess())
// 拿到锁之后存于JVM,让其他线程复用锁
lockResultCache = lockResult;
// 下次可以继续拿锁
lockTrying = false;
);
return null;
在我的场景中,还带来另一个问题,就是不仅多台应用能执行Task,并且每台应用都会有多个线程去执行 Task,而 ETCD API 这个 Lock,它是阻塞式获取锁的,会把线程夯住,直至拿到锁,或者出现异常中断。而实际上,对于一台应用来说,只需要一个线程去创建租约并阻塞拿锁就够了,只要这个线程拿到了锁,那就可以通知这台应用(JVM)上的所有线程,可以执行 Task 了,这就涉及到线程通信的问题,而线程通信,最简单的办法之一,就是共享内存。而 JVM 里边,静态变量是线程共享的,所以本例,使用了静态变量来存储获得的这把锁,只要拿锁的那个线程获得了锁,就给这个静态变量lockResult
赋值,那么 JVM 里边的其他线程也就知道这台应用获得了锁,就都可以执行 Task 了。同时,针对只要一个线程去阻塞拿锁的问题,加一个 MUTEX 同步锁来简单控制。
另外,阻塞拿锁这事相对独立,而执行 Task 的线程可能有其他用途,我们不希望拿它来阻塞拿锁,所以上述例子开了个新线程专门负责阻塞拿锁。
以上是关于Go语言基于Etcd实现的定时任务的主要内容,如果未能解决你的问题,请参考以下文章