10月02日-三方汇率:7.98(仅供参考)
更多
APP下载
合作
商城
签到

产业资讯 产业资讯 3分钟白话RocketMQ系列—— 如何保证消息不丢失

[复制链接]
  • TA的每日心情
    开心
    2024-4-28 19:19
  • 签到天数: 1 天

    20

    主题

    651

    伯币

    523

    积分

    韩国街菜鸟

    积分
    523
    发表在  2023-10-9 19:36:38 | 显示全部楼层 | 阅读模式
    本帖最后由 会稽山人 于 2023-10-9 19:38 编辑

    我们知道RocketMQ的消息模型分为 生产、存储(消息堆积)、消费 三大部分。
    1.png

    因此,如何保证消息不丢失,也是从这三个环节来考虑。

    关键字摘要
    • 生产、存储(消息堆积)、消费 三个环节保证消息不丢失

    • 生产环节:消息类型,消息确认机制、失败重试机制

    • 存储环节:同步/异步刷盘、同步/异步复制slave

    • 消费环节:消息确认机制(至少消费成功一次)、失败重试机制、死信队列机制


    Q1: 如何保证「消息生产」不丢失?
    先想想什么情况下,消息生产会丢失消息呢?

    生产者将发送消息时,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失。

    那怎么解决这个问题?

    其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」。

    消息发送成功返回确认消息,那就能确保消息不丢失。如果发送失败了,mq-client就尝试自动重试,避免网络抖动导致发送丢失。

    如果超过一定超时时间还是失败,那就抛出异常,由开发者自己在应用层面进行处理,手动重试发送 或者 记录失败消息后续补偿。

    不过我们需要特别注意是,RocketMQ支持多种「消息类型」,但是并不是对所有「消息类型」 都会有「消息确认机制」和「失败重试机制」。

    RocketMQ生产消息时,支持多种「消息类型」和「消息发送模式」。咱们白话为主,就不展开源码了,有兴趣同学可以参考org.apache.rocketmq.client.producer.MQProducer这个接口定义即可。

    消息类型:

    • 普通消息:发送普通消息,异常时默认重试

    • 普通有序消息:发送普通有序消息,通过指定「消息筛选器selector」,动态决定发送哪个队列。异常默认不重试,可以用户自己重试,并发送到其他队列

    • 严格有序消息:发送严格有序消息,通过指定队列,保证严格有序,异常默认不重试


    消息发送模式:

    • 同步:调用发送消息方法后,同步阻塞,直到返回SendResult。配置retryTimesWhenSendFailed重试次数。

    • 异步:调用发送消息方法后,立即返回,发送结果会通过开发者自己注册的回调函数SendCallback进行处理。配置retryTimesWhenSendAsyncFailed重试次数。

    • 单向发送:这种方法完全不关心发送后的返回结果。显然,它具有最大吞吐量,但也存在消息丢失的潜在风险。


    消息类型 和 消息发送模式 是 N * M 的关系,所以聪明的你一定已经想到了,存在9种不同组合,RocketMQ也是定义了9种不同接口方法。

    这9种方法里面,涉及到「单向发送」模式的3种方法,都是不可靠的,存在丢失消息的风险。其他发送消息的模式和消息类型,可以通过 消息确认、mq-client自动「失败重试机制」、业务自定义重试 等方式,确保消息发送不丢失。

    注意,org.apache.rocketmq.client.producer.MQProducer还定义了「事务消息」的发送模式,是属于分布式事务范畴了,跟我们这里讨论的消息不丢失不太一样,就不展开讨论了。后面单独写一篇针对「事务消息」的分析。

    Q2: 如何保证「消息存储」不丢失?

    先想想什么情况下,消息存储会丢失呢?

    场景1,消息保存到内存中,还没来得及刷盘到磁盘,机器宕机或者重启,导致内存中消息丢失。

    场景2,为了提高可用性,Broker通常采用一主多从的部署方式,为了确保消息不丢失,消息需要被复制到从节点。当消息发送到master但是还没同步到slave broker时,master broker磁盘损坏,导致消息数据丢失。或者master宕机,consumer切换到slave消费数据,消息丢失。

    针对场景1,默认情况下,消息在到达 Broker 端后会首先被保存在内存中,并立即向生产者返回确认响应。随后,Broker 会定期批量将一组消息异步刷入磁盘。这种方式减少了 I/O 操作次数,提高了性能。

    然而,如果发生机器掉电、异常宕机等情况,未及时将消息刷入磁盘,就可能导致消息丢失的情况。

    如果要确保 Broker 端不丢失消息并保证消息的可靠性,我们需要修改消息保存机制为同步刷盘方式,即只有当消息成功存储到磁盘后才返回响应。可以通过flushDiskType = SYNC_FLUSH 参数进行控制。

    针对场景2,在默认方式下,当消息成功写入主节点时,就会返回确认响应给生产者,并异步将消息复制到从节点。然而,如果主节点突然宕机且无法恢复,尚未复制到从节点的消息将会丢失。

    为了进一步提高消息的可靠性,我们可以采用同步复制方式。主节点将会同步等待从节点完成复制,然后才返回确认响应。这样可以确保消息的可靠性。

    可以通过brokerRole=SYNC_MASTER参数进行控制。

    注意,同步刷盘 和 同步复制 虽然能够保证消息不丢失,但是会严重降低性能,生产实践中需要根据实际情况综合评估。

    Q3: 如何保证「消息消费」不丢失?

    先想想什么情况下,消息存储会丢失呢?

    因为各种原因消费失败,但是还是提交了消费位点,这条消息从业务角度来说就“丢失”了。

    那怎么解决这个问题?

    跟消息生产一样,其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」。

    消费者从RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"来表示业务方已经正常完成消费。只有返回"CONSUME_SUCCESS"才算作消费完成。这就是消费时的「消息确认机制」。

    如果返回"CONSUME_LATER",则会按照不同的消息延迟级别进行再次消费,延迟级别从秒到小时不等,最长延迟时间为2个小时后再次尝试消费。这就是消费时的「失败重试机制」。

    重试消息会被存入名为 "%RETRY%+消费组名称" 的Topic中,原始主题Topic会存入属性中。然后会基于定时任务机制,在到期时将任务再次拉取出来。

    另外,RocketMQ跟kafka不同的是,天然支持了 「死信队列机制」。

    如果在尝试消费的过程中达到了最大重试次数(通常为16次),仍然无法成功消费,则消息将被发送到死信队列,以确保消息存储的可靠性。后续业务可以根据死信队列,来做相关补偿措施。

    关键字总结
    • 生产、存储(消息堆积)、消费 三个环节保证不丢失

    • 生产环节:消息类型,消息确认机制、失败重试机制

    • 存储环节:同步/异步刷盘、同步/异步复制slave

    • 消费环节:消息确认机制(至少消费成功一次)、失败重试机制、死信队列机制



  • TA的每日心情
    开心
    2024-4-28 19:19
  • 签到天数: 1 天

    20

    主题

    651

    伯币

    523

    积分

    韩国街菜鸟

    积分
    523
     楼主| 发表于 2023-10-9 19:39:10 | 显示全部楼层
    总之来说四大要素要清楚

    发表回复

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    快速回复 返回顶部 返回列表