redis setnx 分布式锁_Spring Boot 整合 Redis 正确的实现分布式锁

前言

最近在做分块上传的业务,使用到了Redis来维护上传过程中的分块编号。

每上传完成一个分块就获取一下文件的分块集合,加入新上传的编号,手动接口测试下是没有问题的,前端通过并发上传调用就出现问题了,并发的get再set,就会存在覆盖写现象,导致最后的分块数据不对,不能触发分块合并请求。

遇到并发二话不说先上锁,针对执行代码块加了一个JVM锁之后问题就解决了。

仔细一想还是不太对,项目是分布式部署的,做了负载均衡,一个节点的代码被锁住了,请求轮询到其他节点还是可以进行覆盖写,并没有解决到问题啊

没办法,只有用上分布式锁了。之前对于分布式锁的理论还是很熟悉的,没有比较好的应用场景就没写过具体代码,趁这个机会就学习使用一下分布式锁。

理论

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。是为了解决分布式系统中,不同的系统或是同一个系统的不同主机共享同一个资源的问题,它通常会采用互斥来保证程序的一致性

f6234123625adbdba5d097daca8f5cfa.png

通常的实现方式有三种:

基于 MySQL 的悲观锁来实现分布式锁,这种方式使用的最少,这种实现方式的性能不好,且容易造成死锁,并且MySQL本来业务压力就很大了,再做锁也不太合适

基于 Redis 实现分布式锁,单机版可用setnx实现,多机版建议用Radission

基于 ZooKeeper 实现分布式锁,利用 ZooKeeper 顺序临时节点来实现

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

互斥性。在任意时刻,只有一个客户端能持有锁。

不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。

解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

本文就使用的是Redis的setnx实现,如果Redis是多机版的可以去了解下Radssion,封装的就特别的好,也是官方推荐的

代码

  1. 加依赖

引入Spring Boot和Redis整合的快速使用依赖

1<dependency> 2            <groupId>org.springframework.bootgroupId> 3            <artifactId>spring-boot-starter-data-redisartifactId> 4  dependency> 5
  1. 加配置

application.properties中加入Redis连接相关配置

1spring.redis.host=xxx 2spring.redis.port=6379 3spring.redis.database=0 4spring.redis.password=xxx 5spring.redis.timeout=10000 6# 设置jedis连接池 7spring.redis.jedis.pool.max-active=50 8spring.redis.jedis.pool.min-idle=20 9
  1. 重写Redis的序列化规则

默认使用的JDK的序列化,不自己设置一下Redis中的数据是看不懂的

