← 返回首页
🌐

分布式锁:可重入、自动续期与红锁

📂 architecture ⏱ 2 min 379 words

分布式锁:可重入、自动续期与红锁

分布式锁核心需求

分布式锁需要满足互斥性、无死锁、容错性和可重入性。锁的实现方式取决于可用的协调服务(Redis、ZooKeeper、etcd等)。

// 分布式锁接口
public interface DistributedLock {
    boolean tryLock(String key, long timeoutMs);
    boolean tryLock(String key, long waitMs, long leaseMs);
    void unlock(String key);
    boolean isLocked(String key);
    
    // 可重入锁
    boolean tryReentrantLock(String key, long leaseMs);
    void unlockReentrant(String key);
}

Redis分布式锁

Redis使用SET命令实现分布式锁,通过NX和PX参数保证原子性。

// Redis分布式锁实现
public class RedisDistributedLock implements DistributedLock {
    private final RedisTemplate<String, String> redisTemplate;
    
    @Override
    public boolean tryLock(String key, long leaseMs) {
        String lockValue = UUID.randomUUID().toString();
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, lockValue, leaseMs, TimeUnit.MILLISECONDS);
        return Boolean.TRUE.equals(result);
    }
    
    @Override
    public void unlock(String key) {
        // 使用Lua脚本保证原子性
        String script = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
            """;
        
        redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(key),
            getLockValue(key)
        );
    }
}

// Redisson分布式锁(生产推荐)
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("order:lock:" + orderId);

try {
    // 尝试获取锁,等待10秒,锁自动释放时间30秒
    if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
        // 执行业务逻辑
        processOrder(orderId);
    }
} finally {
    if (lock.isHeldByCurrentThread()) {
        lock.unlock();
    }
}

可重入锁实现

可重入锁允许同一个线程多次获取同一把锁,使用计数器跟踪加锁次数。

// Redis可重入锁
type RedisReentrantLock struct {
    client *redis.Client
    key    string
    value  string  // UUID + 随机数
    count  int32   // 加锁次数
}

func (l *RedisReentrantLock) Lock(ctx context.Context, ttl time.Duration) error {
    // 检查是否已持有锁
    if l.isHeldByCurrentThread(ctx) {
        l.count++
        return l.refreshTTL(ctx, ttl)
    }
    
    // 尝试获取锁
    for {
        ok, err := l.client.SetNX(ctx, l.key, l.value, ttl).Result()
        if err != nil {
            return err
        }
        if ok {
            l.count = 1
            return nil
        }
        time.Sleep(50 * time.Millisecond)
    }
}

func (l *RedisReentrantLock) Unlock(ctx context.Context) error {
    if l.count > 1 {
        l.count--
        return nil
    }
    
    // 使用Lua脚本原子删除
    script := `
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
    `
    _, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
    l.count = 0
    return err
}

func (l *RedisReentrantLock) isHeldByCurrentThread(ctx context.Context) bool {
    val, err := l.client.Get(ctx, l.key).Result()
    return err == nil && val == l.value
}

自动续期(Watchdog)

Redisson的Watchdog机制自动延长锁的持有时间,防止业务未完成锁就过期。

// Redisson Watchdog原理
// 默认锁持有时间30秒,每10秒续期一次
RLock lock = redisson.getLock("resource");
lock.lock(); // 不指定leaseTime,启用Watchdog

// 等效于
lock.lock(30, TimeUnit.SECONDS); // 手动指定
// Redisson会在锁持有10秒后自动续期到30秒

Redlock算法

Redlock是Redis官方推荐的分布式锁算法,通过多个独立Redis实例保证锁的可靠性。

// Redlock实现
List<RedissonClient> redissonClients = Arrays.asList(
    redisson1, redisson2, redisson3, redisson4, redisson5
);

RedissonRedLock redLock = new RedissonRedLock(
    redissonClients.stream()
        .map(client -> client.getLock("resource"))
        .toArray(RLock[]::new)
);

try {
    // 尝试获取RedLock,5个实例中至少3个成功
    if (redLock.tryLock(5, 30, TimeUnit.SECONDS)) {
        // 执行业务逻辑
        processResource();
    }
} finally {
    redLock.unlock();
}

分布式锁选型

特性 Redis ZooKeeper etcd
性能 最高 中等 中等
一致性 AP CP CP
可重入 需额外实现 原生支持 需额外实现
自动续期 Redisson支持 不支持 不支持
适用场景 高并发 强一致 强一致