分布式锁(三)——基于redis的分布式锁实例

之前介绍分布式锁的时候,给过一个漫画介绍分布式锁的链接,传送门——漫画说明分布式锁这篇总结写的确实不错。但是有些地方我们可以按照其他思路来进行实现。

这里先需要了解一下各种失败机制,failover,failsafe,failfast,failback,forking。

整体介绍

如果要实现一个锁的机制,无非就是加锁,释放锁,已经其他异常场景下的几种处理。在redis下如何实现这些操作还是值得探讨的。

加锁

老版本的redis可以通过setnx命令来实现分布式锁(本篇博客也会采用这种方式)setnx命令文档,这个命令在key已经存在的情况下会返回0,在key不存在的情况下才会返回1,至于key所对应的value,这个根据业务随意就好。

解锁

解锁就很简单了,直接del指定的key值就可以了

异常场景的处理

锁超时

如果一个得到锁的线程在执行任务的过程中遇到了异常,来不及显示的释放分布式锁,则这个资源就会被永远锁住,这个是要处理的。可以给redis中指定的key设置一个过期时间,就可以解决这一问题,不过个人觉得设置一个标志位似乎更加合理一点

在高一点的redis版本中可以通过set命令设置过期时间

误删

在设置了过期时间之后,如果一个进程A执行很慢,在规定时间中没有顺利执行完指定的业务逻辑代码,锁就被释放了,之后进程B获得了锁(这个锁key值相同,但是value值不同)。进程B执行一段时间之后,进程A如果没有做value值的判断,则会删除进程B的分布式锁。因此在释放锁的时候,需要判断一下value值

这里只是简单的列举了两个问题,造成这些问题的原因无非就一个——加锁/解锁+业务操作的原子性

简单版本

先上一个简单的版本,我们利用setNx获取锁。