1/** 2 * @author Chkl 3 * @create 2020/6/7 4 * @since 1.0.0 5 */ 6@Component 7public class RedisConfig { 8    /** 9     * 改造RedisTemplate,重写序列化规则,避免存入序列化内容看不懂 10     * @param connectionFactory 11     * @return 12     */ 13    @Bean 14    public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) { 15        RedisTemplate redisTemplate = new RedisTemplate(); 16        redisTemplate.setConnectionFactory(connectionFactory); 17        // 设置key和value的序列化规则 18        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer(Object.class)); 19        redisTemplate.setKeySerializer(new StringRedisSerializer()); 20        return redisTemplate; 21    } 22} 23
  1. 如何正确的上锁

直接上代码

1@Component 2public class RedisLock { 3    @Autowired 4    private StringRedisTemplate redisTemplate; 5    private long timeout = 3000; 6    /** 7     * 上锁 8     * 9     * @param key 锁标识 10     * @param value 线程标识 11     * @return 上锁状态 12     */ 13    public boolean lock(String key, String value) { 14         15        //执行set命令 16        Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);//1 17        //是否成功获取锁 18        if (absent) { 19            return true; 20        } 21        return false; 22    } 23} 24

核心代码就是

1Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS); 2

setIfAbsent方法就相当于命令行下的Setnx方法,指定的 key 不存在时,为 key 设置指定的值

参数分别是key、value、超时时间和时间单位

key,表示针对于这段资源的唯一标识

value,表示针对于这个线程的唯一标识。为什么有了key了还需要设置value呢,就是为了满足四个条件的最后一个:解铃还须系铃人。只有通过key和value的组合才能保证解锁时是同一个线程来解锁

超时时间,必须和setnx一起进行操作,不能再setnx结束后再执行。如果加锁成功了,还没有设置过期时间就宕机了,锁就永远不会过期,变成死锁

  1. 如何正确解锁

1@Component 2public class RedisLock { 3    @Autowired 4    private StringRedisTemplate redisTemplate; 5    @Autowired 6    private DefaultRedisScript redisScript;private static final Long RELEASE_SUCCESS = 1L;/** 7     * 解锁 8     * @param key 锁标识 9     * @param value 线程标识 10     * @return 解锁状态 11     */public boolean unlock(String key, String value) {//使用Lua脚本:先判断是否是自己设置的锁,再执行删除 12        Long result = redisTemplate.execute(redisScript, Arrays.asList(key,value));//返回最终结果return RELEASE_SUCCESS.equals(result); 13    }/** 14     * @return lua脚本 15     */@Beanpublic DefaultRedisScriptdefaultRedisScript() { 16        DefaultRedisScript defaultRedisScript = new DefaultRedisScript<>(); 17        defaultRedisScript.setResultType(Long.class); 18        defaultRedisScript.setScriptText("if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end");return defaultRedisScript; 19    } 20} 21

解锁过程需要两步操作

判断操作线程是否是加锁的线程

如果是加锁线程,执行解锁操作

这两步操作也需要原子的进行操作,但是Redis不支持这两步的合并的操作,所以,就只有使用lua脚本实现来保证原子性咯

如果在判断是加锁的线程之后,并且执行解锁之前,锁到期了,被其他线程获得锁了,这时候再进行解锁就会解掉其他线程的锁,使得不满足解铃还须系铃人

  1. 实际应用

没有使用分布式锁时的保存文件分块的代码

1/** 2     * 保存文件分块编号到redis 3     * @param chunkNumber 分块号 4     * @param identifier 文件唯一编号 5     * @return 文件分块的大小 6     */ 7    @Override 8    public Integer saveChunk(Integer chunkNumber, String identifier) { 9      //从Redis获取已经存在的分块编号集合 10        Set oldChunkNumber = (Set) JSON.parseObject(redisOperator.get("chunkNumberList_"+identifier),Set.class);//如果不存在分块集合,创建一个集合if (Objects.isNull(oldChunkNumber)) {Set newChunkNumber = new HashSet<>(); 11            newChunkNumber.add(chunkNumber); 12            redisOperator.set("chunkNumberList_"+identifier, JSON.toJSONString(newChunkNumber),36000);return newChunkNumber.size();//如果分块集合已经存在了,就添加一个编号 13        } else { 14            oldChunkNumber.add(chunkNumber); 15            redisOperator.set("chunkNumberList_"+identifier, JSON.toJSONString(oldChunkNumber),36000);return oldChunkNumber.size(); 16        } 17    } 18

存在的问题是:当并发的请求进来之后,可能获取同一个状态的集合进行修改,修改后直接写入,造成同一个状态获得的集合操作线程覆盖写的现象

使用分布式锁保证同时只能有一个线程能获取到集合并进行修改,避免了覆盖写现象

使用分布式锁代码

1/** 2     * 保存文件分块编号到redis 3     * @param chunkNumber 分块号 4     * @param identifier 文件唯一编号 5     * @return 文件分块的大小 6     */ 7    @Override 8    public Integer saveChunk(Integer chunkNumber, String identifier) { 9      //通过UUID生成一个请求线程识别标志作为锁的value 10        String threadUUID = CoreUtil.getUUID(); 11        //上锁,以共享资源标识:文件唯一编号,作为key,以线程标识UUID作为value 12        redisLock.lock(identifier,threadUUID); 13        //从Redis获取已经存在的分块编号集合 14        Set oldChunkNumber = (Set) JSON.parseObject(redisOperator.get("chunkNumberList_"+identifier),Set.class);//如果不存在分块集合,创建一个集合if (Objects.isNull(oldChunkNumber)) {Set newChunkNumber = new HashSet<>(); 15            newChunkNumber.add(chunkNumber); 16            redisOperator.set("chunkNumberList_"+identifier, JSON.toJSONString(newChunkNumber),36000);//解锁 17            redisLock.unlock(identifier,threadUUID);return newChunkNumber.size();//如果分块集合已经存在了,就添加一个编号 18        } else { 19            oldChunkNumber.add(chunkNumber); 20            redisOperator.set("chunkNumberList_"+identifier, JSON.toJSONString(oldChunkNumber),36000);//解锁 21            redisLock.unlock(identifier,threadUUID);return oldChunkNumber.size(); 22        } 23    } 24

代码中使用的共享资源标识是文件唯一编号identifier,它能标识加锁代码段中的唯一资源,即key为"chunkNumberList_"+identifier的集合

代码中使用的线程唯一标识是UUID,能保证加锁和解锁时获取的标识不会重复

————————————————

版权声明:本文为CSDN博主「每天都有新收获」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:

https://blog.csdn.net/qq_41170102/article/details/107007576

代码交流 2021