当前位置: 首页 > news >正文

常州知名网站新网网站管理

常州知名网站,新网网站管理,tk注册网站,公众号免费套用模板目录 减少发送mq的消息体内容 增加消费者数量 批量消费消息 临时队列转移 监控和预警机制 分阶段实施 最后还有一个方法就是开启队列的懒加载 这篇文章总结一下自己知道的解决消息积压得方法。 减少发送mq的消息体内容 像我们没有必要知道一个的中间状态,只需…

目录

减少发送mq的消息体内容

增加消费者数量

批量消费消息

临时队列转移

监控和预警机制

分阶段实施

最后还有一个方法就是开启队列的懒加载


这篇文章总结一下自己知道的解决消息积压得方法。

减少发送mq的消息体内容

像我们没有必要知道一个的中间状态,只需知道一个最终状态就可以了。
发送的消息体只用包含:id和状态等关键信息,不用发送一个完整的对象内容。
消费者收到消息之后,通过id调用原服务再将完整的消息对象内容查询出来即可,最后再进行消费处理。

增加消费者数量

采用动态增加消费者的数量

@Configuration
public class RabbitMQConfig {@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置并发消费者数量factory.setConcurrentConsumers(5);        // 初始消费者数量factory.setMaxConcurrentConsumers(20);    // 最大消费者数量// 动态调整消费者数量factory.setConsumerTagStrategy(queue -> "consumer-" + UUID.randomUUID());return factory;}
}
@Service
public class ConsumerManagerService {@Autowiredprivate RabbitListenerEndpointRegistry registry;public void adjustConsumerCount(String queueName, int count) {MessageListenerContainer container = registry.getListenerContainer(queueName);if (container instanceof SimpleMessageListenerContainer) {SimpleMessageListenerContainer simpleContainer = (SimpleMessageListenerContainer) container;simpleContainer.setConcurrentConsumers(count);}}
}

批量消费消息

@Service
public class BatchMessageConsumer {@RabbitListener(queues = "myQueue", containerFactory = "batchFactory")public void processBatch(List<Message> messages, Channel channel) {try {// 批量处理消息List<MessageDTO> dtos = messages.stream().map(this::convertToDTO).collect(Collectors.toList());// 批量保存到数据库batchSaveToDatabase(dtos);// 获取最后一条消息的deliveryTaglong lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();// 批量确认channel.basicAck(lastDeliveryTag, true);} catch (Exception e) {// 批量拒绝handleBatchError(messages, channel);}}
}// 配置批量消费
@Configuration
public class BatchConsumerConfig {@Beanpublic SimpleRabbitListenerContainerFactory batchFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 启用批量消费factory.setBatchListener(true);// 批量大小factory.setBatchSize(100);// 批量超时时间factory.setReceiveTimeout(1000L);return factory;}
}

临时队列转移

@Service
public class MessageTransferService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void transferMessages(String sourceQueue, String tempQueue, int batchSize) {while (true) {// 从源队列批量获取消息List<Message> messages = new ArrayList<>();for (int i = 0; i < batchSize; i++) {Message message = rabbitTemplate.receive(sourceQueue);if (message == null) break;messages.add(message);}if (messages.isEmpty()) break;// 转移到临时队列messages.forEach(msg -> rabbitTemplate.send(tempQueue, msg));}}
}// 临时队列的消费者
@Component
public class TempQueueConsumer {@RabbitListener(queues = "#{tempQueue.name}")public void processMessage(Message message) {// 使用更高效的处理方式fastProcessMessage(message);}@Beanpublic Queue tempQueue() {return new Queue("temp-queue-" + UUID.randomUUID(), false, false, true);}
}

监控和预警机制

@Service
@Slf4j
public class QueueMonitorService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Scheduled(fixedRate = 60000) // 每分钟执行一次public void monitorQueueSize() {String queueName = "myQueue";// 获取队列信息Properties properties = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(queueName));// 获取消息数量int messageCount = properties.getMessageCount();// 检查消息堆积if (messageCount > threshold) {// 发送告警sendAlert(queueName, messageCount);// 动态调整消费者adjustConsumers(queueName, messageCount);}}private void adjustConsumers(String queueName, int messageCount) {// 根据消息数量动态调整消费者数量int newConsumerCount = calculateConsumerCount(messageCount);consumerManagerService.adjustConsumerCount(queueName, newConsumerCount);}
}

分阶段实施

@Service
public class MessageHandlingStrategy {public void handleMessageBacklog() {// 1. 首先增加消费者数量adjustConsumerCount();// 2. 如果仍然堆积,启用批量处理if (isStillBacklogged()) {enableBatchProcessing();}// 3. 如果问题持续,使用临时队列if (isEmergency()) {transferToTemporaryQueue();}}
}

最后还有一个方法就是开启队列的懒加载

http://www.yayakq.cn/news/952618/

相关文章:

  • 青海做网站最好的公司dz多语言企业网站
  • .net网站开发面试淘宝网站策划怎么做
  • 广州做网站mxszpt设计公司logo图片
  • 网站开发可以用gif吗保定涿州网站建设
  • 网站开发海报中国建设门户网站
  • 绵阳网站建设价格做动态图网站违法吗
  • 饰品网站建设游戏推广赚钱
  • 网站素材 图标网站制作网站
  • 设计网站页面好处国际时事新闻
  • 美橙互联同类型网站世纪兴网站建设
  • 网站建设总体费用网上商城系统概述
  • 网站建设昆明包装设计广东省住房和城乡建设厅官网
  • 网站未授权cas要怎么做管理网站
  • 义乌个人兼职做建设网站WordPress评论第页
  • 网站开发语言总结有哪些最实用的仓库管理系统
  • 成都网站建设制作设计福建坤辕建设工程有限公司网站
  • 浅谈电子商务网站建设与规划珠海做企业网站多少钱
  • 网站建设标书样本太原在线制作网站
  • 妇产科网站建设我想建个网站
  • 建设网站都要学些什么手续在线做效果图有哪些网站有哪些
  • 阿里云添加网站原创软文
  • 提升型企业网络营销网站网络推广具体内容
  • 姑苏区网站建设wordpress後台建站
  • 楼盘 东莞网站建设html网站两边的浮窗怎么做
  • 两学一做 官方网站怎么制作网站镜像
  • wordpress preview.net网站做优化
  • 火狐浏览器网站开发人员网站编写
  • 网站做sem优化实体店引流推广方法
  • 阿尔及利亚网站后缀阜阳网站开发
  • 廊坊网站建设方案郑州最新政策