发布时间:2023-11-08 13:30
在单体应用中,如果我们对共享数据不进行加锁操作,会出现数据一致性问题,我们的解决办法通常是加锁。
在分布式架构中,我们同样会遇到数据共享操作问题,本文章使用Redis
来解决分布式架构中的数据一致性问题。
单机数据一致性架构如下图所示:多个可客户访问同一个服务器,连接同一个数据库。
场景描述:客户端模拟购买商品过程,在Redis
中设定库存总数剩100个
,多个客户端同时并发购买。
@RestController public class IndexController1 { @Autowired StringRedisTemplate template; @RequestMapping(\"/buy1\") public String index(){ // Redis中存有goods:001号商品,数量为100 String result = template.opsForValue().get(\"goods:001\"); // 获取到剩余商品数 int total = result == null ? 0 : Integer.parseInt(result); if( total > 0 ){ // 剩余商品数大于0 ,则进行扣减 int realTotal = total -1; // 将商品数回写数据库 template.opsForValue().set(\"goods:001\",String.valueOf(realTotal)); System.out.println(\"购买商品成功,库存还剩:\"+realTotal +\"件, 服务端口为8001\"); return \"购买商品成功,库存还剩:\"+realTotal +\"件, 服务端口为8001\"; }else{ System.out.println(\"购买商品失败,服务端口为8001\"); } return \"购买商品失败,服务端口为8001\"; } }
使用Jmeter
模拟高并发场景,测试结果如下:
测试结果出现多个用户购买同一商品,发生了数据不一致问题!
解决办法:单体应用的情况下,对并发的操作进行加锁操作,保证对数据的操作具有原子性
synchronized
ReentrantLock
@RestController public class IndexController2 { // 使用ReentrantLock锁解决单体应用的并发问题 Lock lock = new ReentrantLock(); @Autowired StringRedisTemplate template; @RequestMapping(\"/buy2\") public String index() { lock.lock(); try { String result = template.opsForValue().get(\"goods:001\"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { int realTotal = total - 1; template.opsForValue().set(\"goods:001\", String.valueOf(realTotal)); System.out.println(\"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"); return \"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"; } else { System.out.println(\"购买商品失败,服务端口为8001\"); } } catch (Exception e) { lock.unlock(); } finally { lock.unlock(); } return \"购买商品失败,服务端口为8001\"; } }
上面解决了单体应用的数据一致性问题,但如果是分布式架构部署呢,架构如下:
提供两个服务,端口分别为8001
、8002
,连接同一个Redis
服务,在服务前面有一台Nginx
作为负载均衡
两台服务代码相同,只是端口不同
将8001
、8002
两个服务启动,每个服务依然用ReentrantLock
加锁,用Jmeter
做并发测试,发现会出现数据一致性问题!
取消单机锁,下面使用redis
的set
命令来实现分布式加锁
SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
@RestController public class IndexController4 { // Redis分布式锁的key public static final String REDIS_LOCK = \"good_lock\"; @Autowired StringRedisTemplate template; @RequestMapping(\"/buy4\") public String index(){ // 每个人进来先要进行加锁,key值为\"good_lock\",value随机生成 String value = UUID.randomUUID().toString().replace(\"-\",\"\"); try{ // 加锁 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value); // 加锁失败 if(!flag){ return \"抢锁失败!\"; } System.out.println( value+ \" 抢锁成功\"); String result = template.opsForValue().get(\"goods:001\"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { int realTotal = total - 1; template.opsForValue().set(\"goods:001\", String.valueOf(realTotal)); // 如果在抢到所之后,删除锁之前,发生了异常,锁就无法被释放, // 释放锁操作不能在此操作,要在finally处理 // template.delete(REDIS_LOCK); System.out.println(\"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"); return \"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"; } else { System.out.println(\"购买商品失败,服务端口为8001\"); } return \"购买商品失败,服务端口为8001\"; }finally { // 释放锁 template.delete(REDIS_LOCK); } } }
上面的代码,可以解决分布式架构中数据一致性问题。但再仔细想想,还是会有问题,下面进行改进。
在上面的代码中,如果程序在运行期间,部署了微服务jar
包的机器突然挂了,代码层面根本就没有走到finally
代码块,也就是说在宕机前,锁并没有被删除掉,这样的话,就没办法保证解锁
所以,这里需要对这个key
加一个过期时间,Redis
中设置过期时间有两种方法:
template.expire(REDIS_LOCK,10, TimeUnit.SECONDS)
template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS)
第一种方法需要单独的一行代码,且并没有与加锁放在同一步操作,所以不具备原子性,也会出问题
第二种方法在加锁的同时就进行了设置过期时间,所有没有问题,这里采用这种方式
调整下代码,在加锁的同时,设置过期时间:
// 为key加一个过期时间,其余代码不变 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK,value,10L,TimeUnit.SECONDS);
这种方式解决了因服务突然宕机而无法释放锁的问题。但再仔细想想,还是会有问题,下面进行改进。
方式二设置了key
的过期时间,解决了key
无法删除的问题,但问题又来了
上面设置了key
的过期时间为10
秒,如果业务逻辑比较复杂,需要调用其他微服务,处理时间需要15
秒(模拟场
景,别较真),而当10
秒钟过去之后,这个key
就过期了,其他请求就又可以设置这个key
,此时如果耗时15
秒
的请求处理完了,回来继续执行程序,就会把别人设置的key
给删除了,这是个很严重的问题!
所以,谁上的锁,谁才能删除
@RestController public class IndexController6 { public static final String REDIS_LOCK = \"good_lock\"; @Autowired StringRedisTemplate template; @RequestMapping(\"/buy6\") public String index(){ // 每个人进来先要进行加锁,key值为\"good_lock\" String value = UUID.randomUUID().toString().replace(\"-\",\"\"); try{ // 为key加一个过期时间 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS); // 加锁失败 if(!flag){ return \"抢锁失败!\"; } System.out.println( value+ \" 抢锁成功\"); String result = template.opsForValue().get(\"goods:001\"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { // 如果在此处需要调用其他微服务,处理时间较长。。。 int realTotal = total - 1; template.opsForValue().set(\"goods:001\", String.valueOf(realTotal)); System.out.println(\"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"); return \"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"; } else { System.out.println(\"购买商品失败,服务端口为8001\"); } return \"购买商品失败,服务端口为8001\"; }finally { // 谁加的锁,谁才能删除!!!! if(template.opsForValue().get(REDIS_LOCK).equals(value)){ template.delete(REDIS_LOCK); } } } }
这种方式解决了因服务处理时间太长而释放了别人锁的问题。这样就没问题了吗?
在上面方式三下,规定了谁上的锁,谁才能删除,但finally
快的判断和del
删除操作不是原子操作,并发的时候也会出问题,并发嘛,就是要保证数据的一致性,保证数据的一致性,最好要保证对数据的操作具有原子性。
在Redis
的set
命令介绍中,最后推荐Lua
脚本进行锁的删除,地址
@RestController public class IndexController7 { public static final String REDIS_LOCK = \"good_lock\"; @Autowired StringRedisTemplate template; @RequestMapping(\"/buy7\") public String index(){ // 每个人进来先要进行加锁,key值为\"good_lock\" String value = UUID.randomUUID().toString().replace(\"-\",\"\"); try{ // 为key加一个过期时间 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS); // 加锁失败 if(!flag){ return \"抢锁失败!\"; } System.out.println( value+ \" 抢锁成功\"); String result = template.opsForValue().get(\"goods:001\"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { // 如果在此处需要调用其他微服务,处理时间较长。。。 int realTotal = total - 1; template.opsForValue().set(\"goods:001\", String.valueOf(realTotal)); System.out.println(\"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"); return \"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"; } else { System.out.println(\"购买商品失败,服务端口为8001\"); } return \"购买商品失败,服务端口为8001\"; }finally { // 谁加的锁,谁才能删除,使用Lua脚本,进行锁的删除 Jedis jedis = null; try{ jedis = RedisUtils.getJedis(); String script = \"if redis.call(\'get\',KEYS[1]) == ARGV[1] \" + \"then \" + \"return redis.call(\'del\',KEYS[1]) \" + \"else \" + \" return 0 \" + \"end\"; Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value)); if(\"1\".equals(eval.toString())){ System.out.println(\"-----del redis lock ok....\"); }else{ System.out.println(\"-----del redis lock error ....\"); } }catch (Exception e){ }finally { if(null != jedis){ jedis.close(); } } } } }
在方式四下,规定了谁上的锁,谁才能删除,并且解决了删除操作没有原子性问题。但还没有考虑缓存续命,以及Redis
集群部署下,异步复制造成的锁丢失:主节点没来得及把刚刚set
进来这条数据给从节点,就挂了。所以直接上RedLock
的Redisson
落地实现。
@RestController public class IndexController8 { public static final String REDIS_LOCK = \"good_lock\"; @Autowired StringRedisTemplate template; @Autowired Redisson redisson; @RequestMapping(\"/buy8\") public String index(){ RLock lock = redisson.getLock(REDIS_LOCK); lock.lock(); // 每个人进来先要进行加锁,key值为\"good_lock\" String value = UUID.randomUUID().toString().replace(\"-\",\"\"); try{ String result = template.opsForValue().get(\"goods:001\"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { // 如果在此处需要调用其他微服务,处理时间较长。。。 int realTotal = total - 1; template.opsForValue().set(\"goods:001\", String.valueOf(realTotal)); System.out.println(\"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"); return \"购买商品成功,库存还剩:\" + realTotal + \"件, 服务端口为8001\"; } else { System.out.println(\"购买商品失败,服务端口为8001\"); } return \"购买商品失败,服务端口为8001\"; }finally { if(lock.isLocked() && lock.isHeldByCurrentThread()){ lock.unlock(); } } } }
分析问题的过程,也是解决问题的过程,也能锻炼自己编写代码时思考问题的方式和角度。
上述测试代码地址
以上就是Redis实现分布式锁的五种方法详解的详细内容,更多关于Redis分布式锁的资料请关注脚本之家其它相关文章!