当前位置:首页 > 科技  > 软件

RabbitMQ如何保证消息可靠性?

来源: 责编: 时间:2024-05-09 09:26:05 84观看
导读本篇文章不再介绍RabbitMQ具体实现原理,直接介绍如何保证消息的可靠性问题。所谓可靠性,指消息不重不漏。文章导读图片生产者消费者模型  生产者-消费者模型用于描述两类进程(生产者和消费者)之间的数据交互。可以被认

本篇文章不再介绍RabbitMQ具体实现原理,直接介绍如何保证消息的可靠性问题。所谓可靠性,指消息不重不漏。u6B28资讯网——每日最新资讯28at.com

文章导读

图片图片u6B28资讯网——每日最新资讯28at.com

生产者消费者模型

  生产者-消费者模型用于描述两类进程(生产者和消费者)之间的数据交互。可以被认为是独立的服务,生产者负责生成数据,消费者负责处理这些数据。在分布式系统中,队列在其中扮演了消息(数据)传递的功能。u6B28资讯网——每日最新资讯28at.com

图片图片u6B28资讯网——每日最新资讯28at.com

关于消息队列的作用,一般解读为:u6B28资讯网——每日最新资讯28at.com

解耦:生产者和消费者独立运作,无需知道对方的运行状态。u6B28资讯网——每日最新资讯28at.com

异步:并非实时,生产者不必关注消费端的消费情况。u6B28资讯网——每日最新资讯28at.com

削峰:限制流量,防止消费者过载。u6B28资讯网——每日最新资讯28at.com

消息丢失

  这其实不难理解,就像生活中下单-快递-签收的过程。这个过程和上边的生产者-消费者模型恰有异曲同工之妙。u6B28资讯网——每日最新资讯28at.com

图片图片u6B28资讯网——每日最新资讯28at.com

这个过程中,u6B28资讯网——每日最新资讯28at.com

  • 下单用户(生产者)
  • 快递小哥(队列)
  • 签收人(消费者)
  • 快件(消息)

如果包裹被粗略的认为是一条消息,那么快件在邮寄过程中丢失了,就是消息丢失。快件从发货到签收,我们不用去关心中间发生了什么。但是要是没收到货,那得给我个理由。u6B28资讯网——每日最新资讯28at.com

如何排查?

  就上边的快件丢失问题,怎么知道快递为何没有收到?很简单,一段一段的排查:u6B28资讯网——每日最新资讯28at.com

  1. 商家是否有发货?
  2. 快递公司是否揽收?
  3. 查看快递小哥是否放入代收点

相应的,如果生产环境中突然发现诸如:告警、服务宕机、数据流转异常等问题时,我们也会在链路上(A、B、C三处)逐一排查。u6B28资讯网——每日最新资讯28at.com

产生原因及解决方案

1、生产端可靠性投递

为确保消息从生产端可靠地投递到RabbitMQ,我们需要考虑以下几个关键点:u6B28资讯网——每日最新资讯28at.com

网络故障:消息可能在传输过程中因网络问题而丢失。u6B28资讯网——每日最新资讯28at.com

RabbitMQ故障:如果RabbitMQ宕机,消息也可能丢失。u6B28资讯网——每日最新资讯28at.com

图片图片u6B28资讯网——每日最新资讯28at.com

对应解决方案:u6B28资讯网——每日最新资讯28at.com

  • 开启事务机制

事务在RabbitMQ中可能会影响性能,因为它们需要在所有节点上同步状态。因此,RabbitMQ尽量避免使用事务。核心代码:u6B28资讯网——每日最新资讯28at.com

