当前位置: 首页 > news >正文

北京市城乡建设网站手机百度账号申请注册

北京市城乡建设网站,手机百度账号申请注册,大型科技网站,图跃企业网站建设大纲 Pom.xml监听队列实时返回消息测试完整代码工程代码 在之前的案例中,我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取,并通过流式数据返回给客户端。 webflux是反应式Web框架,客户端可以通过一个长连…

大纲

  • Pom.xml
  • 监听队列
  • 实时返回消息
  • 测试
  • 完整代码
  • 工程代码

在之前的案例中,我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取,并通过流式数据返回给客户端。
webflux是反应式Web框架,客户端可以通过一个长连接和服务端相连,后续服务端可以通过该连接持续给客户端发送消息。可以达到:发送一次,多次接收的效果。

Pom.xml

由于我们要使用Rabbitmq,所以要新增如下依赖

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId></dependency>

webflux的依赖如下:

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.6.7</version></dependency>

监听队列

下面代码会返回一个监听队列的Container

    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

实时返回消息

一旦消费者读取到消息,onMessage方法会被调用。然后Flux的消费者会将消息投递到流上。

    public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}

测试

由于OpenApi不能支持实时展现流式数据,所以我们采用Postman来测试。
发送请求后,该页面一直处于滚动状态。
在这里插入图片描述
在管理后台发送一条消息
在这里插入图片描述
可以看到Postman收到了该消息
在这里插入图片描述
然后在发一条,Postman又会收到一条
在这里插入图片描述
这样我们就完成了“请求一次,多次返回”的效果。

完整代码

需要注意的是,返回的格式需要标记为produces = “text/event-stream”。

// controller
package com.rabbitmq.consumer.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.consumer.service.ConsumerService;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/consumer")
public class ConsumerController {@Autowiredprivate ConsumerService comsumerService;@GetMapping(value = "/listen", produces = "text/event-stream")public Flux<String> listen(@RequestParam String queueName) {return comsumerService.listen(queueName);}
}
// service
package com.rabbitmq.consumer.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class ConsumerService {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;private final ReentrantLock lock = new ReentrantLock();private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}
}

工程代码

https://github.com/f304646673/RabbitMQDemo/tree/main/consumer

http://www.yayakq.cn/news/558544/

相关文章:

  • 帮人做视频的网站西安网站开发的未来发展
  • 抖音开放平台橡塑app网站优化开发
  • 网站源码下载视频北京市网站公司网站
  • 旅游网站建设1000字网站主色调简介
  • 沧州地区做网站wordpress登陆页文件
  • 专门做照片书的网站深圳网站关键词优化公司哪家好
  • php网站后台密码忘记了怎么办做旅游网站的关注与回复
  • 网站建设定制价格明细表电子书网站 自己做
  • 济南外贸建站网络推广网站电话
  • 网站特色电子商务网站建设的主要风险
  • 大尺度做爰床视频网站微信网页上的网站怎么做的
  • 上海网站建设的网站灵台县门户网站
  • 宁乡市建设局网站苏州网络推广建网站
  • 怎么创建网站的快捷方式手机商城网站建设设计方案
  • 做电影网站有什么好处百度一下网址大全
  • 更新网站要怎么做呢有关网站建设新闻资讯
  • 淘宝做网站红色好看的网站
  • 如何入侵网站服务器郴州网络推广公司
  • 网站地图有什么作用wordpress产品展示
  • 怎样自己做一个网站欧洲美妇做爰网站
  • 湛江市建设规划局网站网站推广的基本方法为()
  • c 还可以做网站搭建网站需要什么
  • 大良营销网站建设效果优质的房产网站建设
  • 建一个网站素材哪里来站长之家ping检测
  • 工艺宣传网站建设移动端网站宽度做多大
  • 柯桥教育网站建设那个网站做苗木
  • 会计网站模板崆峒区城乡建设局网站
  • 租二级目录做网站网站开发支付宝支付
  • 怎样做企业手机网站首页上海企业登记在线官网
  • 嘉兴网站建设多少时间环保网站模版