分布式锁

分布式锁

一. 幂等

1.1 什么是幂等?

幂等操作是任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。

在分布式应用中,幂等是非常重要的,也就是相同条件下对一个业务的操作,不管操作多少次,结果都是一样。

1.2 分布式系统的幂等场景

为什么要有幂等这种场景?分布式部署如:订单业务 和库存业务有可能都是独立部署的,都是单独的服务。用户下订单,会调用到订单服务和库存服务。

海量订单产生的业务高峰期,如何避免消息的重复消费?

因为分布式部署,很有可能在调用库存服务时,因为网络等原因,订单服务调用失败,但其实库存服务已经处理完成,只是返回给订单服务处理结果时出现了异常。这个时候一般系统会作补偿方案,也就是订单服务再此放起库存服务的调用,库存减1。

1
update t_goods set count = count -1 where good_id=2

这样就出现了问题,其实上一次调用已经减了1,只是订单服务没有收到处理结果。现在又调用一次,又要减1,这样就不符合业务了,多扣了。

幂等这个概念就是,不管库存服务在相同条件下调用几次,处理结果都一样。这样才能保证补偿方案的可行性。

1.3 常见的幂等操作

  • 查询:数据不变时查询都是幂等操作。

  • 删除:虽然返回结果不一样,但操作效果一样。

  • 唯一数据:数据库增加唯一约束保证数据唯一性,无论多少次新增都只会回库成功一条。

  • 页面防止重复提交:重复点击或Nginx重发等导致数据被重复提交。

    • 集群环境下使用token+redis
    • 单机下采用token+redis或token+jvm
    • 提交数据前,先向后台申请token,放置于redis或jvm内存,并设置有效时间。提交后校验token并删除,生成新的token返回表示token校验通过。
  • 悲观锁:总是假设最坏的情况。认为每次取数据的时候都会被修改,所以每次在拿数据的时候都会上锁。

    注意id要为主键或唯一索引不然会导致锁表

    1
    select * from tableA where id=’xxx’ for update;
  • 乐观锁:总是假设最好的情况。认为每次取数据的时候都不会被修改,所以每次都不会上锁。通过版本号机制和CAS算法实现,乐观锁适用于多读的应用类型,这样可以提高吞吐量。

  • 分布式锁:分布式系统构建全局唯一索引比较困难,例如唯一性的字段没法确定,这时候可以引入分布式锁,通过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,然后做操作,之后释放锁,其实就是为了控制多线程并发的操作,也是分布式系统中经常用到的解决思路。

  • select + insert

    并发不高的后台系统,或者一些任务JOB,为了支持幂等,支持重复执行,简单的处理方法是,先查询下一些关键数据,判断是否已经执行过,在进行业务处理,就可以了。

    注意:核心高并发流程不要用这种方法。

  • 状态机幂等

    在设计单据相关的业务,或者是任务相关的业务,肯定会涉及到状态机(状态变更图),就是业务单据上面有个状态,状态在不同的情况下会发生变更,一般情况下存在有限状态机,这时候,如果状态机已经处于下一个状态,这时候来了一个上一个状态的变更,理论上是不能够变更的,这样的话,保证了有限状态机的幂等。

    注意:订单等单据类业务,存在很长的状态流转,一定要深刻理解状态机,对业务系统设计能力提高有很大帮助。

  • 对外提供接口的api如何保证幂等

    如银联提供的付款接口:需要接入商户提交付款请求时附带:source来源,seq序列号

    source+seq在数据库里面做唯一索引,防止多次付款,(并发时,只能处理一个请求)。

    对外提供接口为了支持幂等调用,接口有两个字段必须传,一个是来源source,一个是来源方序列号seq,这个两个字段在提供方系统里面做联合唯一索引,这样当第三方调用时,先在本方系统里面查询一下,是否已经处理过,返回相应处理结果;没有处理过,进行相应处理,返回结果。

二. 案例

  • 问题表象:某系统用户个人证书要保证唯一,但确产生了多条证书数据。
  • 问题原因:并发导致。
  • 业务流程
    1. 用户选择上传证书,初始化一条数据。
    2. 上传证书。
    3. 校验通过后修改数据的状态字段。

修改数据库状态前,会先检查是否已上传成功、是否已校验通过以及当前状态是否需要修改等来保证幂等,只有首次请求修改数据状态才能修改成功。后续请求则直接返回。但实际运行中多次请求可能会并发执行,同时通过了上述的幂等判断,导致对数据进行了多次修改。

一次请求可以只是新增一条数据,那么通过数据库创建唯一索引就可以解决幂等问题,但当请求不仅仅只做一件事情时,这种复杂的场景下如果组件无法支持幂等,就需要额外的方案,如分布式锁。

三. 常见幂等方案实现

3.1 乐观锁

