应用场景
当多个机器(多个进程)会对同一条数据进行修改时,并且要求这个修改是原子性的。这里有两个限定:(1)多个进程之间的竞争,意味着JDK自带的锁失效;(2)原子性修改,意味着数据是有状态的,修改前后有依赖。
实现方式
- 基于Redis实现,主要基于redis的setnx(set if not exist)命令;
- 基于Zookeeper实现;
- 基于version字段实现,乐观锁,两个线程可以同时读取到原有的version值,但是最终只有一个可以完成操作;
这三种方式中,我接触过第一和第三种。基于redis的分布式锁功能更加强大,可以实现阻塞和非阻塞锁。
基于Redis的实践
锁的实现
- 锁的key为目标数据的唯一键,value为锁的期望超时时间点;
- 首先进行一次setnx命令,尝试获取锁,如果获取成功,则设置锁的最终超时时间(以防在当前进程获取锁后奔溃导致锁无法释放);如果获取锁失败,则检查当前的锁是否超时,如果发现没有超时,则获取锁失败;如果发现锁已经超时(即锁的超时时间小于等于当前时间),则再次尝试获取锁,取到后判断下当前的超时时间和之前的超时时间是否相等,如果相等则说明当前的客户端是排队等待的线程里的第一个尝试获取锁的,让它获取成功即可。
public class RedisDistributionLock {
private static final Logger logger = LoggerFactory.getLogger(RedisDistributionLock.class);
//key的TTL,一天
private static final int finalDefaultTTLwithKey = 24 * 3600;
//锁默认超时时间,20秒
private static final long defaultExpireTime = 20 * 1000;
private static final boolean Success = true;
@Resource( name = "redisTemplate")
private RedisTemplate<String, String> redisTemplateForGeneralize;
/**
* 加锁,锁默认超时时间20秒
* @param resource
* @return
*/
public boolean lock(String resource) {
return this.lock(resource, defaultExpireTime);
}
/**
* 加锁,同时设置锁超时时间
* @param key 分布式锁的key
* @param expireTime 单位是ms
* @return
*/
public boolean lock(String key, long expireTime) {
logger.debug("redis lock debug, start. key:[{}], expireTime:[{}]",key,expireTime);
long now = Instant.now().toEpochMilli();
long lockExpireTime = now + expireTime;
//setnx
boolean executeResult = redisTemplateForGeneralize.opsForValue().setIfAbsent(key,String.valueOf(lockExpireTime));
logger.debug("redis lock debug, setnx. key:[{}], expireTime:[{}], executeResult:[{}]", key, expireTime,executeResult);
//取锁成功,为key设置expire
if (executeResult == Success) {
redisTemplateForGeneralize.expire(key,finalDefaultTTLwithKey, TimeUnit.SECONDS);
return true;
}
//没有取到锁,继续流程
else{
Object valueFromRedis = this.getKeyWithRetry(key, 3);
// 避免获取锁失败,同时对方释放锁后,造成NPE
if (valueFromRedis != null) {
//已存在的锁超时时间
long oldExpireTime = Long.parseLong((String)valueFromRedis);
logger.debug("redis lock debug, key already seted. key:[{}], oldExpireTime:[{}]",key,oldExpireTime);
//锁过期时间小于当前时间,锁已经超时,重新取锁
if (oldExpireTime <= now) {
logger.debug("redis lock debug, lock time expired. key:[{}], oldExpireTime:[{}], now:[{}]", key, oldExpireTime, now);
String valueFromRedis2 = redisTemplateForGeneralize.opsForValue().getAndSet(key, String.valueOf(lockExpireTime));
long currentExpireTime = Long.parseLong(valueFromRedis2);
//判断currentExpireTime与oldExpireTime是否相等
if(currentExpireTime == oldExpireTime){
//相等,则取锁成功
logger.debug("redis lock debug, getSet. key:[{}], currentExpireTime:[{}], oldExpireTime:[{}], lockExpireTime:[{}]", key, currentExpireTime, oldExpireTime, lockExpireTime);
redisTemplateForGeneralize.expire(key, finalDefaultTTLwithKey, TimeUnit.SECONDS);
return true;
}else{
//不相等,取锁失败
return false;
}
}
}
else {
logger.warn("redis lock,lock have been release. key:[{}]", key);
return false;
}
}
return false;
}
private Object getKeyWithRetry(String key, int retryTimes) {
int failTime = 0;
while (failTime < retryTimes) {
try {
return redisTemplateForGeneralize.opsForValue().get(key);
} catch (Exception e) {
failTime++;
if (failTime >= retryTimes) {
throw e;
}
}
}
return null;
}
/**
* 解锁
* @param key
* @return
*/
public boolean unlock(String key) {
logger.debug("redis unlock debug, start. resource:[{}].",key);
redisTemplateForGeneralize.delete(key);
return Success;
}
}
自定义注解使用分布式锁
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisLockAnnoation {
String keyPrefix() default "";
/**
* 要锁定的key中包含的属性
*/
String[] keys() default {};
/**
* 是否阻塞锁;
* 1. true:获取不到锁,阻塞一定时间;
* 2. false:获取不到锁,立即返回
*/
boolean isSpin() default true;
/**
* 超时时间
*/
int expireTime() default 10000;
/**
* 等待时间
*/
int waitTime() default 50;
/**
* 获取不到锁的等待时间
*/
int retryTimes() default 20;
}
实现分布式锁的逻辑
@Component
@Aspect
public class RedisLockAdvice {
private static final Logger logger = LoggerFactory.getLogger(RedisLockAdvice.class);
@Resource
private RedisDistributionLock redisDistributionLock;
@Around("@annotation(RedisLockAnnoation)")
public Object processAround(ProceedingJoinPoint pjp) throws Throwable {
//获取方法上的注解对象
String methodName = pjp.getSignature().getName();
Class<?> classTarget = pjp.getTarget().getClass();
Class<?>[] par = ((MethodSignature) pjp.getSignature()).getParameterTypes();
Method objMethod = classTarget.getMethod(methodName, par);
RedisLockAnnoation redisLockAnnoation = objMethod.getDeclaredAnnotation(RedisLockAnnoation.class);
//拼装分布式锁的key
String[] keys = redisLockAnnoation.keys();
Object[] args = pjp.getArgs();
Object arg = args[0];
StringBuilder temp = new StringBuilder();
temp.append(redisLockAnnoation.keyPrefix());
for (String key : keys) {
String getMethod = "get" + StringUtils.capitalize(key);
temp.append(MethodUtils.invokeExactMethod(arg, getMethod)).append("_");
}
String redisKey = StringUtils.removeEnd(temp.toString(), "_");
//执行分布式锁的逻辑
if (redisLockAnnoation.isSpin()) {
//阻塞锁
int lockRetryTime = 0;
try {
while (!redisDistributionLock.lock(redisKey, redisLockAnnoation.expireTime())) {
if (lockRetryTime++ > redisLockAnnoation.retryTimes()) {
logger.error("lock exception. key:{}, lockRetryTime:{}", redisKey, lockRetryTime);
throw ExceptionUtil.geneException(CommonExceptionEnum.SYSTEM_ERROR);
}
ThreadUtil.holdXms(redisLockAnnoation.waitTime());
}
return pjp.proceed();
} finally {
redisDistributionLock.unlock(redisKey);
}
} else {
//非阻塞锁
try {
if (!redisDistributionLock.lock(redisKey)) {
logger.error("lock exception. key:{}", redisKey);
throw ExceptionUtil.geneException(CommonExceptionEnum.SYSTEM_ERROR);
}
return pjp.proceed();
} finally {
redisDistributionLock.unlock(redisKey);
}
}
}
}