分布式锁Redis/ZK/etcd
分布式锁Redis/ZK/etcd
什么是分布式锁
分布式锁是在分布式系统中用于控制多个进程或服务访问共享资源的机制。它确保在同一时间只有一个进程能够访问特定的资源,防止并发冲突和数据不一致。
基于Redis的分布式锁
基本实现
// Redis分布式锁实现
public class RedisDistributedLock {
private final RedisTemplate<String, String> redisTemplate;
private final String lockKey;
private final String lockValue;
private final long expireTime;
public RedisDistributedLock(RedisTemplate<String, String> redisTemplate,
String lockKey,
long expireTime) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.lockValue = UUID.randomUUID().toString();
this.expireTime = expireTime;
}
// 获取锁
public boolean tryLock() {
String result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
return "OK".equals(result);
}
// 释放锁
public boolean unlock() {
// 使用Lua脚本确保原子性
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(lockKey),
lockValue
);
return result != null && result == 1L;
}
}
// 使用示例
@Component
public class OrderService {
private final RedisTemplate<String, String> redisTemplate;
public void processOrder(Long orderId) {
String lockKey = "order:" + orderId;
RedisDistributedLock lock = new RedisDistributedLock(redisTemplate, lockKey, 30000);
try {
if (lock.tryLock()) {
// 处理订单
doProcessOrder(orderId);
} else {
throw new LockAcquisitionException("Failed to acquire lock for order: " + orderId);
}
} finally {
lock.unlock();
}
}
}
Redisson实现
// 使用Redisson实现分布式锁
@Component
public class RedissonLockService {
private final RedissonClient redissonClient;
public void processOrder(Long orderId) {
RLock lock = redissonClient.getLock("order:" + orderId);
try {
// 尝试获取锁,最多等待100秒,锁过期时间30秒
if (lock.tryLock(100, 30, TimeUnit.SECONDS)) {
try {
// 处理订单
doProcessOrder(orderId);
} finally {
lock.unlock();
}
} else {
throw new LockAcquisitionException("Failed to acquire lock");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockAcquisitionException("Lock acquisition interrupted", e);
}
}
// 分布式锁与业务逻辑结合
public void processWithRetry(Long orderId, int maxRetries) {
RLock lock = redissonClient.getLock("order:" + orderId);
for (int i = 0; i < maxRetries; i++) {
try {
if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
try {
doProcessOrder(orderId);
return;
} finally {
lock.unlock();
}
}
// 等待一段时间后重试
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
throw new LockAcquisitionException("Failed to acquire lock after " + maxRetries + " retries");
}
}
基于ZooKeeper的分布式锁
临时顺序节点实现
// ZooKeeper分布式锁实现
public class ZookeeperDistributedLock {
private final ZooKeeper zooKeeper;
private final String lockPath;
private String currentNode;
public ZookeeperDistributedLock(ZooKeeper zooKeeper, String lockPath) {
this.zooKeeper = zooKeeper;
this.lockPath = lockPath;
}
// 获取锁
public void lock() throws Exception {
// 创建临时顺序节点
currentNode = zooKeeper.create(
lockPath + "/lock-",
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL
);
while (true) {
// 获取所有子节点
List<String> children = zooKeeper.getChildren(lockPath, false);
Collections.sort(children);
// 检查当前节点是否是最小的节点
String smallestNode = lockPath + "/" + children.get(0);
if (currentNode.equals(smallestNode)) {
// 获取锁成功
return;
}
// 找到当前节点的前一个节点
String previousNode = null;
for (String child : children) {
String childPath = lockPath + "/" + child;
if (childPath.compareTo(currentNode) < 0) {
previousNode = childPath;
} else {
break;
}
}
if (previousNode != null) {
// 等待前一个节点被删除
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zooKeeper.exists(previousNode, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
latch.countDown();
}
});
if (stat == null) {
// 节点已经被删除,重新检查
continue;
}
latch.await();
}
}
}
// 释放锁
public void unlock() throws Exception {
zooKeeper.delete(currentNode, -1);
currentNode = null;
}
}
// 使用示例
@Component
public class ZookeeperLockService {
private final ZooKeeper zooKeeper;
public void processOrder(Long orderId) throws Exception {
ZookeeperDistributedLock lock = new ZookeeperDistributedLock(zooKeeper, "/locks/orders");
try {
lock.lock();
// 处理订单
doProcessOrder(orderId);
} finally {
lock.unlock();
}
}
}
Curator框架实现
// 使用Curator实现分布式锁
@Component
public class CuratorLockService {
private final CuratorFramework curatorClient;
public void processOrder(Long orderId) {
InterProcessMutex lock = new InterProcessMutex(curatorClient, "/locks/orders/" + orderId);
try {
// 尝试获取锁,最多等待10秒
if (lock.acquire(10, TimeUnit.SECONDS)) {
try {
// 处理订单
doProcessOrder(orderId);
} finally {
lock.release();
}
} else {
throw new LockAcquisitionException("Failed to acquire lock");
}
} catch (Exception e) {
throw new LockAcquisitionException("Lock acquisition failed", e);
}
}
// 可重入锁
public void processOrderReentrant(Long orderId) {
InterProcessMutex lock = new InterProcessMutex(curatorClient, "/locks/orders/" + orderId);
try {
lock.acquire();
// 可以多次获取同一把锁
lock.acquire();
try {
doProcessOrder(orderId);
} finally {
lock.release(); // 释放一次
lock.release(); // 再释放一次
}
} catch (Exception e) {
throw new LockAcquisitionException("Lock acquisition failed", e);
}
}
}
基于etcd的分布式锁
// etcd分布式锁实现
public class EtcdDistributedLock {
private final EtcdClient etcdClient;
private final String lockPath;
private String leaseId;
private String currentKey;
public EtcdDistributedLock(EtcdClient etcdClient, String lockPath) {
this.etcdClient = etcdClient;
this.lockPath = lockPath;
}
// 获取锁
public void lock() throws Exception {
// 创建租约
leaseId = etcdClient.getLeaseClient().grant(30).get().getID();
// 创建前缀KV
currentKey = lockPath + "/" + UUID.randomUUID().toString();
// 使用事务确保原子性
Transaction txn = etcdClient.getLockClient().transaction();
// 检查是否有更小的key
List<KeyValue> existingKeys = etcdClient.getKVClient().get(
ByteString.copyFromUtf8(lockPath),
GetOption.builder().isPrefix(true).build()
).get().getKvsList();
// 找到最小的key
String minKey = null;
for (KeyValue kv : existingKeys) {
String key = kv.getKey().toStringUtf8();
if (minKey == null || key.compareTo(minKey) < 0) {
minKey = key;
}
}
// 如果当前key是最小的,获取锁成功
if (minKey == null || currentKey.compareTo(minKey) < 0) {
// 创建key并关联租约
etcdClient.getKVClient().put(
ByteString.copyFromUtf8(currentKey),
ByteString.copyFromUtf8("locked"),
PutOption.builder().withLeaseId(leaseId).build()
).get();
// 保持租约活跃
keepAlive();
return;
}
// 等待前一个key被删除
watchKey(minKey);
}
// 释放锁
public void unlock() throws Exception {
// 删除key
etcdClient.getKVClient().delete(
ByteString.copyFromUtf8(currentKey)
).get();
// 撤销租约
etcdClient.getLeaseClient().revoke(leaseId).get();
currentKey = null;
leaseId = null;
}
// 保持租约活跃
private void keepAlive() {
etcdClient.getLeaseClient().keepAlive(leaseId, new StreamObserver<>() {
@Override
public void onNext(LeaseKeepAliveResponse value) {
// 租约续期成功
}
@Override
public void onError(Throwable t) {
// 租约续期失败,可能需要重新获取锁
}
@Override
public void onCompleted() {
// 流结束
}
});
}
// 监听key删除事件
private void watchKey(String key) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
etcdClient.getWatchClient().watch(
ByteString.copyFromUtf8(key),
WatchOption.builder().build(),
new StreamObserver<>() {
@Override
public void onNext(WatchResponse value) {
if (value.getEventsList().stream()
.anyMatch(event -> event.getEventType() == Event.EventType.DELETE)) {
latch.countDown();
}
}
@Override
public void onError(Throwable t) {
latch.countDown();
}
@Override
public void onCompleted() {
// 流结束
}
}
);
latch.await();
}
}
方案对比
| 特性 | Redis | ZooKeeper | etcd |
|---|---|---|---|
| 性能 | 高 | 中等 | 中等 |
| 可靠性 | 中等 | 高 | 高 |
| 实现复杂度 | 低 | 中等 | 中等 |
| 锁超时 | 支持 | 支持 | 支持 |
| 可重入 | 需要额外实现 | 原生支持 | 需要额外实现 |
| 公平性 | 不公平 | 公平 | 公平 |
| 适用场景 | 高性能场景 | 强一致性场景 | 高可用场景 |
最佳实践
- 锁超时:设置合理的锁超时时间,避免死锁
- 可重入性:根据需要实现可重入锁
- 锁续期:对于长时间运行的任务,实现锁续期机制
- 异常处理:确保在异常情况下能够释放锁
- 性能优化:根据场景选择合适的锁实现方案
- 监控告警:监控锁的获取和释放情况,及时发现异常