设计师找图网站荆州哪里做网站
前言
消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢?
- 发送时丢失: 
- 生产者发送的消息未送达 exchange
 - 消息到达 exchange 后未到达 queue
 
 - MQ 宕机,queue将消息丢失
 - consumer 接收到消息后未消费就宕机

消息可靠性问题及其对应的解决方案: 
| 场景 | publisher发送时丢失 | MQ消息丢失 | consumer消费问题 | 
|---|---|---|---|
| 解决方案 | 生产者确认机制 | 消息持久化 | 消费者消息确认&&失败重试机制 | 
下面我们先说一下publisher 发送时丢失的问题应该如何处理
生产者确认机制的理论说明
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后, 会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
- publish-confirm, 发送者确认 
- 消息成功投递到交换机,返回ack
 - 消息未投递到交换机,返回nack
 
 - publish-return, 发送回执 
- 消息投递到交换机,但是没有路由到队列,返回ACK, 及路由失败原因
 
 
注意: 确认机制发送消息时, 需要给每个消息设置一个全局唯一 id, 以区分不同消息,避免ack 冲突

代码实现
下面基于SpringAMQP 实现的生产者确认机制
- 在 publisher 服务的 application,yml 中添加以下配置:
 
spring:rabbitmq:publisher-confirm-type: correlated # 开启异步回调publisher-returns: truetemplate:mandatory: true
 
配置说明:
- publish-confirm-type: 开启 publisher-confirm, 这里支持两种类型: 
- simple: 同步等待 confirm 结果, 直到超时
 - correlated: 异步回调, 定义ConfirmCallback, MQ 返回结果时会回调这个ConfirmCallback
 
 - publish-returns: 开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallbcak
 - template.mandatory: 定义消息路由失败时的策略。true, 则调用ReturnCallback, false: 则直接丢弃消息
 
ConfirmCallBack是基于每条消息设置的,所以需要一个全局唯一id 进行区分。
ReturenCallbcak 则是基于每个RabbitTemplate操作实例,是一种全局性的回调。
- 由于每个 RabbitTemplate 只能配置一个 ReturnCallback, 因此需要在项目启动过程中配置:
(这里可以实现ApplicationContextAware,它可以在SpringIOC 容器初始化的时候,进行一些全局性回调的操作) 
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取 RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置 ReturnCallbackrabbitTemplate.setReturnCallback((message, replayCode, replayText,exchange, routingKey) -> {// 记录日志log.error("消息发送到队列失败, 响应码:{}, 失败原因:{},交换机:{}, 路由key:{},消息:{},",replayCode, replayText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}}
}	
 
- 为每条发送的消息,指定消息 ID, 并编写对应的 ConfirmCallback
 
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1. 准备消息String message = "hello, spring amqp!";// 2. 准备CorrelationData// 2.1 消息idCorrelationData correlationData = newCorrelationData(UUID.randomUUID().toString());// 2.2 准备 ConfirmCallbackcorrelationData.getFuture().addCallback(confirm -> {// 判断结果if(confirm.isAck()){// ACKlog.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());}else {// NACKlog.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());// 重发消息}}, throwable -> {// 记录日志log.error("消息发送失败", throwable);// 重发消息});// 3.发送消息rabbitTemplate.convertAndSend("amq.topic", "asimple.test", message, correlationData);
}
 
总结
SpringAMQP 中处理消息确认的几种情况:
- publisher-confirm: 
- 消息发送到 exchange, 返回 ack
 - 消息发送失败,没有到达交换机,返回 nack
 - 消息发送过程中出现异常,没有收到回执
 
 - 消息成功发送到 exchange, 但没有路由到 queue, 调用 ReturnCallback
 
