分布式锁:可重入、自动续期与红锁
分布式锁:可重入、自动续期与红锁
分布式锁核心需求
分布式锁需要满足互斥性、无死锁、容错性和可重入性。锁的实现方式取决于可用的协调服务(Redis、ZooKeeper、etcd等)。
// 分布式锁接口
public interface DistributedLock {
boolean tryLock(String key, long timeoutMs);
boolean tryLock(String key, long waitMs, long leaseMs);
void unlock(String key);
boolean isLocked(String key);
// 可重入锁
boolean tryReentrantLock(String key, long leaseMs);
void unlockReentrant(String key);
}
Redis分布式锁
Redis使用SET命令实现分布式锁,通过NX和PX参数保证原子性。
// Redis分布式锁实现
public class RedisDistributedLock implements DistributedLock {
private final RedisTemplate<String, String> redisTemplate;
@Override
public boolean tryLock(String key, long leaseMs) {
String lockValue = UUID.randomUUID().toString();
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(key, lockValue, leaseMs, TimeUnit.MILLISECONDS);
return Boolean.TRUE.equals(result);
}
@Override
public void unlock(String key) {
// 使用Lua脚本保证原子性
String script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
""";
redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(key),
getLockValue(key)
);
}
}
// Redisson分布式锁(生产推荐)
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("order:lock:" + orderId);
try {
// 尝试获取锁,等待10秒,锁自动释放时间30秒
if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
// 执行业务逻辑
processOrder(orderId);
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
可重入锁实现
可重入锁允许同一个线程多次获取同一把锁,使用计数器跟踪加锁次数。
// Redis可重入锁
type RedisReentrantLock struct {
client *redis.Client
key string
value string // UUID + 随机数
count int32 // 加锁次数
}
func (l *RedisReentrantLock) Lock(ctx context.Context, ttl time.Duration) error {
// 检查是否已持有锁
if l.isHeldByCurrentThread(ctx) {
l.count++
return l.refreshTTL(ctx, ttl)
}
// 尝试获取锁
for {
ok, err := l.client.SetNX(ctx, l.key, l.value, ttl).Result()
if err != nil {
return err
}
if ok {
l.count = 1
return nil
}
time.Sleep(50 * time.Millisecond)
}
}
func (l *RedisReentrantLock) Unlock(ctx context.Context) error {
if l.count > 1 {
l.count--
return nil
}
// 使用Lua脚本原子删除
script := `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
`
_, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
l.count = 0
return err
}
func (l *RedisReentrantLock) isHeldByCurrentThread(ctx context.Context) bool {
val, err := l.client.Get(ctx, l.key).Result()
return err == nil && val == l.value
}
自动续期(Watchdog)
Redisson的Watchdog机制自动延长锁的持有时间,防止业务未完成锁就过期。
// Redisson Watchdog原理
// 默认锁持有时间30秒,每10秒续期一次
RLock lock = redisson.getLock("resource");
lock.lock(); // 不指定leaseTime,启用Watchdog
// 等效于
lock.lock(30, TimeUnit.SECONDS); // 手动指定
// Redisson会在锁持有10秒后自动续期到30秒
Redlock算法
Redlock是Redis官方推荐的分布式锁算法,通过多个独立Redis实例保证锁的可靠性。
// Redlock实现
List<RedissonClient> redissonClients = Arrays.asList(
redisson1, redisson2, redisson3, redisson4, redisson5
);
RedissonRedLock redLock = new RedissonRedLock(
redissonClients.stream()
.map(client -> client.getLock("resource"))
.toArray(RLock[]::new)
);
try {
// 尝试获取RedLock,5个实例中至少3个成功
if (redLock.tryLock(5, 30, TimeUnit.SECONDS)) {
// 执行业务逻辑
processResource();
}
} finally {
redLock.unlock();
}
分布式锁选型
| 特性 | Redis | ZooKeeper | etcd |
|---|---|---|---|
| 性能 | 最高 | 中等 | 中等 |
| 一致性 | AP | CP | CP |
| 可重入 | 需额外实现 | 原生支持 | 需额外实现 |
| 自动续期 | Redisson支持 | 不支持 | 不支持 |
| 适用场景 | 高并发 | 强一致 | 强一致 |