Redis 延时队列

版权归原作者所有。 本文最后更新于:2023年12月5日 凌晨

Redis 延时队列

Redis消息队列并不想mq那样具有很多的特性,对于需要消息可靠性有极高保证的话,redis消息队列不适合,因为他没有ack保障,对于只有一组消费者的队列Redis可以很轻松的搞定。

异步消息队列

Redis的list(列表)数据结构常用来作为异步消息队列使用

利用rpush和lpush操作入队列,用rpop和lpop操作出队列

rpush->lpop
lpop->rpop

可以支持多个生产者、消费者并发处理,每个消费者拿到的是不同队列的元素

消息也是无序的

rpush k1 v1 v2 v3
llen k1
lpop k1
llen k1
lpop k1

rpush k2 v1 v2 v3
llen k2
lpop k2

队列空了怎么办?

如果队列空了,消费者还一直在轮询,就会陷入pop的死循环,不停的pop还没有数据,这就是浪费声明的空轮询,空轮询不但拉高了客户端 CPU消耗,Redis的QPS也会被拉高,如果空轮询客户端有几十个这样的,Redis的慢查询会显著很多,

通过我们使用具体Java中线程Sleep 1s来解决这个问题,这样客户端CPU下来了,Redis QPS也下来了

阻塞读

对于上面问题sleep 1s可以解决问题,但是又有一个小问题,那就是睡眠导致消息的延迟增大,如果只有1个消费者,延迟为1s,如果多个消费者,这个延迟会有所下降,因为每个消费者时间是岔开的

有什么办法可以显著降低延迟呢?

使用blpop 和 brpop 当没有消息时就进入休眠状态,一旦数据到来,则立刻醒过来,进行处理,消息的延迟几乎为零。替代lpop和rpop,完美解决上面问题。

空闲连接自动断开

上面的方案并不是完美的,还有空闲连接的问题

一直阻塞,Redis的客户端连接就成了闲置连接,如果服务端断开,blpop或者brpop就会抛出有异常,在客户端需要捕获异常之后重试。

实现方式

zset(有序列表)进行实现

把消息序列化为一个字符串作为zset的value

这个消息的到期处理时间为score

然后多个线程进行轮询zset获取任务进行处理

多线程保障可用性:万一一个线程挂了还有其他线程可用,需要考虑并发争抢任务的问题,确保任务不会被多次执行。通过zrem 删除对应的消息,代表了当前消息被消费了,也就是当前线程是处理该消息的主线程

对于消息的处理需要增加异常捕获,不能因为消息处理导致循环退出

代码方式

Java的话需要选择支持的序列化库,创建一个任务对象,字段有 ,id,msg, id采用UUID,保证消息唯一性,比如要延时5s后执行,则通过zadd加入k1,分数设置为当前时间戳+5s,

消费者循环获取消息时,通过zrangeByScore获取,获取当前k1的 0ms到当前时间戳的消息,这样保证获取的是到了时间的数据,每次只取一条, 开始索引 0, 结束索引 1

这个地方需要勘误:0,1 就是闭区间,得到嘞2条数据,只取1条,需要是0,0,这个需要测试下,

消费者轮询中间sleep 500ms

进一步优化

上面 zrangebyscore 和 zrem是分开执行的,不是原子性的,被其中一个线程zrem的时间哦鸡皮。会导致其他线程浪费一次查询,这样可以利用lua脚本把这两个命令一起执行,多个进程争抢任务的时候就不会出现这种浪费了。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明蚁点博客出处!