12简单分布式锁
在分布式系统中,很多操作是需要用到分布式锁,防止并发操作带来一些问题;因为redis是独立于分布式系统外的其他服务,因此就可以利用redis,来实现一个简单的不完美分布式锁;
功能所需命令
- SETNX key不存在,设置;key存在,不设置
-
# 加锁 127.0.0.1:6379> SETNX try_lock 1 (integer) 1 # 释放锁 127.0.0.1:6379> del try_lock (integer) 1
- set key value [ex seconds] [nx | xx]
- 上面的方式,虽然能够加锁,但是不难发现,很容易出现死锁的情况;比如,a用户在加锁之后,突然系统挂了,此时a就永远不会释放他持有的锁了,从而导致死锁;为此,我们可以利用redis的过期时间来防止死锁问题
-
set try_lock 1 ex 5 nx
不完美的锁
上面的方案,虽然解决了死锁的问题,但是又带来了一个新的问题,执行时间如果长于自动释放的时间(比如自动释放是5秒,但是业务执行耗时了8秒),那么在第5秒的时候,锁就自动释放了,此时其他的线程就能正常拿到锁,简单流程如下:
此时相同颜色部分的时间区间是由多线程同时在执行。而且此问题在此方案下并没有完美的解决方案,只能做到尽可能的避免:
-
方式一,value设置为随机数(如:1234),在程序释放锁的时候,检测一下是不是自己加的锁;比如,A线程在第8s释放的锁就是线程B加的,此时在释放的时候,就可以检验一下value是不是自己当初设置的值(1234),是的就释放,不是的就不管了; -
方式二,只在时间消耗比较小的业务上选用此方案,尽可能的避免执行时间超过锁的自动释放时间
13、认识的人/好友推荐
在支付宝、抖音、QQ等应用中,都会看到好友推荐;
好友推荐往往都是基于你的好友关系网来推荐,将你可能认识的人推荐给你,让你去添加好友,如果随意在系统找个人推荐给你,那你认识的可能性是非常小的,此时就失去了推荐的目的;
比如,A和B是好友,B和C是好友,此时A和C认识的概率是比较大的,就可以在A和C之间的好友推荐;
基于这个逻辑,就可以利用 Redis 的 Set 集合,缓存各个用户的好友列表,然后以差集的方式,来实现好友推荐;
功能所需的命令
-
SADD key member [member …]:集合中添加元素,缓存好友列表 -
SDIFF key [key …]:取两个集合间的差集,找出可以推荐的用户
Redis-cli 客户端测试
# 记录 用户1 的好友列表 127.0.0.1:6379> SADD fl:user1 user2 user3 (integer) 2 # 记录 用户2 的好友列表 127.0.0.1:6379> SADD fl:user2 user1 user4 (integer) 2 # 用户1 可能认识的人 ,把自己(user1)去掉,user4是可能认识的人 127.0.0.1:6379> SDIFF fl:user2 fl:user1 1) "user1" 2) "user4" # 用户2 可能认识的人 ,把自己(user2)去掉,user3是可能认识的人 127.0.0.1:6379> SDIFF fl:user1 fl:user2 1) "user3" 2) "user2"
不过这只是推荐机制中的一种因素,可以借助其他条件,来增强推荐的准确度;
14、发布/订阅
发布/订阅是比较常用的一种模式;在分布式系统中,如果需要实时感知到一些变化,比如:某些配置发生变化需要实时同步,就可以用到发布,订阅功能
常用API
-
PUBLISH channel message:将消息推送到指定的频道 -
SUBSCRIBE channel [channel …]:订阅给定的一个或多个频道的信息
Redis-cli操作
如上图所示,左侧多个客户端订阅了频道,当右侧客户端往频道发送消息的时候,左侧客户端都能收到对应的消息。
15、消息队列
说到消息队列,常用的就是Kafka、RabbitMQ等等,其实 Redis 利用 List 也能实现一个消息队列;
功能所需的指令
-
RPUSH key value1 [value2]:在列表中添加一个或多个值; -
BLPOP key1 [key2] timeout:移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止; -
BRPOP key1 [key2] timeout:移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
依赖调整:
Spring Boot 从 2.0版本开始,将默认的Redis客户端Jedis替换为Lettuce,在测试这块阻塞的时候,会出现一个超时的异常io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)
;没有找到一个好的解决方式,所以这里将 Lettuce 换回成 Jedis ,就没有问题了,pom.xml 的配置如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </exclusion> <exclusion> <artifactId>lettuce-core</artifactId> <groupId>io.lettuce</groupId> </exclusion> </exclusions> </dependency> <!-- jedis客户端 --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <!-- spring2.X集成redis所需common-pool2,使用jedis必须依赖它--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
测试代码:
import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; /** * @author 一行Java * @title: QueueTest * @projectName ehang-spring-boot * @description: TODO * @date 2022/8/5 14:27 */ @SpringBootTest @Slf4j public class QueueTest { private static final String REDIS_LP_QUEUE = "redis:lp:queue"; private static final String REDIS_RP_QUEUE = "redis:rp:queue"; @Autowired StringRedisTemplate stringRedisTemplate; /** * 先进后出队列 */ @Test public void rightMonitor() { while (true) { Object o = stringRedisTemplate.opsForList().rightPop(REDIS_LP_QUEUE, 0, TimeUnit.SECONDS); log.info("先进后出队列 接收到数据:{}", o); } } /** * 先进先出队列 */ @Test public void leftMonitor() { while (true) { Object o = stringRedisTemplate.opsForList().leftPop(REDIS_RP_QUEUE, 0, TimeUnit.SECONDS); log.info("先进先出队列 接收到数据:{}", o); } } }
不过,对消息的可靠性要求比较高的场景,建议还是使用专业的消息队列框架,当值被弹出之后,List 中就已经不存在对应的值了,假如此时程序崩溃,就会出现消息的丢失,无法保证可靠性;虽然说也有策略能够保证消息的可靠性,比如,在弹出的同时,将其保存到另外一个队列(BRPOPLPUSH),成功之后,再从另外的队列中移除,当消息处理失败或者异常,再重新进入队列执行,只是这样做就得不偿失了。
文章评论