当前位置: 首页 > news >正文

宣传册设计及网站建设保定网站建设工作

宣传册设计及网站建设,保定网站建设工作,墙外行人 wordpress,cms系统开源kafka消费积压 如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。 消费积压时, (1) 可以增加Topic的分区数,并且增加消费组的消费者数量&#…

kafka消费积压

如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。

消费积压时,

(1) 可以增加Topic的分区数,并且增加消费组的消费者数量,让消费者数等于分区数。
(2) 还可以使用多线程消费,提高消费速度。

kafka多线程消费的代码:

public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* @return*/public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer = new KafkaConsumer<>(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records =kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error("run error", e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {//处理records.for (ConsumerRecord<String, String> record : records) {System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());}}}}

发送消息后,使用多线程消息,运行结果如下:

==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3

参考资料:

《深入理解Kafka:核心设计与实践原理》

http://www.yayakq.cn/news/202527/

相关文章:

  • 心悦会员荣誉战场两张免做卡网站会展网站建设情况
  • 龙岩网站排名一个公司网站开发多少钱
  • 国外网站空间需要备案吗网站转为移动网站
  • 免费flash网站源码wordpress开启缩略图
  • 南宁市有哪些做网站的外包企业贵州住房建设厅官网查询
  • 邯郸网站建设费用wordpress设置固定链接打不开
  • 百度统计网站概况网站制作评价
  • 怎么查看网站根目录博采网站建设
  • 网站透明背景怎样做网络推广赚钱
  • 辽宁天一建设有限责任公司网站wordpress 国内主机
  • 网站做水印有没有影响吗网站建设外包协议
  • 浙江响应式网站建设制作最大的网站建设公司排名
  • 中国网通做网站广告创意策划
  • 网站开发过程模型梦幻西游网页版官方网站
  • 网站建设企业公司推荐深圳网站制作收费
  • 门户网站开发jz190网站建设中切图的意义
  • 移动外包公司要不要去莱芜做网站优化
  • 为什么php做不了大网站网上如何赚钱
  • 像wordpress一样的网站石家庄网站建设工作室
  • 杭州做网站哪个公司好自己电脑做服务器上传网站 需要备案吗
  • 做网站设计制作的公司1688货源网下载app
  • 贵金属网站建设北京手机版建站系统开发
  • 网站建设维护考试网站规划与设计论文
  • 电子商务网站开发课题简介福州最好的网站建设服务商
  • 在线响应式网站如何查询一个网站是否备案
  • 商城门户网站源码十个必备的视频制作app
  • 网站做加QQ群链接wordpress站群管理
  • 正规品牌网站设计地址做网站和app需要多久
  • 做动态图片下载哪个网站好徐州 网站制作
  • 单人给一个公司做网站费用网址导航网址大全彩票网站大全