如何保证rocketmq的消息不丢失
rocketmq任意时间队列实现原理?
rocketmq任意时间队列实现原理?
RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠、万亿级容量、灵活可伸缩的消息发布与订阅服务。
它前身是MetaQ,是阿里基于Kafka的设计使用Java进行自主研发的。在2012年,阿里将其开源, 在2016年,阿里将其捐献给Apache软件基金会(Apache Software Foundation,简称为ASF),正式成为孵化项目。2017 年,Apache软件基金会宣布RocketMQ已孵化成为 Apache顶级项目(Top Level Project,简称为TLP ),是国内首个互联网中间件在 Apache上的顶级项目。
延迟消息
生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。
在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。
消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。在发送消息时,设置消息延迟级别即可,设置消息延迟级别时有以下3种情况:
设置消息延迟级别等于0时,则该消息为非延迟消息。
设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。
延迟消息示例
首先,写一个消费者,用于消费延迟消息:
再写一个延迟消息的生产者,用于发送延迟消息:
运行生产者以后,就会发送一条延迟消息:
10秒钟后,消费者收到的这条延迟消息:
延迟消息的原理分析
以下分析的RocketMQ源码的版本号是4.7.1,版本不同源码略有差别。
CommitLog
在中,针对延迟消息做了一些处理:
可以看到,每一个延迟消息的主题都被暂时更改为SCHEDULE_TOPIC_XXXX,并且根据延迟级别延迟消息变更了新的队列Id。接下来,处理延迟消息的就是。
ScheduleMessageService
ScheduleMessageService是由进行初始化的,初始化包括构造对象和调用load方法。最后,再执行ScheduleMessageService的start方法:
遍历所有延迟级别,根据延迟级别获得对应队列的偏移量,如果偏移量不存在,则设置为0。然后为每个延迟级别创建定时任务,第一次启动任务延迟为1秒,第二次及以后的启动任务延迟才是延迟级别相应的延迟时间。
然后,又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由flushDelayOffsetInterval属性进行配置,默认为10秒。
定时任务
ScheduleMessageService的start方法执行之后,每个延迟级别都创建自己的定时任务,这里的定时任务的具体实现就在DeliverDelayedMessageTimerTask类之中,它核心代码是executeOnTimeup方法之中,我们来看一下主要部分:
如果没有获取到对应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:
如果没有获取到有效消息,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:
如果当前消息不到消费的时间,则在countdown毫秒后再执行任务。如果到消费的时间,就继续执行下面操作:
如果获取到消息,则继续执行下面操作:
清除了消息的延迟级别,并且恢复了真正的消息主题和队列Id,重新把消息发送到真正的消息队列上以后,消费者就可以立即消费了。
总结
经过以上对源码的分析,可以总结出延迟消息的实现步骤:
如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
消息进入SCHEDULE_TOPIC_XXXX的队列中。
定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
根据消息的物理偏移量和大小再次获取消息。
根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
重新发送消息到原主题的队列中,供消费者进行消费。
消息队列原理?
消息队列主要解决应用耦合,异步消息,流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。使用较多的消息队列有Kafka、ActiveMQ、RabbitMQ、ZeroMQ、MetaMQ、RocketMQ。
消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景。