当前位置: 首页 > news >正文

网站建设 上海网站建设新型城镇化建设网站

网站建设 上海网站建设,新型城镇化建设网站,网站设计改版,深圳网站建设的价格在使用rocketmq过程中总能看见一下异常 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5这是因为Rocketmq出发了流量控制。 触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控,Consu…

在使用rocketmq过程中总能看见一下异常

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

这是因为Rocketmq出发了流量控制。

 

触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控,Consumer流控

1.Broker流控 

Rocketmq默认采取的是异步刷盘方式,Producer把消息发送到broker后,Broker会把消息暂放在Page Cache中刷盘线程定时的把数据刷到磁盘中

1.1 broker busy 

Broker是开启快速失败的,处理逻辑类是BrokerFastFailure,这个类中有一个定时任务用来清理过期的请求,每 10 ms 执行一次,代码如下:

public void start() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {cleanExpiredRequest();}}}, 1000, 10, TimeUnit.MILLISECONDS);
}

 1.1.1 Page Cache繁忙

清理过期请求之前会先判断一下Page Cache是否繁忙,如果繁忙就会给Producer返回一个系统繁忙的状态码(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),那怎么判断Page Cache繁忙呢?

当Broker收到消息后,会放到Page Cache中,这个过程,首先会取一个CommitLog写入锁,如果持有锁的时间超过1s就认为Page Cache繁忙具体代码见 DefaultMessageStore 类 isOSPageCacheBusy 方法。

1.1.2清理过期请求

清理过期请求时,如果请求线程创建的时间与当前系统时间的间隔大于200ms.然后给 Producer 返回一个系统繁忙的状态码(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")

1.2 system busy

这个异常在 NettyRemotingAbstract#processRequestCommand 方法.

1.拒绝请求

如果 NettyRequestProcessor 拒绝了请求,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")

那什么情况下请求会被拒绝呢?看下面这段代码:

//SendMessageProcessor类
public boolean rejectRequest() {return this.brokerController.getMessageStore().isOSPageCacheBusy() ||this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}

从代码中可以看到,请求被拒绝的情况有两种可能,一个是 Page Cache 繁忙,另一个是 TransientStorePoolDeficient。跟踪 isTransientStorePoolDeficient 方法,发现判断依据是在开启 transientStorePoolEnable 配置的情况下,是否还有可用的 ByteBuffer。

注意:在开启 transientStorePoolEnable 的情况下,写入消息时会先写入堆外内存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盘。而读取消息是从 Page Cache,这样可以实现读写分离,避免读写都在 Page Cache 带来的问题

2.线程拒绝

Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000,可以通过参数 sendThreadPoolQueueCapacity 进行配置),线程池拒绝后会抛出异常 RejectedExecutionException,程序捕获到异常后,会判断是不是单向请求(OnewayRPC),如果不是,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[OVERLOAD]system busy, start flow control for a while")

判断 OnewayRPC 的代码如下,flag = 2 或者 3 时是单向请求:

public boolean isOnewayRPC() {int bits = 1 << RPC_ONEWAY;return (this.flag & bits) == bits;
}

 1.3消息重试

Broker 发生流量控制的情况下,返回给 Producer 系统繁忙的状态码(code=2),Producer 收到这个状态码是不会进行重试的。下面是会进行重试的响应码:

//DefaultMQProducer类
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(ResponseCode.TOPIC_NOT_EXIST,ResponseCode.SERVICE_NOT_AVAILABLE,ResponseCode.SYSTEM_ERROR,ResponseCode.NO_PERMISSION,ResponseCode.NO_BUYER_ID,ResponseCode.NOT_IN_CURRENT_UNIT
));

2.Consumer流控

        DefaultMQPushConsumerImpl 类中有 Consumer 流控的逻辑 。

2.1 缓存消息数量超过阈值

ProcessQueue 保存的消息数量超过阈值(默认 1000,可以配置),源码如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}

2.2缓存消息大小超过阈值

ProcessQueue 保存的消息大小超过阈值(默认 100M,可以配置),源码如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}

2.3缓存消息跨度超过阈值

对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值(默认 2000,可以配置)。源代码如下:

if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}
}

2.4获取锁失败

对于顺序消费的情况,ProcessQueue加锁失败,也会延迟拉取,这个延迟时间默认是 3s,可以配置。

3.总结

本文介绍了 RocketMQ 发生流量控制的 8 个场景,其中 Broker 4 个场景,Consumer 4 个场景。Broker 的流量控制,本质是对 Producer 的流量控制,最好的解决方法就是给 Broker 扩容,增加 Broker 写入能力。而对于 Consumer 端的流量控制,需要解决 Consumer 端消费慢的问题,比如有第三方接口响应慢或者有慢 SQL。

Broker4种场景:

page cache 繁忙:获取commitlog写入锁超过1s 

清理过期请求 如果请求过期请求的时间到当前系统时间超过了200ms 

请求拒绝  一种是page cache繁忙 一种是 transientStorePoolEnable模式看是否可用buffer

线程池拒绝 Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000

Consumer4种场景

消息数量超过阈值1000

消息大小超过阈值100m

缓存消息跨度超过阈值 对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值2000

ProcessQueue加锁失败 也会延迟加载

http://www.yayakq.cn/news/519032/

相关文章:

  • 公司静态网站模板一个人免费观看视频在线中文
  • 湖北网站建设软件有哪些wordpress建站版本推荐
  • 阿里巴巴怎么建设网站首页快速做网站套餐
  • 北京宏福建设有限公司网站判断网站是否被k
  • 莱州网站建设哪家好物联网就业方向
  • seo网站排名优化软件5000人朋友圈推广多少钱
  • 网站建设套餐介绍个人做网站能备案吗
  • 交友类网站功能建设思路苏州网络推广seo服务
  • 登陆不了建设银行网站静态网站开发考虑什么
  • 义乌网站推动高质量发展为主题
  • 设计的很好的网站网站建设基础课程
  • 教育教学成果展示网站建设网站建设需要方案
  • 建设美食电子商务网站怎么样让公司网站
  • 做东西的网站有那些网站标题导航栏
  • 企业网站管理源码手表网站
  • 对电子商务网站设计的理解视频8首页制作代码
  • 南京明月建设集团网站如何快速构建一个网站
  • 行业网站建设方式有哪些海口企业网站建设制作哪家专业
  • 东莞物流网站设计公司做电影网站要不要收费
  • 怎么做网站的主页面织梦转WordPress插件
  • 网站建设导航分哪几类金融企业网站制作
  • 旅游网站策划营销世界工厂网app
  • apache 多网站wordpress怎么改搜索
  • 帝国 网站搬家vs2013 网站建设
  • 网站开发公司人员配置上海网站建设公司网
  • 网站建设公司 广告法被处罚上国外网站用什么dns
  • 贵州网站建设gzzctyi网站seo优化技术入门
  • p2p的网站开发做外贸的女生现状
  • 网站建设设计 昆山室内设计学校环境分析
  • 初中上哪个网站找题做网站建设难么