当前位置: 首页 > news >正文

电子商务中网站建设wordpress 远程调用

电子商务中网站建设,wordpress 远程调用,网站建设企业网站界面设计,网站建设制作费用预算表关于RabbitMQ你了解多少? 文章目录 关于RabbitMQ你了解多少?基础篇同步和异步MQ技术选型介绍和安装数据隔离SpringAMQP快速入门Work queues交换机Fanout交换机Direct交换机Topic交换机 声明队列和交换机MQ消息转换器 高级篇消息可靠性问题发送者的可靠性…

关于RabbitMQ你了解多少?

文章目录

  • 关于RabbitMQ你了解多少?
    • 基础篇
      • 同步和异步
      • MQ技术选型
      • 介绍和安装
      • 数据隔离
      • SpringAMQP
      • 快速入门
      • Work queues
      • 交换机
        • Fanout交换机
        • Direct交换机
        • Topic交换机
      • 声明队列和交换机
      • MQ消息转换器
    • 高级篇
      • 消息可靠性问题
      • 发送者的可靠性
        • 生产者重连
        • 生产者确认
        • SpringAMQP实现生产者确认

RabbitMQ是目前企业中应用非常广泛的高性能的异步通讯组件

基础篇

同步和异步

同步调用:

  • 优势:时效性强,等待到结果后才返回
  • 不足:拓展性差、性能下降、级联失败问题

异步调用:

  • 异步调用方式其实就是基于消息通知的方式,一般包含三个角色
    • 消息发送者:投递消息的人,就是原来的调用方
    • 消息代理:管理、暂存、转发消息
    • 消息接收者:接收和处理消息的人,就是原来的服务提供方
  • 优势:解除耦合,拓展性强、无需等待,性能好、故障隔离、缓存消息,流量削峰填谷
  • 不足:不能立刻得到调用结果,时效性差、不确定下游业务执行是否成功、业务安全依赖于Broker的可靠性

MQ技术选型

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

介绍和安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址

同样基于Docker来安装RabbitMQ,使用下面的命令即可:

docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall \-d \rabbitmq:3.8-management
  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

  • 安装完成后访问管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。 登录后即可看到管理控制台总览页面:

    在这里插入图片描述

RabbitMQ对应的整体架构核心概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由、转发消息。生产者发送的消息由交换机决定投递到哪个队列,无存储能力
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

在这里插入图片描述

数据隔离

  • 每个Virtual Host提供了一个独立的消息代理,它们之间完全隔离,相互之间不共享任何资源。通过将不同的应用程序或服务分配到不同的Virtual Host,可以实现数据的隔离。
  • 每个Virtual Host都有自己独立的交换机、队列和绑定规则。这样,消息只能在同一个Virtual Host内进行路由和传递,不会跨越Virtual Host。
  • 通过使用Virtual Hosts,可以将不同的应用程序或服务的消息进行逻辑隔离,确保它们之间不会互相干扰或访问彼此的数据。这在多租户环境下尤为有用,可以确保不同的租户之间的消息数据完全隔离,提高安全性和隐私性。

SpringAMQP

SpringAMQP的官方地址

  • AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
  • Spring AMQP:Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

快速入门

  1. 引入spring-amqp依赖,这样publisher和consumer服务都可以使用:

    <!——AMQP依赖,包含RabbitMQ-->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 配置RabbitMQ服务端信息

    spring:rabbitmq:host: 192.168.100.101 #主机名port: 5672 # 端口virtual-host: /liner #虚拟主机username: admin #用户名password: admin123 #密码
    
  3. 发送消息:SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {//队列名称String queueName = "simple.queue";//消息String message = "hello,spring amqp!";//发送消息rabbitTemplate.convertAndSend(queueName,message);
    }
    
  4. 接收消息:SpringAMQP提供声明式的消息监听,只需通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法

    @slf4j
    @Component
    public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring消费者接收到消息:【"+ msg + "】");if (true) {throw new MessageconversionException("故意的");}log.info("消息处理完成");}
    }
    

Work queues

Work queues,任务模型。让多个消费者绑定到一个队列,共同消费队列中的消息

publisher
queue
consumer1
consumer2

消费者消息推送限制

默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息

spring:rabbitmq:listener:simple:prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息

work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:

  • Fanout:广播
  • Direct:定向
  • Topic:话题
publisher
exchange
queue1
queue2
consumer1
consumer2
consumer3

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
Fanout交换机

Fanout Exchange:会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