1 /* 2 * 基于redis的分布式锁 3 * @param productLockDto 4 * @return 5 */ 6@Transactional(rollbackFor = Exception.class) 7public int updateStockRedisLock(ProductLockDto productLockDto){ 8 int result = 0; 9 final String key = String.format("redis_lock_product_id:%s",productLockDto.getId()); 10 String value = UUID.randomUUID().toString()+System.nanoTime(); 11 //利用setNx操作建立锁。 12 Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(key,value); 13 if(res){ 14 //开始核心的业务逻辑处理 15 ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId()); 16 if(productLockEntity!=null && productLockEntity.getStock().compareTo(productLockDto.getStock())>=0){ 17 productLockEntity.setStock(productLockDto.getStock()); 18 //与普通的更新操作相比,仅仅是增加了版本号的统计 19 result = lockMapper.updateStockForNegative(productLockEntity); 20 21 if(result>0){ 22 log.info("通过redis的分布式锁更新成功,剩余库存stock={}",productLockDto.getStock()); 23 } 24 } 25 } 26 return result; 27} 28 29

但是比较恶心的是,这里没有释放锁的操作,因此只能获取一次锁,然后操作一次,之后并没有释放锁,导致系统只能更新一次数据。日志如下所示:
在这里插入图片描述
数据库中也只是显示更新一次
在这里插入图片描述

正确释放锁

上述的代码中没有释放锁的操作,导致是有一个进程能对数据进行操作,但是在整体介绍部分说过,在释放锁之前需要对比一下redis锁的key值,避免出现进程A删除进程B对应的分布式锁的key值。

1/* 2* 基于redis的分布式锁 3 * @param productLockDto 4 * @return 5 */ 6@Transactional(rollbackFor = Exception.class) 7public int updateStockRedisLock(ProductLockDto productLockDto){ 8 int result = 0; 9 final String key = String.format("redis_lock_product_id:%s",productLockDto.getId()); 10 String value = UUID.randomUUID().toString()+System.nanoTime(); 11 //利用setNx操作建立锁。 12 Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(key,value); 13 try{ 14 if(res){ 15 //真正的更新操作 16 ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId()); 17 if(productLockEntity!=null && productLockEntity.getStock().compareTo(productLockDto.getStock())>=0){ 18 productLockEntity.setStock(productLockDto.getStock()); 19 //与普通的更新操作相比,仅仅是增加了版本号的统计 20 result = lockMapper.updateStockForNegative(productLockEntity); 21 22 if(result>0){ 23 log.info("通过redis的分布式锁更新成功,剩余库存stock={}",productLockEntity.getStock()); 24 } 25 } 26 } 27 //失败了就失败了,这里什么都没做。 28 }catch (Exception e){ 29 log.error("出现异常,异常信息为:{}",e.fillInStackTrace()); 30 }finally {//无论如何,这里都需要释放锁。 31 String redisValue = stringRedisTemplate.opsForValue().get(key); 32 if(value.equals(redisValue)){//之前分析过,为了避免出现进程A删掉进程B的Key,这里需要做一个判断 33 stringRedisTemplate.delete(key); 34 } 35 } 36 return result; 37} 38 39

引入了try-finally语句块,这样就能保证在任何时候都能释放锁,在释放锁的时候进行了一个必要的判断,这样能正确处理数据。同样用jmeter模拟2000个线程之后,数据如下。
在这里插入图片描述
锁正确释放之后,多个进程能进行数据操作。
在这里插入图片描述

2000个进程,最终只是500多个进程抢到了锁,因为我们采取了failover的机制。有些业务场景中,我们需要让这些获取锁失败的进程进行一个轮询,然后再次加入锁的竞争中。

轮询获取锁

先直接上实例吧

1/* 2 * 基于redis的分布式锁 3 * 4 * @param productLockDto 5 * @return 6 */ 7@Transactional(rollbackFor = Exception.class) 8public int updateStockRedisLock(ProductLockDto productLockDto) { 9 int result = 0; 10 final String key = String.format("redis_lock_product_id:%s", productLockDto.getId()); 11 Boolean res = true;//利用一个标志位。根据标志位不断轮询判断 12 while (res) { 13 String value = UUID.randomUUID().toString() + System.nanoTime(); 14 //利用setNx操作建立锁。 15 res = stringRedisTemplate.opsForValue().setIfAbsent(key, value); 16 if (res) { 17 try { 18 res = false;//将标志位置为false,为了后续跳出循环 19 //真正的更新操作 20 ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId()); 21 int recordStock = productLockEntity.getStock(); 22 if (productLockEntity != null && productLockEntity.getStock().compareTo(productLockDto.getStock()) >= 0) { 23 productLockEntity.setStock(productLockDto.getStock()); 24 //与普通的更新操作相比,仅仅是增加了版本号的统计 25 result = lockMapper.updateStockForNegative(productLockEntity); 26 27 if (result > 0) { 28 log.info("通过redis的分布式锁更新成功,剩余库存stock={}", recordStock - 1); 29 } 30 } 31 } catch (Exception e) { 32 log.error("出现异常,异常信息为:{}", e.fillInStackTrace()); 33 } finally { 34 String redisValue = stringRedisTemplate.opsForValue().get(key); 35 if (value.equals(redisValue)) {//之前分析过,为了避免出现进程A删掉进程B的Key,这里需要做一个判断 36 stringRedisTemplate.delete(key); 37 } 38 } 39 }else{ 40 res=true;//让获取分布式锁失败的线程继续获取锁。 41 } 42 } 43 return result; 44} 45 46

这个版本中提现了轮询的思想,这个在JDK中可重入锁的源码中有很多体现。这里不再赘述,具体数据结果如下(2000个线程,需要适当扩大数据库连接池的配置)
在这里插入图片描述
可以看到每个请求均正常获取到相关数据。模拟的2000个线程均正常更新了数据。

总结

本篇博客简单总结了redis中分布式锁的实践,篇实战,没啥可总结的。

代码交流 2021