当前位置: 首页 > news >正文

大朗镇住房规划建设局网站青岛城阳网站设计

大朗镇住房规划建设局网站,青岛城阳网站设计,国内精美网站界面网址,电子商务网站规划与...文章目录 批量消息BatchProducer.javaBatchConsumer.java 批量消息 批量发送可以提⾼发送性能,但有⼀定的限制: topic 相同 waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgoktrue(默认为true), 如果没有异常,我们将始终收到"O…

文章目录

  • 批量消息
  • BatchProducer.java
  • BatchConsumer.java

批量消息

批量发送可以提⾼发送性能,但有⼀定的限制:
topic 相同
waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延时发送
⼀批消息的大小不能⼤于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错:
计算消息的大小
= (topic + body + (key + value) * N) * 吞吐量

int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);//计算消息的大小 = (topic + body + (key + value) * N) * 吞吐量int tmpSize = message.getTopic().length() + message.getBody().length;//属性值的添加Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {//+key + valuetmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

BatchConsumer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}
http://www.yayakq.cn/news/427394/

相关文章:

  • 网站建设求职要求天津网站建设咨询
  • 搜狗站长工具综合查询wordpress编辑器哪个好
  • 网站注册模板产品小程序如何制作
  • 医疗网站建设好么电商到底干嘛的
  • 正鹏建设工程有限公司网站重庆市建设工程质量网站
  • 网站图片有什么要求吗云南建网科技有限公司
  • 购买网站域名空间wordpress发送邮件功能未启用
  • 兰州城关区建设局网站开发什么软件有市场
  • 建设部网站工程设计收费标准深圳网站设计公司哪家便宜
  • 网站建设销售技巧和话术页面素材图片
  • 网站建设官方网自己网站可以加标志吗
  • 专门做qq小工具的网站wordpress设置主页面
  • h5网站模板源码小程序开发适合的应用
  • 什么做网站推广产品网络推广方式
  • 企业建设企业网站的好处有哪些网店运营具体做什么
  • 相册网站开发贪便宜网站
  • 天津网站公司Wordpress如何加联盟广告
  • 网站导航仿站做电商设计有什么好的网站推荐
  • 网站建设设计企业提升网站知名度
  • 网站建设专业术语淮安网站设计
  • 免费行情软件网站mnw数据平台
  • 镇江网站建设top百度seo关键词优化软件
  • 网站启用cdn加速怎么在百度上能搜到自己的网站
  • 新闻资讯型网站开发discuz 旅游网站模版
  • 廊坊网站建设制作网站开发任务书
  • 网站程序备份方法wordpress 多域名绑定
  • 功能网站网站服务器ipv6
  • 卫浴网站建设大连建站方案
  • 聊城集团网站建设公司昆明建站公司推荐
  • 中石油第六建设公司网站腾讯广告投放端提供的建站工具有