当前位置: 首页 > news >正文

国家现代农业示范区建设网站西安建设企业网站

国家现代农业示范区建设网站,西安建设企业网站,wordpress首页文章截取,网站布局优化策略造成重复消费的原因: MQ向消费者推送message,消费者向MQ返回ack,告知所推送的消息消费成功。但是由于网络波动等原因,可能造成消费者向MQ返回的ack丢失。MQ长时间(一分钟)收不到ack,于是会向消…

造成重复消费的原因:

MQ向消费者推送message,消费者向MQ返回ack,告知所推送的消息消费成功。但是由于网络波动等原因,可能造成消费者向MQ返回的ack丢失。MQ长时间(一分钟)收不到ack,于是会向消费者再次推送该条message,这样就造成了重复消费。

解决重复消费的办法:

用存储(redis或者mysql)记录一下已经消费的message的id,当message被消费前先去存储中查一下消费记录,没有该条message的id则正常消费返回ack,有该条message的id的话不用消费直接返回ack给MQ。

当然实际生产中的话选用redis是比较好的选择,毕竟查mysql要进行磁盘IO,效率要低得多,而且绝大多数重复消费都是由于MQ没有收到消费者的ack于是造成MQ再次向消费者进行同一条message的投递。所以message的消费记录其实我们并不需要一直记录,只需要保存一段时间,当下次投递过来的时候消费者能查到消费记录然后准确返回ack给MQ就行。

yml

#配置rabbitMq 服务器rabbitmq:host: xxxx#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口port: xxxxusername: xxxxpassword: xxxx#虚拟host 可以不设置,使用server默认hostvirtual-host: xxxxconnection-timeout: 0#确认消息已发送到队列(Queue)publisher-returns: true #确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 设置消费端手动 acklistener:simple:retry:# 开启消费者(程序出现异常的情况下)进行重试enabled: true#重试间隔时间max-interval: 1000# 最大重试次数max-attempts: 3#开启手动确认消息acknowledge-mode: manual

监听类
 

package com.rabbitmqprovider.service;import com.rabbitmq.client.Channel;
import com.rabbitmqprovider.commons.CommonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.io.IOException;/**** 防止重复消费*/
@Slf4j
@Service
public class TestBasicService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** RabbitListener 可以写在类、方法上* @param channel* @param message* @throws IOException*///@RabbitListener(queues = {CommonUtils.queueStr})@RabbitHandlerpublic void getMessage(Channel channel, Message message) throws IOException {try{String messageId= message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");//判断messageId在redis中是否存在boolean flage=stringRedisTemplate(messageId,msg);if(!flage){log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息}else{//如果要防止 重复消费,则需要将 id值存在 redis,每次 都要去redis中拿id比对,是否存在,存在则消费过->messageIdchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("接收到的消息{}->"+redisTemplate.opsForValue().get(messageId));}}catch (Exception e){if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 判断Key是否存在* @param messageId 唯一表示key* @param msg       value值* @return*/private boolean stringRedisTemplate(String messageId,String msg){log.info("messageId="+messageId);//判断Key是否存在 有则返回true,没有则返回falseif(redisTemplate.hasKey(messageId)){return false;}else{redisTemplate.opsForValue().setIfAbsent(messageId, msg);}return true;}
}

------------------------------------------controller--------------------------------------------------

/*** 解决重复消费问题*/
@GetMapping("/sendMessageTestOnly")
public void sendMessageTestOnly(){JSONObject jsonObject = new JSONObject();jsonObject.put("message","世界很大!");jsonObject.put("msg","你想去看看么?");String json = jsonObject.toJSONString();String messageId=UUID.randomUUID()+"";Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(messageId).build();rabbitTemplate.convertAndSend(CommonUtils.dirExchange,CommonUtils.routingKey,message,new CorrelationData(UUID.randomUUID().toString()));
}

---------------------------------回调------------------------------------------------------

package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 队列防止消息丢失*/
@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback{@Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",new String(message.getBody()), exchange, routingKey, replyCode, replyText);}
}
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 当消息由生产者发到交换机后会回调该接口中的confirm方法*/
@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 内含消息内容* ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败* cause 表示失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){log.info("交换机收到消息 消息内容为{}->", correlationData);}else {log.info("交换机未收到消息消息内容为{}, 原因为{}->", correlationData, cause);}}}

-----------------------------------------------------------------------------------------------------------------

执行顺序时:先发送消息;然后在接收消息,并判断消息是否重复,如果不重复 则回复消息,否则 拒绝回复;最后回调。

http://www.yayakq.cn/news/147435/

相关文章:

  • 做网站销售那里找客户nonce验证 wordpress
  • 网站建设 博采网络 学校android studio汉化
  • 政务网站建设和技术维护制度wordpress改logo不显示
  • wapcms建站系统中国最好的建站公司
  • 个人网站做企业备案灰色关键词网站建设
  • 网站制作教程 百度文库百度推广费用多少
  • 3.0效果网站建设多少钱下瓦房做网站公司
  • 网站建设1000字苏州网站建设选苏州梦易行
  • 金华住房和城乡建设部网站高新区网站建设 意义
  • 百度收录入口在哪里广州seo网站设计
  • 行政机关单位网站建设要求网站建设论文总结
  • 电商公司建设网站黄山网站建设找哪家
  • 湘潭网站建设的公司asp网站开发全程视频
  • 合肥网站建设哪家好价格静态网站的建设
  • 兰州网站seo按天计费软文世界平台
  • 快速建站平台成都新冠病最新消息
  • 襄阳市建设工程质量监督站网站网站首页qq在线咨询js
  • 如何做国外网站一小时学会网站建设
  • 国外h5建站25转行做网站运营
  • 如何做类似优酷的视频网站成都医疗seo整站优化
  • 怎么把自己做的网站发到网上通过网址来查看网站建设外包公司排名
  • 自己做网站成本wordpress 的分享插件下载地址
  • 响应式网站 英语wordpress主题底部
  • 网站开发 外包公司德州手机网站建设
  • 学网站开发难吗做个简单的企业小网站
  • 网站标题 关键字南昌企业做网站
  • 好网站设计公司酒店网站建设案例策划书怎么写
  • 网站的架构与建设私人做网站图片
  • 哪个网站音乐做的最好汕头电商网站建设
  • 信息化建设网站网站建设专员工作总结