因为工作中需要用到分布式的延时队列,调研了一段时间,选择使用 Redisson DelayedQueue,为了搞清楚内部运行流程,特记录下来。
总体流程大概是图中的这个样子,初看一眼有点不知从何下手,接下来我会通过以下几点来分析流程,相信看完本文你能了解整个运行流程。
图片
发送延迟消息代码如下,发送了一条延迟时间为 5s 的消息。
public void produce() { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); delayedQueue.offer("测试延迟消息", 5, TimeUnit.SECONDS);}
接收消息代码如下,可以看到 delayedQueue 是没有用到的,那么为什么要加这一行呢,这个后面总结部分回答。
public void consume() throws InterruptedException { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); String msg = blockingQueue.take(); //收到消息进行处理...}
这两段代码可以写在两个不同的 Java 工程里,只要连接的是同一个 Redis 就行。
调用 comsume() 之后,如果队列里没有消息,会阻塞等待队列里有消息并且取到了才会返回。之所以这么说是因为可能有别的 Java 进程也在跟你一样取同一个队列里的消息,如果消息被另一个抢完了,那这时就还得阻塞等待。
这时看上去的原理是这样的:
生产者调用 offer() 后,自己内部开启一个定时器,等到了时间再发送到 redis 的 list 里。
图片
如果是这样设计的话,相信大家都能看出来一个很简单的问题,要是延时时间还没到,生产者自己挂了,那样消息就丢了。所以还是让我们接着往下看。
redisson 源码里一共创建了三个队列:【消息延时队列】、【消息顺序队列】、【消息目标队列】。
图片
假设在同一时间按照 msg1、msg2、msg3 的顺序发消息到延时队列,这三条消息就会被保存在【消息延时队列】和【消息顺序队列】。
可以看到【消息延时队列】的顺序是按照到期时间升序排列的,而不是像【消息顺序队列】按照插入顺序排。
消息到期后会将消息从前两个队列移除(怎么移?谁来移?),插入【消息目标队列】,也就是图中第三个队列。
消费者也是阻塞在【消息目标队列】上取消息。
这时可以简单说明下每个队列的作用:
其实【消息延时队列】队列里存的时间(也就是 zet 的 score)是到期的时间戳,为了画图方便,图里就画的是延迟的时间,不过不影响理解。
理解好这几个队列的名字和作用,后面还会一直用到,如果忘了可以翻回来回顾下。
因为书写理解方便和【消息顺序队列】在本文没涉及到,后面部分好几次提到的内容:把到期的消息从【消息延时队列】移到【消息目标队列】里,这句话实际的代码逻辑是这样:把【消息延时队列】和【消息顺序队列】里的到期消息移除,把它们插入到【消息目标队列】。
知道了内部所使用到的数据结构后,这里可以简单说下整体的基本流程。
先说发送延迟消息,发送的延迟消息会先存在【消息延时队列】和【消息顺序队列】,如果【消息延时队列】原本是空的,会发布订阅信息提醒有新的消息。
获取延迟消息只需要从【消息目标队列】阻塞的取就行了,因为里面都是到期数据。
那么问题就只剩下怎么样判断时间到了,把【消息延时队列】里的消息移动到【消息目标队列】里呢?
这部分工作交给了初始化延时队列来处理。
这里面会定时从【消息延时队列】查询最新到期时间,定时去把【消息延时队列】里的消息移动到【消息目标队列】里。
如果【消息延时队列】是空的,就不会再定时查,而是等待发布订阅信息提醒,再定时把【消息延时队列】里的消息移动到【消息目标队列】里。
刚开始看可能有点抽象,可以看完底下一节内容之后,再回头来看这里对应的流程总结,可能会比较清晰。
发送延时消息的逻辑比较简单,先看下发送的代码。
public void produce() { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); delayedQueue.offer("测试延迟消息", 5, TimeUnit.SECONDS);}
从 delayedQueue.offer 方法开始,最终会执行到 RedissonDelayedQueue 的 offerAsync 方法里。
offerAsync 方法的作用就是发送一段脚本给 redis 执行,脚本内容是:
@Overridepublic RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) { if (delay < 0) { throw new IllegalArgumentException("Delay can't be negative"); } long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; long randomId = ThreadLocalRandom.current().nextLong(); return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;", Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e));}
获取延时消息是本文最简单的一部分。
public void consume() throws InterruptedException { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); String msg = blockingQueue.take(); //收到消息进行处理...}
blockingQueue.take() 方法其实只是对【消息目标队列】执行 blpop 阻塞的获取到期消息
看一下初始化的代码。
public void init() { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);}
入口就是在 redissonClient.getDelayedQueue(blockingQueue) 中,创建了 RedissonDelayedQueue 对象,并执行了构造方法里的逻辑。
那么这里面主要做了什么事呢?
主要是调用了 QueueTransferTask 的 start() 方法。
public void start() { RTopic schedulerTopic = getTopic(); statusListenerId = schedulerTopic.addListener(new BaseStatusListener() { @Override public void onSubscribe(String channel) { pushTask(); } }); messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); } });}
这段代码主要是设置了指定主题(主题名:redisson_delay_queue_channel:{queuename})两个发布订阅的监听器。
需要注意的是,这里会先订阅指定主题,然后触发执行 onSubscribe() 方法。
所以我们主要搞懂这三个方法都是做什么的,那么整个初始化流程就明白了。
因为这三个方法是相互调用的,只看文字的话容易云里雾里,这里有个流程图,看方法解释文字的时候可以对照着流程图看比较有印象。
图片
private void scheduleTask(final Long startTime) { TimeoutTask oldTimeout = lastTimeout.get(); if (startTime == null) { return; } if (oldTimeout != null) { oldTimeout.getTask().cancel(); } long delay = startTime - System.currentTimeMillis(); if (delay > 10) { Timeout timeout = connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { pushTask(); TimeoutTask currentTimeout = lastTimeout.get(); if (currentTimeout.getTask() == timeout) { lastTimeout.compareAndSet(currentTimeout, null); } } }, delay, TimeUnit.MILLISECONDS); if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) { timeout.cancel(); } } else { pushTask(); }}
@Overrideprotected RFuture<Long> pushTaskAsync() { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);" + "redis.call('rpush', KEYS[1], value);" + "redis.call('lrem', KEYS[3], 1, v);" + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " + "if v[1] ~= nil then " + "return v[2]; " + "end " + "return nil;", Arrays.<Object>asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100);}
看不懂也不要紧,听我解释下就明白了。
这里发送了一段脚本给 redis 执行:
我的理解就是初始化的时候
1是为了处理旧的消息,比如生产者1发送了消息,然后时间没到自己下线了,这时如果没有其他客户端在线,就没有人能把数据从【消息目标队列】移到【消息目标队列】了。
2是返回的这个时间戳,会拿这个定时,等时间到了去【消息目标队列】拉取到期的消息。
简单总结就是这个方法是把到期消息从【消息延时队列】放到【消息目标队列】里,并且返回了最近要到期消息的时间戳。
private void pushTask() { RFuture<Long> startTimeFuture = pushTaskAsync(); startTimeFuture.whenComplete((res, e) -> { if (e != null) { if (e instanceof RedissonShutdownException) { return; } log.error(e.getMessage(), e); scheduleTask(System.currentTimeMillis() + 5 * 1000L); return; } if (res != null) { scheduleTask(res); } });}
这个代码看起来就比较简单,调用了 pushTaskAsync() 获取最近要到期消息的时间戳(异步封装了一下)。
有异常的话就调用 scheduleTask() 五秒后再执行一次 pushTask()。
没有异常的话如果有最近要到期消息的时间戳(说明【消息延时队列】里还有未到期消息),用这个最新到期时间调用 scheduleTask(),在这个指定的时间调用 pushTask()。
这个方法简单总结就是决定了要不要调用、什么时候再调用 pushTask(),主要操作逻辑都在 pushTaskAsync() 里(把到期的消息从【消息延时队列】移到【消息目标队列】供消费端消费)。
了解了上面几个方法的流程和含义,还记得一开头提到的添加了两个发布订阅的监听器吗?
1.当指定主题有新订阅时调用 pushTask() 方法,里面又会调用 pushTaskAsync() 方法
2.当指定主题有新消息时调用 scheduleTask(startTime) 方法
需要注意的是,这里会先订阅指定主题,然后触发执行 onSubscribe() 方法
再放一下开头的图总体流程图:
图片
这里回答开头部分说的问题,到这看完了本文,你可以试着自己想一想这个问题的答案。
接收消息代码如下,可以看到 delayedQueue 是没有用到的,那么为什么要加这一行呢,这个后面总结部分回答。
public void consume() throws InterruptedException { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); String msg = blockingQueue.take(); //收到消息进行处理...}
其实这个问题也是我开发过程中遇到的一个奇怪的地方,接收方代码没有初始化延时队列。
首先再啰嗦一句,初始化延时队列的作用是会定时去把【消息延时队列】里的到期数据移动到【消息目标队列】。
如果只有发送方初始化延时队列:
所以接收方代码里也初始化延时队列能够避免一部分数据丢失问题。
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-88383-0.html分布式延时消息的另外一种选择 Redisson
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: 聊聊Vue.js 基础语法详解
下一篇: 竟然还能这样高效地操作 JSON 对象!