借鉴数据库的乐观锁机制,如:

1
update t_goods set count = count -1 , version = version + 1 where good_id=2 and version = 1

根据version版本,也就是在操作库存前先获取当前商品的version版本号,然后操作的时候带上此version号。第一次操作库存时,得到version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传如的version还是1,再执行上面的sql语句时,就不会执行;因为version已经变为2了,where条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。

通过版本号实现:

1
update table_xxx set name = {name}, version = version + 1 where version = {version}

通过条件限制:

1
update table_xxx set avai_amount = avai_amount - {subAmount} where avai_amount - {subAmount} >= 0

要求:avai_amount-subAmount >= 0

这个情景适合不用版本号,只更新是做数据安全校验,适合库存模型,扣份额和回滚份额,性能更高。

注意:乐观锁的更新操作,最好用主键或者唯一索引来更新,这样是行锁,否则更新时会锁表,上面两个sql改成下面的两个更好。

1
2
3
4
update table_xxx set name = {name}, version = version + 1 where id = {id} and version = {version}

update table_xxx set avai_amount = avai_amount - {subAmount} where id = {id} and
avai_amount - {subAmount} >= 0

3.2 唯一ID + 指纹码

原理就是利用数据库主键去重,业务完成后插入主键标识:

1
select count(1) from t_check where ID=唯一ID + 指纹码
  • 唯一ID就是业务表的唯一的主键,如商品ID
  • 指纹码就是为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间戳+业务编号的方式。

上面的sql语句:

  • 返回如果为0 表示没有操作过,那业务操作后就可以insert into t_check(唯一ID+指纹码)
  • 返回如果大于0 表示操作过,就直接返回
  • 优点:实现简单
  • 缺点:高并发下数据库瓶颈

解决方案:根据ID进行分库分表进行算法路由。

3.3 Redis原子操作

利用Redis的原子操作,做个操作完成的标记。性能较好,但会遇到一些问题:

  1. 我们是否需要把业务结果进行数据落库,如果落库,关键解决的问题时数据库和redis操作如何做到原子性?

    这个意思就是库存减1了,但redis进行操作完成标记时,失败了怎么办?也就是一定要保证落库和redis 要么一起成功,要么一起失败。

  2. 如果不进行落库,那么都存储到缓存中,如何设置定时同步策略?

    这个意思就是库存减1,不落库,直接先操作redis操作完成标记,然后由另外的同步服务进行库存落库,这个就是增加了系统复杂性,而且同步策略如何设置。

四. 分布式锁

分布式锁要考虑到两个分布式场景的问题:

  • 单点故障:当持有锁的应用发生单点故障时,锁会被长期持有无法释放。
  • 网络超时:当客户端超时但实际上获取锁时,无法再次成功获取锁。

单点故障的解决方案是加入过期时间。网络超时问题则使用可重入锁,记录占有锁的用户ID,当超时重试时使占有锁的用户能再次请求成功。

4.1 MVCC

综上所述可以给出一个实现方案:使用缓存作为锁的存储介质,利用MVCC(Multiversion concurrency control)机制解决共享资源互斥访问问题。

分布式锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
public class MdbDistributeLock implements DistributeLock {

/**
* 锁的命名空间
*/
private final int namespace;

/**
* 锁对应的缓存key
*/
private final String lockName;

/**
* 锁的唯一标识,保证可重入,以应对put成功,但是返回超时的情况
*/
private final String lockId;

/**
* 是否持有锁。true:是
*/
private boolean locked;

/**
* 缓存实例
*/
private final TairManager tairManager;

public MdbDistributeLock(TairManager tairManager, int namespace, String lockCacheKey) {

this.tairManager = tairManager;
this.namespace = namespace;
this.lockName = lockCacheKey;
this.lockId = UUID.randomUUID().toString();
}

@Override
public boolean tryLock() {

try {
//获取锁状态
Result<DataEntry> getResult = null;
ResultCode getResultCode = null;
for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
getResult = tairManager.get(namespace, lockName);
getResultCode = getResult == null ? null : getResult.getRc();
if (noNeedRetry(getResultCode)) {
break;
}
}

//重入,已持有锁,返回成功
if (ResultCode.SUCCESS.equals(getResultCode)
&& getResult.getValue() != null && lockId.equals(getResult.getValue().getValue())) {
locked = true;
return true;
}

//不可获取锁,返回失败
if (!ResultCode.DATANOTEXSITS.equals(getResultCode)) {
log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
return false;
}

//尝试获取锁
ResultCode putResultCode = null;
for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
putResultCode = tairManager.put(namespace, lockName, lockId, MDB_CACHE_VERSION,
DEFAULT_EXPIRE_TIME_SEC);
if (noNeedRetry(putResultCode)) {
break;
}
}
if (!ResultCode.SUCCESS.equals(putResultCode)) {
log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
return false;
}
locked = true;
return true;

} catch (Exception e) {
log.error("DistributedLock.tryLock fail lock={}", this, e);
}
return false;
}

