使用默认的交换机exchange或routingKey。
图片
调用方法:
图片
使用默认的交换机,routingKey必须为quenue队列的名称。
调用方法:
图片
案例:
/** * @Author yangyalin * @Description 测试发送消息(直接使用队列发送,使用默认的交换机) routingKey:即为对列的名称即可 **/public void testSendMsg(String message){ rabbitTemplate.convertAndSend(RabbitMQConvertConfig.TEST_QUEUE,message);}
使用指定的交换机,若绑定routingKey,必须使用指定的模式;若没有绑定,可设置为""。
图片
public void sendDecreStockMessage(DecreStockFromRabbit decreStockFromRabbit){ CorrelationData correlationData = new CorrelationData(); correlationData.setId(decreStockFromRabbit.getMessageId()); /** * exchange:交换机 routingKey:路由键 message:消息体内容 correlationData:消息唯一ID **/ rabbitTemplate.convertAndSend(RabbitMQConvertConfig.ORDER_EXCHANGE, RabbitMQConvertConfig.ORDER_ROUTINGKEY, decreStockFromRabbit,correlationData);}或:rabbitTemplate.convertAndSend("test-exchange","",message);
/** * 功能描述:当消费同一个队列的时候,可通过设置实现能则多劳, * 消息轮询方式订阅 * @MethodName: process11 * @MethodParam: [testMessage] * @Return: void * @Author: yyalin * @CreateDate: 2022/4/9 17:10 */ @RabbitListener(queues = "TestDirectQueue") //监听的队列名称 TestDirectQueue public void process11(Map testMessage) throws InterruptedException { log.info("消费者收到消息222:" + testMessage.toString()); Thread.sleep(200); }
@RabbitListener(queuesToDeclare=@Queue(TopicExchangeConfig.TEST_QUEUE2))@RabbitHandlerpublic void receiveTestMsg2(@Payload String str) throws Exception{ log.info("开始接收消息。。。。。"); log.info("接收到的消息:"+str);}
自动创建且交换机和队列绑定,key可指定也可不指定(默认为队列名称)。
/******************方案二:使用注解的方式绑定队列在交换机上*******************/ @RabbitListener(bindings = @QueueBinding(value=@Queue(name="directQueue"), exchange=@Exchange(name="directExchange",type = ExchangeTypes.DIRECT), key={"red", "blue"} )) //监听的队列名称 TestDirectQueue public void directConsumer(String message) { log.info("消费者收到direct消息555 : " + message); } @RabbitListener(bindings = @QueueBinding(value=@Queue(name="topicQueue2"), exchange=@Exchange(name="topicExchange",type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true"), key="#.new" )) public void topicConsumer2(String message) { log.info("消费者收到topic消息888 : " + message); }
备注:ignoreDeclarationExceptions = "true" : 即使配置出现了错误也不至于整个应用程序都启动失败的情况。
1、channel.basicQos(0, 1, false):0表示对消息的大小无限制,1表示每次只允许消费一条,false表示该限制不作用于channel。
同时,我们采用手工ACK的方式,因为我们配置文件配置了 spring.rabbitmq.listener.simple.acknowledge-mode=manual:
2、channel.basicAck(deliveryTag, false):deliveryTag表示处理的消息条数(一般为1),从heaers中取,false表示不批量ack。
/** * 功能描述: 消费端加上手动确认消息被接收 * @MethodName: process * @MethodParam: [message] * @Return: void * @Author: yyalin * @CreateDate: 2022/4/18 19:10 */ @RabbitListener(queues = "TestDirectQueue3") //监听的队列名称 TestDirectQueue public void process(String message, Channel channel) throws IOException { log.info("DirectReceiver消费者收到消息1 : " + message); long msgId=1111L; //消息ID try { //手动确认消息已消费 channel.basicAck(msgId,false); } catch (IOException e) { //把消息失败的消息重新放入到队列 channel.basicNack(msgId,false,true); e.printStackTrace(); } }
本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-17666-0.htmlRabbitMQ发送和接收消息的几种方式
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com