在这里插入图片描述

Direct交换机

Direct Exchange :会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

在这里插入图片描述

Topic交换机

TopicExchange:与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 “.” 分割。

Queue与Exchange指定BIndingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

在这里插入图片描述

声明队列和交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

在这里插入图片描述

@Configuration
public class FanoutConfiguration{@Beanpublic FanoutExchange fanoutExchange(){//ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("liner.fanout");}@Beanpublic Queue fanoutQueue(){//QueueBuilder.durable("").build();return new Queue("fanout.queue");}@Beanpublic Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}@Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}}

SpringAMQP还提供了基于 @RabbitListener 注解来声明队列和交换机的方式:

@RabbitListener(bindings = @QueueBinding(value = Queue (name = "direct.queue",durable = "true"),exchange = @Exchange(name = "liner.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue(String msg){System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}

MQ消息转换器

Spring的消息发送代码接收的消息体是一个Object:在这里插入图片描述

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 默认情况下Spring采用的序列化方式是JDK序列化。存在问题:数据体积过大、有安全漏洞、可读性差

在这里插入图片描述

因此建议采用JSON序列化代替默认的JDK序列化

  • publisherconsumer两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

  • 配置消息转换器,在publisherconsumer两个服务的启动类中添加Bean:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断

在这里插入图片描述

高级篇

消息可靠性问题

发送者的可靠性

生产者重连

有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 #最大重试次数

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认

RabbitMQ有Publisher ConfirmPublisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败
SpringAMQP实现生产者确认
  1. 在publisher这个微服务的application.yml中添加配置:

    spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型publisher-returns: true #开启publisher return机制#这里publisher-confirm-type有三种模式可选:
    #  none:  关闭confirm机制
    #  simple:  同步阻塞等待MQ的回执消息
    #  correlated:  MQ异步回调方式返回回执消息
    
  2. 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallbackrabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {log.info("消息发送失败,应答码{{},原因{},交换机{},路由键{},消息{}",replyCode,replyText,exchange,routingKey,message.toString());});}
    }
    
  3. 发送消息,指定消息ID、消息ConfirmCallback

    @Test
    void testPublisherConfirm() throws InterruptedException {//1.创建CorrelationDataCorrelationData cd = new CorrelationData();//2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("handle message ack fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){	 // result.isAck(), boolean类型, true代表ack回执,false代表nack回执log.debug("发送消息成功,收到ack!");}else{	// result.getReason(), String类型,返回nack时的异常描述log.error("发送消息失败,收到nack,reason:{}",result.getReason());}}});//3.发送消息rabbitTemplate.convertAndSend("liner.direct","red","hello",cd);
    }
    
http://www.yayakq.cn/news/335529/

相关文章:

  • 做网站页面文件一般网站建设公司有哪些
  • 缝纫网站做洗衣机罩佛山网站设计制作免费咨询
  • 常德做网站多少钱wordpress学校官网
  • wordpress 漫画站邢台网站建设哪家好
  • 孟村网站建设价格合肥网站建设讯息
  • 免费直链平台烟台网站建设seo
  • 廊坊网站建设技术外包wordpress polylang
  • 宁波自助建站公司关键词seo优化服务
  • 石家庄企业网站怎么写自己的网页
  • 网站建设980元网站建设ppt方案模板下载
  • 怎样改网站英文域名WordPress非首页输出文章
  • 太原自助建站软件校友网站 建设
  • 网络彩票网站建设多少钱微信导航网站怎么做
  • 北京朝阳建站优化做水军那些网站好
  • 重庆建站模板厂家青岛网站建设公司报价
  • 苏宁易购的网站建设怎样用自己的服务器做网站
  • 企业网站 asp源码企业网站实名认证时间
  • 网站里面嵌入的地图是怎么做的软文有哪些推广渠道
  • 宁阳网站设计东乌珠穆沁旗网站建设
  • 花店网站模板 html免费企业黄页
  • 网站建设需要的技能有哪些wordpress 导航
  • 手机网站图片做多大南昌网站维护
  • 网站代理备案表如何做网络推广工作
  • 浮雕模东莞网站建设建材手机网站
  • 网站百科源码检察院网站建设方案
  • 内蒙古建网站怎么做晒鱼的网站
  • 北京大型网站开发宜兴建设局 审图中心 网站
  • 网站外链分析怎么做wordpress设置本地盘
  • 青岛网页设计公司报价单seo排名优化服务
  • 量力商务大厦网站建设2022西安最新出入通知