@Override
public void unlock() {

if (!locked) {
return;
}
ResultCode resultCode = tairManager.invalid(namespace, lockName);
if (!resultCode.isSuccess()) {
log.error("DistributedLock.unlock fail lock={} resultCode={} traceId={}", this, resultCode,
EagleEye.getTraceId());
}
locked = false;
}

/**
* 判断是否需要重试
*
* @param resultCode 缓存的返回码
* @return true:不用重试
*/
private boolean noNeedRetry(ResultCode resultCode) {
return resultCode != null && !ResultCode.CONNERROR.equals(resultCode) && !ResultCode.TIMEOUT.equals(
resultCode) && !ResultCode.UNKNOW.equals(resultCode);
}

}

分布式锁工厂类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MdbDistributeLockFactory implements DistributeLockFactory {

/**
* 缓存的命名空间
*/
@Setter
private int namespace;

@Setter
private MultiClusterTairManager mtairManager;

@Override
public DistributeLock getLock(String lockName) {
return new MdbDistributeLock(mtairManager, namespace, lockName);
}
}

使用分布式锁:

● 初始化分布式锁的工厂
● 利用工厂生成一个分布式锁实例
● 使用该分布式实例上锁和解锁操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

@Test
public void testTryLock() {

//初始化工厂
MdbDistributeLockFactory mdbDistributeLockFactory = new MdbDistributeLockFactory();
mdbDistributeLockFactory.setNamespace(603);
mdbDistributeLockFactory.setMtairManager(new MultiClusterTairManager());

//获得锁
DistributeLock lock = mdbDistributeLockFactory.getLock("TestLock");

//上锁解锁操作
boolean locked = lock.tryLock();
if (!locked) {
return;
}
try {
//do something
} finally {
lock.unlock();
}
}

问题:释放锁的时候只是简单的将缓存中的key失效,所以存在错误释放他人已持有锁问题。所幸只要锁的租期设置的足够长,该问题出现几率就足够小。

当占有锁的Client 1在释放锁之前,锁就已经到期了,Client 2将获取锁,此时锁被Client 2持有,但是Client 1可能会错误的将其释放。一个更优秀的方案,我们给每个锁都设置一个身份标识,在释放锁的时候,1)首先查询锁是否是自己的,2)如果是自己的则释放锁。受限于实现方式,步骤1和步骤2不是原子操作,在步骤1和步骤2之间,如果锁到期被其他客户端获取,此时也会错误的释放他人的锁。

4.2 Redis

借助Redis的Lua脚本,可以完美的解决存在错误释放他人已持有锁问题的。

当我们想要获取锁时,我们可以执行如下方法:

1
SET resource_name my_random_value NX PX 30000

当我们想要释放锁时,我们可以执行如下的Lua脚本:

1
if redis.call("get",KEYS[1]) == ARGV[1] then    return redis.call("del",KEYS[1])else    return 0end

4.3 Redisson

在方案一和方案二的讨论过程中,有一个问题被我们反复提及:锁的自动释放。

这是一把双刃剑:

  1. 一方面它很好的解决了持有锁的客户端单点故障的问题
  2. 另一方面,如果锁提前释放,就会出现锁的错误持有状态

这个时候,我们可以引入Watch Dog自动续租机制,我们可以参考以下Redisson是如何实现的。

在上锁成功后,Redisson会调用 renewExpiration() 方法开启一个Watch Dog线程,为锁自动续期。每过1/3时间续一次,成功则继续下一次续期,失败取消续期操作。

我们可以再看看Redisson是如何续期的。renewExpiration() 方法调用的 renewExpirationAsync() 方法是执行锁续期的关键操作,我们进入到方法内部,可以看到Redisson也是使用Lua脚本进行锁续租的:

  1. 判断锁是否存在;
  2. 如果存在则重置过期时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void renewExpiration() {    
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

Timeout task = commandExecutor.getConnectionManager().newTimeout(timeout -> {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}

使用Lua脚本进行锁续租:

1
2
3
4
5
6
7
8
9
10
protected RFuture<Boolean> renewExpirationAsync(long threadId) {    
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}

4.4 RedLock

借助Redisson的自动续期机制,我们无需再担心锁的自动释放。但是讨论到这里,我还是不得不面对一个问题:分布式锁本身不是一个分布式应用。当Redis服务器故障无法正常工作时,整个分布式锁也就无法提供服务。

更进一步,我们可以看看Distributed locks with Redis这篇文章中提到的Redlock算法及其实现。

Redlock算法不是银弹,关于它的好与坏,也有很多争论:


参考:

🔗 《并发场景下的幂等问题——分布式锁详解