private static void executeTransaction(Channel channel) throws IOException {        boolean transactionSuccess = false;        try {            // 开启事务            channel.txSelect();             // 执行一系列消息操作,例如:channel.basicPublish(exchange, routingKey, message);            // 提交事务            channel.txCommit();             transactionSuccess = true;        } catch (ShutdownSignalException | IOException e) {            // 回滚事务            if (!transactionSuccess) {                channel.txRollback();             }            throw e;        }    }
  • 生产者确认机制

发布者确认机制允许发布者知道消息是否已经被RabbitMQ成功接收:u6B28资讯网——每日最新资讯28at.com

public static void sendPersistentMessage(String host, String queueName, String message) {        try (Connection connection = new ConnectionFactory().setHost(host).newConnection();             Channel channel = connection.createChannel()) {            // 启用发布者确认            channel.confirmSelect();            // 将消息设置为持久化            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()                    .deliveryMode(2)                     .build();                                // 添加确认监听器            channel.addConfirmListener(new ConfirmListener() {                @Override                public void handleAck(long deliveryTag, boolean multiple) throws IOException {                    System.out.println("消息已确认: " + deliveryTag);                    // 消息正确到达Broker时的处理逻辑                }                @Override                public void handleNack(long deliveryTag, boolean multiple) throws IOException {                    System.out.println("消息未确认: " + deliveryTag);                    // 因为内部错误导致消息丢失时的处理逻辑                }            });            channel.basicPublish("", queueName, properties, message.getBytes());            // 等待消息确认,或者超时            boolean allConfirmed = channel.waitForConfirms();                        if (allConfirmed) {                //所有消息都已确认            } else {                //超时或其它            }                   } catch (IOException | TimeoutException | InterruptedException e) {            e.printStackTrace();        }}

2、消息持久化

在RabbitMQ中,消息的持久化它确保消息不仅存储在内存中,而且也安全地保存在磁盘上。这样,即使在RabbitMQ服务崩溃或重启的情况下,消息也不会丢失,可以从磁盘恢复。u6B28资讯网——每日最新资讯28at.com

图片图片u6B28资讯网——每日最新资讯28at.com

消息到达RabbitMQ后通过Exchange交换机,路由给queue队列,最后发送给消费端。u6B28资讯网——每日最新资讯28at.com

图片图片u6B28资讯网——每日最新资讯28at.com

从RabbitMQ设计上看,消息的持久化应该从以下方面入手:u6B28资讯网——每日最新资讯28at.com

  • Exchange持久化:
// 设置 durable = true; channel.exchangeDeclare(exchangeName, "direct", durable);
  • 消息持久化:
// 设置 MessageProperties.PERSISTENT_TEXT_PLAINchannel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  • Queue持久化:
//设置 boolean durable = true;channel.queueDeclare(queueName, durable, exclusive, false, null);

这样,如果RabbitMQ收到消息后挂了,重启后会自行从磁盘上恢复消息。u6B28资讯网——每日最新资讯28at.com

3、消费者确认机制

如果上述生产端、消息队列都正确投递,那么问题出现在消费端是否可以正确消费?u6B28资讯网——每日最新资讯28at.com

图片图片u6B28资讯网——每日最新资讯28at.com

消费者在成功处理了一条消息后通知RabbitMQ,这样RabbitMQ在收到确认后才会移除队列中的消息。u6B28资讯网——每日最新资讯28at.com

默认情况下,以下3种原因导致消息丢失:u6B28资讯网——每日最新资讯28at.com

1、 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;u6B28资讯网——每日最新资讯28at.com

2、 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;u6B28资讯网——每日最新资讯28at.com

3、 处理过程中服务宕机:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。u6B28资讯网——每日最新资讯28at.com

这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。u6B28资讯网——每日最新资讯28at.com

图片图片u6B28资讯网——每日最新资讯28at.com

应对方案:u6B28资讯网——每日最新资讯28at.com

  • 将自动ack机制改为手动ack机制。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {    try {        //接收消息,业务处理        //设置手动确认        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);    } catch (Exception e) {        //发生异常时,可以选择重新发送消息或进行错误处理        // 例如,可以选择负确认(nack),让消息重回队列        // channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);    }};//设置autoAck为false,表示关闭自动确认机制,改为手动确认channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});

4、消息补偿机制

以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。因此需要介入补偿机制或者人工干预。这是我们的最后一道防线。u6B28资讯网——每日最新资讯28at.com

如何做消息补偿呢?其实就是将消息入库,通过定时任务重新发送失败的消息。详细流程如下:u6B28资讯网——每日最新资讯28at.com

图片图片u6B28资讯网——每日最新资讯28at.com

  • 生产端发送消息;
  • 确认失败,将消息保存到数据库中,并设置初始状态0;
  • 定时任务以一定频率扫描数据库中status=0 的消息(失败消息);
  • 重发消息,可多次;
  • 重发成功,更新数据库:status=1;
  • 超过固定次数重发仍然失败,人工干预。

标注:u6B28资讯网——每日最新资讯28at.com

超过最大失败次数后,对于无法被正常消费的消息可移入死信队列。u6B28资讯网——每日最新资讯28at.com

  • 可人工干预手动排查
  • 也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。这里涉及到消息去重、幂等性处理等。

以上,我们知道了消息丢失问题如何处理?那么对于消息重复的问题,下面做个介绍。u6B28资讯网——每日最新资讯28at.com

消息重复消费

消息重复消费是指在消息队列中,同一条消息被不同的消费者多次消费处理。u6B28资讯网——每日最新资讯28at.com

产生原因:u6B28资讯网——每日最新资讯28at.com

  • 网络问题:消费者处理完消息后,因网络问题导致确认信息未能成功发送回消息队列。
  • 服务中断:消费者在确认消息之前服务崩溃,消息队列未收到确认信号。
  • 确认机制:自动确认模式下,如果确认在消息处理完成前发生,消息可能会被重复消费

对应解决方案:u6B28资讯网——每日最新资讯28at.com

1. 幂等性设计

设计消费者的消息处理逻辑时,要保证即使消息被多次消费,也不会对系统状态产生不良影响。幂等性可以通过以下方式实现:u6B28资讯网——每日最新资讯28at.com

  • 数据库唯一约束:使用数据库的主键约束或唯一索引防止插入重复记录。
  • 业务逻辑检查:在执行业务操作前,先检查是否已经处理过该消息。

2. 消息去重策略

使用唯一标识符(如订单号、massageID)来识别消息,并在消费者中实现去重逻辑:u6B28资讯网——每日最新资讯28at.com

  • 缓存检查:使用内存缓存(如Redis)存储已处理的消息ID。
  • 持久化存储:将消息ID与处理状态保存在数据库中,以便跨服务重启后仍然有效。

3. 手动确认与重试机制

通过手动确认消息,控制消息何时从队列中移除:u6B28资讯网——每日最新资讯28at.com

  • 手动确认:在消息成功处理后,显式调用channel.basicAck()方法确认消息。
  • 重试机制:如果消息处理失败,可以选择将消息重新入队(channel.basicReject(requeue=true))或丢弃(channel.basicReject(requeue=false))。

代码演示:u6B28资讯网——每日最新资讯28at.com

消费者端去重逻辑u6B28资讯网——每日最新资讯28at.com

@RabbitListener(queues = "queueName", acknowledgeMode = "MANUAL")public void receiveMessage(Message message, Channel channel) throws IOException {    String messageId = message.getMessageProperties().getMessageId();        // 检查消息是否已消费    if (messageAlreadyProcessed(messageId)) {        // 消息已消费,确认消息并返回        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        return;    }        // 处理消息    try {        processMessage(message);        // 消息处理成功,持久化消息ID并确认消息        persistMessageId(messageId);        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);    } catch (Exception e) {        // 处理失败,可以选择重新入队或丢弃        boolean requeue = shouldRequeue(message);        channel.basicReject(message.getMessageProperties().getDeliveryTag(), requeue);    }}

生产者端发布确认u6B28资讯网——每日最新资讯28at.com

void sendWithConfirm(AmqpTemplate amqpTemplate, Message message) throws IOException {    ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {        if (!ack) {            // 处理消息发送失败的逻辑            // ...        }    };    amqpTemplate.setConfirmCallback(confirmCallback);    amqpTemplate.convertAndSend("exchangeName", "routingKey", message);}

具体实现需要根据实际业务逻辑和RabbitMQ配置进行调整。u6B28资讯网——每日最新资讯28at.com

总结

以上介绍了RabbitMQ保证消息可靠性的问题、产生原因、解决方案等。不足之处,欢迎指正。u6B28资讯网——每日最新资讯28at.com

本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-87491-0.htmlRabbitMQ如何保证消息可靠性?

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: SpringBoot一个非常强大的数据绑定类

下一篇: 纯 CSS 实现标签自动显示超出数量

标签:
  • 热门焦点
Top
Baidu
map