date
Jul 2, 2022
type
Page
status
Invisible
slug
mall-distributed-lock
summary
概要:1 分布式锁原理;2 常用分布式锁实现方案;
tags
微服务
spring-cloud-alibaba
商城
category
商城实战
password
Property
Sep 27, 2022 02:57 AM
icon

分布式锁

一、分布式锁的原理

分布式锁或者本地锁的本质其实是一样的,都是将并行的操作转换为了串行的操作
notion image

二、分布式锁的常用解决方案

1 数据库

可以利用MySQL隔离性:唯一索引
use test; CREATE TABLE `DistributedLock` (  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',  `name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁名',  `desc` varchar(1024) NOT NULL DEFAULT '备注信息',  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',  PRIMARY KEY (`id`),  UNIQUE KEY `uidx_name` (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法'; //数据库中的每一条记录就是一把锁,利用的mysql唯一索引的排他性 lock(name,desc){    insert into DistributedLock(`name`,`desc`) values (#{name},#{desc}); } unlock(name){    delete from DistributedLock where name = #{name} }
可以利用拍他说来实现 select .... where ... for update;
乐观锁:乐观的任务数据不会出现数据安全问题,如果出现了就重试一次
select ...,version; update table set version+1 where version = xxx

2 Redis

setNX: setNX(key,value) :如果key不存在那么就添加key的值,否则添加失败,Redisson

3 Zookeeper

notion image

三、Redis实现分布式锁

在Redis中是通过setNX指令来实现锁的抢占,那么利用这个命令实现分布式锁的基础代码为:
   public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {        String keys = "catalogJSON";        // 加锁        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");        if(lock){            // 加锁成功            Map<String, List<Catalog2VO>> data = getDataForDB(keys);            // 从数据库中获取数据成功后,我们应该要释放锁            stringRedisTemplate.delete("lock");            return data;       }else{            // 加锁失败            // 休眠+重试            // Thread.sleep(1000);            return getCatelog2JSONDbWithRedisLock();       }   }
上面的代码其实是存在一些问题的,首先如果getDataForDB(keys)这个方法如果出现的异常,那么我们就不会删除该key也就是不会释放锁,从而造成了死锁,针对这个问题,我们可以通过设置过期时间来解决,具体代码如下:
   public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {        String keys = "catalogJSON";        // 加锁        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");        if(lock){            // 给对应的key设置过期时间            stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS);            // 加锁成功            Map<String, List<Catalog2VO>> data = getDataForDB(keys);            // 从数据库中获取数据成功后,我们应该要释放锁            stringRedisTemplate.delete("lock");            return data;       }else{            // 加锁失败            // 休眠+重试            // Thread.sleep(1000);            return getCatelog2JSONDbWithRedisLock();       }   }
上面虽然解决了getDataForDB方法出现异常的问题,但是如果在expire方法执行之前就中断呢?这样也会出现我们介绍的死锁的问题,那这个问题怎么办?这时我们就希望setNx和设置过期时间的操作能够保证原子性。
这时我们就可以在setIfAbsent方法中同时指定过期时间,保证这个原子性的行为
   public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {        String keys = "catalogJSON";        // 加锁 在执行插入操作的同时设置了过期时间        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",30,TimeUnit.SECONDS);        if(lock){            // 给对应的key设置过期时间            stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS);            // 加锁成功            Map<String, List<Catalog2VO>> data = getDataForDB(keys);            // 从数据库中获取数据成功后,我们应该要释放锁            stringRedisTemplate.delete("lock");            return data;       }else{            // 加锁失败            // 休眠+重试            // Thread.sleep(1000);            return getCatelog2JSONDbWithRedisLock();       }   }
如果获取锁的业务执行时间比较长,超过了我们设置的过期时间,那么就有可能业务还没执行完,锁就释放了,然后另一个请求进来了,并创建了key,这时原来的业务处理完成后,再去删除key的时候,那么就有可能删除别人的key,这时怎么办?针对这种情况我们可以查询的锁的信息通过UUID来区分,具体的代码如下:
public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {        String keys = "catalogJSON";        // 加锁 在执行插入操作的同时设置了过期时间        String uuid = UUID.randomUUID().toString();        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30,TimeUnit.SECONDS);        if(lock){            // 给对应的key设置过期时间            stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS);            // 加锁成功            Map<String, List<Catalog2VO>> data = getDataForDB(keys);            // 获取当前key对应的值            String val = stringRedisTemplate.opsForValue().get("lock");            if(uuid.equals(val)){                // 说明这把锁是自己的                // 从数据库中获取数据成功后,我们应该要释放锁                stringRedisTemplate.delete("lock");           }            return data;       }else{            // 加锁失败            // 休眠+重试            // Thread.sleep(1000);            return getCatelog2JSONDbWithRedisLock();       }   }
上面查询key的值和删除key其实不是一个原子性操作,这就会出现我查询出来key之后,时间过期了,然后key被删除了,然后其他的请求创建了一个新的key,然后原来的执行删除了这个key,又出现了删除别人key的情况。这时我们需要保证查询和删除是一个原子性行为。
public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {        String keys = "catalogJSON";        // 加锁 在执行插入操作的同时设置了过期时间        String uuid = UUID.randomUUID().toString();        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);        if(lock){            Map<String, List<Catalog2VO>> data = null;            try {                // 加锁成功                data = getDataForDB(keys);           }finally {                String srcipts = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end ";                // 通过Redis的lua脚本实现 查询和删除操作的原子性                stringRedisTemplate.execute(new DefaultRedisScript<Integer>(srcipts,Integer.class)                       ,Arrays.asList("lock"),uuid);           }            return data;       }else{            // 加锁失败            // 休眠+重试            // Thread.sleep(1000);            return getCatelog2JSONDbWithRedisLock();       }   }

四、Redisson实现分布式锁