当前位置: 首页 > news >正文

网站开发实践研究报告中国建设教育业协会网站

网站开发实践研究报告,中国建设教育业协会网站,深圳做网站-龙华信科,做设计常用的网站保证MQ消息的可靠性,主要从三个方面:发送者确认可靠性,MQ确认可靠性,消费者确认可靠性。 1.发送者可靠性:主要依赖于发送者重试机制,发送者确认机制; 发送者重试机制,其实就是配置…

保证MQ消息的可靠性,主要从三个方面:发送者确认可靠性,MQ确认可靠性,消费者确认可靠性。

1.发送者可靠性:主要依赖于发送者重试机制,发送者确认机制;

发送者重试机制,其实就是配置文件配置重试规则,当消息发送失败后,会根据配置的重试次数,进行多次发送重试,如代码:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

发送者确认机制:则是依赖于消息的回执,这其中包括发送者回执,和消费者回执两种,但是这种回执都比较耗性能,会导致消息消费的很慢。并且,这也是需要在配置文件中做配置的:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

并且还要有代码的实现,这种方式极大的影响了性能,:

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

2.MQ自身的可靠性:交换机/队列/消息都实现持久化,消息不会丢失,如果是在项目中通过代码创建的交换机/队列/消息,spring默认就是持久化的,如果在mq的客户端手工配置,那就要选定各个参数了。持久化后的消息会直接进入磁盘,不在经过内存了,正常来讲有IO的操作会慢才对,但是在实际的操作中却是非常快。

MQ队列最怕的就是消息积压,导致内存溢出。在3.12版本以后,MQ直接默认就是Laz懒惰队列的模式了,这个模式会直接加载到磁盘,当用到消息的时候,会从磁盘加载到内存,磁盘空间很大,支持数百万级别的存储,所以内存溢出的可能性就会大大降低。我们可以在mq客户端手动设置为lazy队列,也可以在代码中直接实现,代码如下:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

3.消费者的可靠性:

3.1消费者消费消息后,向MQ发送回执,让MQ知道消息是否正常被消费了,目前回执有三种:

ack:成功处理了消息,MQ从队列中就会删除消息,正常。

nack:失败处理了消息,MQ需要再次投递消息,这会出现一直重试的问题。

reject:消息失败,并拒绝了消息,并且从队列中删除了消息。这个消息被删除了,岂不是数据就丢失了。

对于以上三种回执,基本回执都是固定的,AMQP提供了消息确认的方式,不用写代码,配置就可以,配置有三种:none-配置它失败了,消息会被删除,auto-失败了,消息会回到MQ重新投递,不会丢失,不会被删除,manual-太麻烦,算了。

不过,对于auto的配置,对于返回的异常,会有两种判断:1,如果是业务异常,会自动返回nack

如果是消息处理或者校验异常,会直接进行reject

spring:rabbitmq:listener:simple:acknowledge-mode: auto

3.2 生产者有重试机制,消费者也有重试机制,但是,对于消费者的重试,如果一直失败,那就要有一定的策略,可以把这个失败的消息放到另一个交换机上,后续人工进行干预,这样可以保证消息不丢失。

对于消费者的重试配置:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

如何把消息发送到另一个交换机上呢?

在消费者服务定义一个处理失败消息队列的交换机,这样就可以把消息存储过去了

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

4.业务的幂等性判断

4.1.对于一条MQ消息,为了防止被重复消费,可以做一个唯一的msgID,当消费的时候可以先检查下这个ID,如果已经消费过了,那就不能再消费了,一定程度上可以避免被重复消费,代码如下:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

4.2.还有一种保证消费唯一性的就是业务上的判断,当需要消费消息的时候,可以先提前去查询下需要消费消息的状态,如果状态已经发生了改变,自然也不用再去消费这条消息了

5.对于以上所有保证消息可靠性的方案,其实都不能完全保证,最终需要一个兜底的方案,兜底方案我们可以采取一个定时任务的方式,定时轮询检查消息是否消费。比如时间间隔多少秒进行一次轮询检查,这种方式我们可以理解为主动查询。这种兜底很大程度上可以保证业务上的一致性。

http://www.yayakq.cn/news/96322/

相关文章:

  • 深圳建立网站的公司山东住房建设厅官网站首页
  • 网站管理员怎样管理自己有网站怎么做点卡
  • 北京网站关键词排名营销手段有哪些
  • 建筑贴图素材网站公司制作网站流程
  • 做企业网站需要多久免费wordpress主题内容怎么改
  • 怎么在自己电脑上搭建网站上海app外包公司
  • 如何把网站做成app导游网站如何建设的
  • 临沭做网站温州seo按天扣费
  • 做网站好的网站建设公司哪家好app开发制作网站平台
  • 网站做301重定向的作用dede网站建站教程
  • 继续浏览此网站(不推荐)云开发壁纸小程序
  • 工艺品做网站怎么制作微信公众号文章
  • 网站板块设置附近网站建设服务公司
  • 陇南市响应式网站建设北京网站建设亿玛酷出名5
  • 网站外链建设分析嘉兴网站排名优化
  • 电影资源网站开发网站建设的专业知识
  • 敦煌网网站推广方式佛山洛可可设计公司
  • 交易平台网站程序dedecms生成xml网站地图
  • 连锁餐饮网站建设如何维护公司网页
  • 盐城网站建设服务汕头最新新闻消息
  • 网站建设有关的职位wordpress一键登录
  • 网站页面设计尺寸天津市建设工程协会网站
  • 系统开发岗位职责无锡seo网站排名优化
  • 网站设计怎么算侵权徐州做网站谁家最专业
  • 什么网站能接单做网站重庆网站建设报价
  • 深圳南山区住房和建设局网站莱芜买房网站
  • 个人网站备案需要什么资料网站建设的税率
  • 大数据网站建设和优必选网站
  • 郑州网页网站制作泰州网站建设方案
  • 学校网站首页设计图片自己在线制作logo免费模板