在这个充满挑战和收获的60天学习之旅中,你将迅速提升成为一名全栈工程师。专注于Spring Boot框架,我们将深入研究高级特性,从项目初始化到微服务架构,再到性能优化和持续集成部署。无论你是初学者还是有一定经验的开发者,这个专题都将带你穿越从零到全面掌握Spring Boot的学习曲线。
Day 32 ~ Springboot3.1.x|3分钟学会在 RabbitMQ 中实现发布订阅模式
发布-订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特定的接收者(订阅者)。发布者类别定义了哪些订阅者因为订阅者匹配了发布者的类别而接收消息。
以下是使用RabbitMQ实现发布-订阅模式的一种例子,我们将使用RabbitMQ的Fanout Exchange。
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "Log message..."; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Sent '" + message + "'"); } }}
在上述代码的channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),我们声明一个名为"log"的exchange,同时我们定义其类型为"fanout",意味着它会将接收到的所有消息广播给所有它所知道的队列。
每一个订阅者都需要拥有一个queue,因此,我们需要在客户端中创建queue。
import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}
在这个例子中,我们声明一个新的queue,并将其与"logs"的exchange绑定。然后我们定义了消息的接收以及处理方式。
在使用消息中间件的过程中,消息发送失败是无法避免的情况。因此,我们需要对此进行正确的处理以避免因此而导致的系统问题。
对于消息发送失败的处理,有以下几种常用的方案:
RabbitMQ中的消息确认(publisher confirms)和消费者应答(Consumer Acknowledgements)就是为了解决此类问题。
ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); String queueName = "test"; String message = "Hello world"; try { channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect(); channel.basicPublish("", queueName, null, message.getBytes()); if (!channel.waitForConfirms()) { System.out.println("消息发送失败"); } } catch (Exception e) { System.out.println("错误: " + e.getMessage()); }}
上述代码中执行channel.confirmSelect();后,当前channel被设置为publisher confirm模式。在此模式下,当消息被RabbitMQ成功接收后,会发送一个确认给生产者。如果RabbitMQ没有发送确认,那么生产者可以认定该消息发送失败。
结论:掌握发布-订阅模式和消息发送失败处理策略,对于掌握消息队列的使用至关重要,可为系统的稳定性和扩展性提供保障。
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-79990-0.html三分钟学会在 RabbitMQ 中实现发布订阅模式
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com