秦皇岛网站开发价格,企业网络拓扑图的设计方案,信息流推广渠道有哪些,asp.net建立手机网站1.消息可靠性传递
在使用RabbitMQ的时候#xff0c;作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式
1.confirm 确认模式 确认模式是由exchange决定的
2.return 退回模式 回退模式是由routing…1.消息可靠性传递
在使用RabbitMQ的时候作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式
1.confirm 确认模式 确认模式是由exchange决定的
2.return 退回模式 回退模式是由routingKey决定的
rabbitmq整个消息投递的路径为
producer - rabbitmq broker - exchange - queue -consumer
消息从producer到exchange则会返回一个confirmCallback
消息从exchange-queue投递失败则会返回一个returnCallback
我们将利用这两个callback控制消息的可靠性投递
创建rabbitmq-producer-spring项目 并配置
配置文件开启模式
?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:contexthttp://www.springframework.org/schema/contextxmlns:rabbithttp://www.springframework.org/schema/rabbitxsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd!--加载配置文件--context:property-placeholder locationclasspath:rabbitmq.properties/!-- 定义rabbitmq connectionFactory确认模式开启:publisher-confirmstrue回退模式开启: publisher-returnstrue--rabbit:connection-factory idconnectionFactory host${rabbitmq.host}port${rabbitmq.port}username${rabbitmq.username}password${rabbitmq.password}virtual-host${rabbitmq.virtual-host}publisher-confirmstrue publisher-returnstrue/!--定义管理交换机、队列--rabbit:admin connection-factoryconnectionFactory/!--定义rabbitTemplate对象操作可以在代码中方便发送消息--rabbit:template idrabbitTemplate connection-factoryconnectionFactory/!--消息可靠性投递生产端--rabbit:queue idtest_queue_confirm nametest_queue_confirm/rabbit:queuerabbit:direct-exchange nametest_exchange_confirmrabbit:bindingsrabbit:binding queuetest_queue_confirm keyconfirm/rabbit:binding/rabbit:bindings/rabbit:direct-exchange/beans
确认模式 (publisher-confirmstrue)
RunWith(SpringRunner.class)
ContextConfiguration(locations classpath:/spring-rabbitmq-producer.xml)
public class ProducerTest {//Autowired 在配置文件配好了//RabbitAdmin rabbitAdmin;//用于操作交换机、队列Autowiredprivate RabbitTemplate rabbitTemplate;//确认模式收到消息与否都会被执行Testpublic void testConfirm() {//1. 开启去确认回调函数 publisher-confirmstrue//2. 注册回调函数当交换机收到消息confirm方法会被调用rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(correlationData correlationData); //关联数据在发送消息时指定的//确认机制:ack true表示收到了 false表示没收到if (ack){System.out.println(接收成功消息 ack);}else {System.out.println(接收失败消息 cause);//做一些处理,让消息再次发送。}}});//3. 发送消息rabbitTemplate.convertAndSend(test_exchange_confirm, confirm, confirm info...);//成功//测试不正常情况//rabbitTemplate.convertAndSend(test_exchange_confirm000, confirm, message confirm....);//失败}
}
回退模式 (publisher-returnstrue)
RunWith(SpringRunner.class)
ContextConfiguration(locations classpath:/spring-rabbitmq-producer.xml)
public class ProducerTest {//Autowired 在配置文件配好了//RabbitAdmin rabbitAdmin;//用于操作交换机、队列Autowiredprivate RabbitTemplate rabbitTemplate;//回退模式Testpublic void testReturn() {//1.开启回退模式 publisher-returnstrue//2.注册回退函数(ReturnCallBack)当没有任何队列与交换机绑定或即使绑定也没有任何匹配时将执行回退方法// 成功接收不执行return方法//3. 必须设置强制回退 setMandatory(true) 设置交换机处理失败消息的模式rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {Overridepublic void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey) {System.out.println(消息对象message message);System.out.println(错误码replyCode replyCode);System.out.println(错误信息replyText replyText);System.out.println(交换机exchange exchange);System.out.println(路由键routingKey routingKey);//处理}});//3. 发送消息//rabbitTemplate.convertAndSend(test_exchange_confirm, confirm, confirm info...);//成功//测试不正常情况rabbitTemplate.convertAndSend(test_exchange_confirm, confirm123, confirm info...);//成功}}
小结
设置 ConnectionFactory的publisher-confirmstrue 开启 确认模式。
使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack如果为true则发送成功如果为false则发送失败需要处理。
设置 ConnectionFactory 的 publisher-returnstrue 开启 退回模式。
使用 rabbitTemplate.setReturnCallback 设置退回函数当消息从exchange 路由到 queue 失败后如果设置了 rabbitTemplate.setMandatory(true) 参数则会将消息退回给 producer并执行回调函数returnedMessage
2.Consumer Ack (消费端确认方式)
ack指Acknowledge确认。 表示消费端收到消息后的确认方式
有三种确认方式
自动确认acknowledge“none” 默认
手动确认acknowledge“manual”
根据异常情况确认acknowledge“auto”这种方式使用麻烦
其中自动确认是指当消息一旦被Consumer接收到则自动确认收到并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中很可能消息接收到业务处理出现异常那么该消息就会丢失。
如果设置了手动确认方式则需要在业务处理成功后调用channel.basicAck()手动签收如果出现异常则调用channel.basicNack()方法让其自动重新发送消息
创建rabbitmq-consumer-spring项目并配置
?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:contexthttp://www.springframework.org/schema/contextxmlns:rabbithttp://www.springframework.org/schema/rabbitxsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd!--加载配置文件--context:property-placeholder locationclasspath:rabbitmq.properties/!-- 定义rabbitmq connectionFactory --rabbit:connection-factory idconnectionFactory host${rabbitmq.host}port${rabbitmq.port}username${rabbitmq.username}password${rabbitmq.password}virtual-host${rabbitmq.virtual-host}/context:component-scan base-packagecom.listener /!--定义监听器容器默认为自动确认acknowledgemanual手动签收--rabbit:listener-container connection-factoryconnectionFactory acknowledgemanual!--rabbit:listener refqosListener queue-namestest_queue_confirm/rabbit:listener--rabbit:listener refackListener queue-namestest_queue_confirm/rabbit:listener/rabbit:listener-container
/beanspackage com;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;RunWith(SpringJUnit4ClassRunner.class)
ContextConfiguration(locations classpath:spring-rabbitmq-consumer.xml)
public class ConsumerTest {Testpublic void test(){while (true){}}
}1.自动确认 (默认)
//自动确认
Component
//implements MessageListener 自动确认
//ChannelAwareMessageListener 手动确认
public class AckListener implements MessageListener {//自动确认Overridepublic void onMessage(Message message) {try {System.out.println(new String(message.getBody(),UTF-8));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}
2.手动确认 (acknowledgemanual手动签收)
package com.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import javax.sound.midi.Soundbank;
import java.io.UnsupportedEncodingException;/*
* 1.设置手动签收acknowledge”manual“
* 2.让监听器类实现ChannelAwareMessageListener接口
* 3.如果消息成功处理则调用channel的basicAck()签收
* 4.如果消息处理失败则调用channel的basicNack()拒绝签收broker重新发送给consumer
* */
Component
//implements MessageListener 自动确认
//ChannelAwareMessageListener 手动确认
public class AckListener implements ChannelAwareMessageListener {//手动确认Overridepublic void onMessage(Message message, Channel channel) throws Exception {//消息唯一标识long deliveryTag message.getMessageProperties().getDeliveryTag();try {//1.接收转换消息System.out.println(new String(message.getBody(),UTF-8));//消息得到后需要进行业务逻辑处理如存储数据库发送邮件、短信、保存到Redis等等//2.处理业务逻辑System.out.println(处理业务逻辑...);int i 3/0;//模拟业务异常//3.手动签收//业务逻辑成功才会确认否则不予确认channel.basicAck(deliveryTag,true);//消息唯一标识和是否确认多个} catch (Exception e) {e.printStackTrace();//4.拒绝签收//NoAck 不予确认/** 三个参数* deliveryTag 消息标签唯一标识类似数据库主键* multiple true 拒绝接收的所有消息 false拒绝刚刚的消息* requeue true 表示是否重回原有队列false表示丢弃或成为死信 (被死信队列接收)*/channel.basicNack(deliveryTag,true,false);}}
}3.消费端限流 (prefetch2限流为2) 就是在消费这些消息的时候做了限流
修改配置 context:component-scan base-packagecom.listener /!--定义监听器容器默认为自动确认acknowledgemanual手动签收--rabbit:listener-container connection-factoryconnectionFactory acknowledgemanual prefetch2!--rabbit:listener refackListener queue-namestest_queue_confirm/rabbit:listener--rabbit:listener refqosListener queue-namestest_queue_confirm/rabbit:listener/rabbit:listener-container
新建 com.listener.QosListener
package com.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** Consumer 限流机制* 1. 确保消息被确认。不确认是不继续处理其他消息的* 2. listener-container配置属性* prefetch1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉取下一条消息。*/
Component
public class QosListener implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {//1.获取消息System.out.println(new String(message.getBody()));//2.确认消息channel.basicAck(deliveryTag,true);}catch (Exception e){e.printStackTrace();channel.basicNack(deliveryTag,true,false);}}
}小结
在rabbit:listener-container中配置prefetch属性设置消费端一次拉去多少消息
消费端的确认模式一定为手动确认。acknowledge“manual”
4.TTL (消息存活时间)
TTL 全称 Time To Live存活时间/过期时间
当消息到达存活时间后还没有被消费会被自动清除
RabbitMQ可以对消息设置过期时间也可以对整个队列Queue设置过期时间 控制后台演示消息过期
修改管理后台界面增加队列
参数表示过期时间单位毫秒 10000表示10秒 增加交换机 绑定队列 发送消息
Delivery mode2-Persistent表示需要进行持久化 查看消息可以看到消息但十秒之后消息自动消失因为我们设置了十秒消息过期 代码实现
修改配置文件 spring-rabbitmq-producer.xml !--TTL 队列--rabbit:queue nametest_queue_ttl idtest_queue_ttl!--设置queue的参数--rabbit:queue-arguments!--设置x-message-ttl队列的过期时间默认情况下value-type的类型是String类型,但时间的类型是number类型,所以需要设置成integer类型--entry keyx-message-ttl value10000 value-typejava.lang.Integer/entry!--指定类型?因为队列已经存在配置队列需要和已存在队列属性相同--entry keyx-queue-type valueclassic value-typejava.lang.String/entry/rabbit:queue-arguments/rabbit:queue!--设置交换机--rabbit:topic-exchange nametest_exchange_ttl!--交换机绑定队列--rabbit:bindingsrabbit:binding patternttl.# queuetest_queue_ttl/rabbit:binding/rabbit:bindings/rabbit:topic-exchange
在测试类 ProducerTest 中,添加测试方法,发送消息 /*** TTL过期时间* 1. 队列统一过期* 2. 消息单独过期* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。*/Testpublic void testTTL() {//队列统一过期for (int i 0; i 10; i) {rabbitTemplate.convertAndSend(test_exchange_ttl,ttl.hehe,message ttl);}}Testpublic void testMessageTtl() {// 消息后处理对象,设置一些消息的参数信息MessagePostProcessor messagePostProcessor new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1.设置message的信息// 第二个方法消息的过期时间 ,5秒之后过期message.getMessageProperties().setExpiration(5000);//2.返回该消息return message;}};
小结
设置消息队列过期时间使用参数x-message-ttl,单位ms毫秒会对整个队列消息统一过期
设置消息过期时间使用参数expiration。单位ms毫秒当该消息在队列头部时消费时会单独判断这一消息是否过期
如果两者都进行了设置以时间段的为准
5.DLX死信队列
死信队列英文缩写DLX 。DeadLetter Exchange死信交换机无法被消费的消息当消息成为Dead message后可以被重新发送到另一个交换机这个交换机就是DLX 面试题: 消息成为死信的三种情况
队列消息数量到达限制比如队列最大只能存储10条消息而发了11条消息根据先进先出最先发的消息会进入死信队列
消费者拒接消费消息basicNack/basicReject并且不把消息重新放入原目标队列requeuefalse
消息超时原队列存在消息过期设置消息到达超时时间未被消费
队列绑定死信交换机
给队列设置参数x-dead-letter-exchange和x-dead-letter-routing-key 修改配置文件 spring-rabbitmq-producer.xml !--死信队列--!--1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)--rabbit:queue nametest_queue_dlx idtest_queue_dlx!--3. 正常队列绑定死信交换机 设置条件--rabbit:queue-arguments!--3.1 x-dead-letter-exchange死信交换机名称--entry keyx-dead-letter-exchange valueexchange_dlx/!--3.2 x-dead-letter-routing-key发送给死信交换机的routingkey--entry keyx-dead-letter-routing-key valuedlx.hehe/entry!--4.1 设置队列的过期时间 ttl--entry keyx-message-ttl value10000 value-typejava.lang.Integer/!--4.2 设置队列的长度限制 max-length --entry keyx-max-length value10 value-typejava.lang.Integer//rabbit:queue-arguments/rabbit:queue!--正常交换机--rabbit:topic-exchange nametest_exchange_dlxrabbit:bindingsrabbit:binding patterntest.dlx.# queuetest_queue_dlx/rabbit:binding/rabbit:bindings/rabbit:topic-exchange!--2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)--rabbit:queue namequeue_dlx idqueue_dlx/rabbit:queuerabbit:topic-exchange nameexchange_dlxrabbit:bindingsrabbit:binding patterndlx.# queuequeue_dlx/rabbit:binding/rabbit:bindings/rabbit:topic-exchange
在测试类 ProducerTest 中,添加测试方法
三种成为死信情况 /*发送测试死信消息:1、过期时间2、长度限制3、消息拒收*/Testpublic void testDlx(){//1. 测试过期时间,死信消息//rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.haha,我是一条消息,我会死吗);//2. 测试长度限制后,消息死信/*for (int i 0; i 11; i) {rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.haha,我是一条消息,我会死吗i i);}*///3.发送消息:拒收情况rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.haha,我是一条消息,我会死吗);}
消费端需创建DlxListener并在配置添加对应监听容器
小结
1.死信交换机和死信队列和普通的没有区别
2.当消息成为死信后如果该队列绑定了死信交换机则消息会被死信交换机重新路由到死信队列
6.延迟队列
延迟队列即消息进入队列后不会立即被消费只有到达指定时间后才会被消费
延迟队列存储的对象肯定是对应的延时消息所谓”延时消息”是指当消息被发送以后并不想让消费者立即拿到消息而是等待指定时间后消费者才拿到这个消息进行消费
需求场景
1. 下单后30分钟未支付取消订单回滚库存
2. 新用户注册成功30分钟后发送短信问候
实现方式
1.定时器
2.延迟队列 但是RabbitMQ没有提供延迟队列功能
但是可以使用TTL死信队列 组合实现延迟队列的效果 代码实现
修改配置文件 spring-rabbitmq-producer.xml
!--延迟队列1、定义正常交换机order_exchange和队列order_queue2、定义死信交换机order_exchange_dlx和队列order_queue_dlx3、绑定设置正常队列过期时间为30分钟
--!--1、定义正常交换机order_exchange和队列order_queue--rabbit:queue nameorder_queue idorder_queue!--3、绑定设置正常队列过期时间为30分钟--rabbit:queue-argumentsentry keyx-dead-letter-exchange valueorder_exchange_dlx/entryentry keyx-dead-letter-routing-key valuedlx.order.cancel/entryentry keyx-message-ttl value10000 value-typejava.lang.Integer//rabbit:queue-arguments/rabbit:queuerabbit:topic-exchange nameorder_exchangerabbit:bindingsrabbit:binding patternorder.# queueorder_queue/rabbit:binding/rabbit:bindings/rabbit:topic-exchange!--2、定义死信交换机order_exchange_dlx和队列order_queue_dlx--rabbit:queue nameorder_queue_dlx idorder_queue_dlx/rabbit:queuerabbit:topic-exchange nameorder_exchange_dlxrabbit:bindingsrabbit:binding patterndlx.order.# queueorder_queue_dlx/rabbit:binding/rabbit:bindings/rabbit:topic-exchange
生产端ProducerTest
package com;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
ContextConfiguration(locations classpath:/spring-rabbitmq-producer.xml)
public class ProducerTest {//Autowired 在配置文件配好了//RabbitAdmin rabbitAdmin;//用于操作交换机、队列Autowiredprivate RabbitTemplate rabbitTemplate;//确认模式收到消息与否都会被执行Testpublic void testConfirm() {//1. 开启去确认回调函数 publisher-confirmstrue//2. 注册回调函数当交换机收到消息confirm方法会被调用rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(correlationData correlationData); //关联数据在发送消息时指定的//确认机制:ack true表示收到了 false表示没收到if (ack){System.out.println(接收成功消息 ack);}else {System.out.println(接收失败消息 cause);//做一些处理,让消息再次发送。}}});//3. 发送消息rabbitTemplate.convertAndSend(test_exchange_confirm, confirm, confirm info...);//成功//测试不正常情况//rabbitTemplate.convertAndSend(test_exchange_confirm000, confirm, message confirm....);//失败}Testpublic void testSend(){for (int i 0; i 10; i) {//发送消息rabbitTemplate.convertAndSend(test_exchange_confirm,confirm,confirm info...);}}/*** TTL过期时间* 1. 队列统一过期* 2. 消息单独过期* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。*/Testpublic void testTTL() {//队列统一过期for (int i 0; i 10; i) {rabbitTemplate.convertAndSend(test_exchange_ttl,ttl.hehe,message ttl);}}Testpublic void testMessageTtl() {// 消息后处理对象,设置一些消息的参数信息MessagePostProcessor messagePostProcessor new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1.设置message的信息// 第二个方法消息的过期时间 ,5秒之后过期message.getMessageProperties().setExpiration(5000);//2.返回该消息return message;}};}/*发送测试死信消息:1、过期时间2、长度限制3、消息拒收*/Testpublic void testDlx(){//1. 测试过期时间,死信消息//rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.haha,我是一条消息,我会死吗);//2. 测试长度限制后,消息死信/*for (int i 0; i 11; i) {rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.haha,我是一条消息,我会死吗i i);}*///3.发送消息:拒收情况rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.haha,我是一条消息,我会死吗);}Testpublic void testDelay() throws InterruptedException {//1、发送订单消息。将来是在订单系统中下单成功后发送消息rabbitTemplate.convertAndSend(order_exchange,order.msg,订单信息id1,time333333);//2、打印倒计时for (int i 10; i 0; i--) {System.out.println(i...);Thread.sleep(1000);}}
}消费端OrderListener
package com.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;Component
public class OrderListener implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(1000);long deliveryTag message.getMessageProperties().getDeliveryTag();try{//1.接收转换消息System.out.println(new String(message.getBody()));//2.处理业务逻辑System.out.println(处理业务逻辑...);System.out.println(根据订单id查询其状态...);System.out.println(判断状态是否为支付成功);System.out.println(取消订单回滚库存);//3.手动签收channel.basicAck(deliveryTag,true);}catch (Exception e){//4.拒绝签收/** 第三个参数requeue重回队列。如果设置为true则消息重新回到queuebroker会重新发送该消息给消费端** */System.out.println(出现异常拒绝接收);//拒绝接收不重回队列requeuefalsechannel.basicNack(deliveryTag,true,false);//channel.basicReject(deliveryTag,true);只允许单挑确认}}
}修改配置spring-rabbitmq-consumer.xml
?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:contexthttp://www.springframework.org/schema/contextxmlns:rabbithttp://www.springframework.org/schema/rabbitxsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd!--加载配置文件--context:property-placeholder locationclasspath:rabbitmq.properties/!-- 定义rabbitmq connectionFactory --rabbit:connection-factory idconnectionFactory host${rabbitmq.host}port${rabbitmq.port}username${rabbitmq.username}password${rabbitmq.password}virtual-host${rabbitmq.virtual-host}/context:component-scan base-packagecom.listener /!--定义监听器容器默认为自动确认acknowledgemanual手动签收--rabbit:listener-container connection-factoryconnectionFactory acknowledgemanual prefetch2!--rabbit:listener refackListener queue-namestest_queue_confirm/rabbit:listener--!--rabbit:listener refqosListener queue-namestest_queue_confirm/rabbit:listener--!--定义监听器监听正常的队列--!--rabbit:listener refdlxListener queue-namestest_queue_dlx/rabbit:listener--!--注意实现延迟队列功能时消费端监听的是死信队列--rabbit:listener reforderListener queue-namesorder_queue_dlx/rabbit:listener/rabbit:listener-container/beans消费端ConsumerTest
package com;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;RunWith(SpringJUnit4ClassRunner.class)
ContextConfiguration(locations classpath:spring-rabbitmq-consumer.xml)
public class ConsumerTest {Testpublic void test(){while (true){}}
}7.日志与监控
1.RabbitMQ日志
RabbitMQ默认日志存放路径 /var/log/rabbitmq/rabbitxxx.log
日志包含了RabbitMQ的版本号Erlang的版本号RabbitMQ服务结点名称cookie的hash值RabbitMQ配置文件地址内存限制磁盘限制默认账户guest的创建以及权限配置等等
2.web管控台监控 8.消息追踪-Firehose
在使用任何消息中间件的过程中难免会出现某条消息异常丢失的情况。对于RabbitMQ而言可能是因为生产者或消费者与RabbitMQ断开了连接而它们与RabbitMQ又采用了不同的确认机制也有可能是因为交换器与队列之间不同的转发策略甚至是交换器并没有与任何队列进行绑定生产者又不感知或者没有采取相应的措施另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程以此协助开发和运维人员进行问题的定位
在RabbitMQ中可以使用 Firehose 和 rabbitmq_tracing插件 功能来实现消息追踪
firehose的机制是将生产者投递给rabbitmq的消息rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称分别对应生产者投递到exchange的消息和消费者从queue上获取的消息
注意 打开 trace 会影响消息写入功能适当打开后请关闭
开启Firehose命令
rabbitmqctl trace_on
关闭Firehose命令
rabbitmqctl trace_off