php+mysql 2012也买酒商城网站源码,做空的网站有哪些,制作会员手机网站,新闻内容摘抄目录 一、Rabbitmq介绍
二、Rabbitmq的使用场景
1、异步处理
2、服务解耦
3、流量削峰
4、日志收集
5、发布订阅
6、任务调度
三、python如何使用Rabbitmq
1、安装依赖
2、基础使用
3、消息确认
4、消息持久化
5、公平调度
6、发布订阅
7、关键字发布 一、Rabbi…目录 一、Rabbitmq介绍
二、Rabbitmq的使用场景
1、异步处理
2、服务解耦
3、流量削峰
4、日志收集
5、发布订阅
6、任务调度
三、python如何使用Rabbitmq
1、安装依赖
2、基础使用
3、消息确认
4、消息持久化
5、公平调度
6、发布订阅
7、关键字发布 一、Rabbitmq介绍
RabbitMQ是一个开源的消息中间件基于AMQPAdvanced Message Queue Protocol高级消息队列协议协议实现。RabbitMQ被广泛应用于各种应用场景如异步任务处理、日志传输、实时消息推送等。在微服务架构中RabbitMQ是一个常见的消息中间件选择它可以帮助服务之间实现解耦和异步通信提高系统的可扩展性和稳定性。RabbitMQ提供了一个简单的用户页面用户可以监控和管理消息、队列、交换器、绑定等资源。通过管理界面用户可以直观地了解系统的运行状态并进行相应的配置和管理操作。
二、Rabbitmq的使用场景
1、异步处理
在Web应用中当用户提交表单时可以将表单处理任务发送给RabbitMQ由后台服务异步处理从而提高用户界面的响应速度。在电商系统中用户下单后订单处理、库存更新、支付通知等任务可以异步执行避免阻塞主线程。
2、服务解耦
在微服务架构中不同服务之间通过RabbitMQ进行通信可以降低服务之间的耦合度提高系统的可扩展性和可维护性。当某个服务需要升级或维护时可以通过RabbitMQ实现服务的平滑过渡而不会影响其他服务的正常运行。
3、流量削峰
在高并发场景中RabbitMQ可以作为一个缓冲层接收并存储大量的请求然后按照设定的速率将请求转发给后端服务从而避免后端服务因过载而崩溃。通过RabbitMQ的限流和队列机制可以有效地控制请求的速率和数量保护后端服务的稳定性。
4、日志收集
RabbitMQ可以用于收集分散在各个服务器上的日志信息将它们集中到一个或多个日志处理服务中进行统一的分析和处理。通过RabbitMQ可以实现日志的实时收集、分析和报警提高系统的运维效率和故障排查能力。
5、发布订阅
在需要向多个客户端推送消息的场景中如实时通知、消息推送等可以使用RabbitMQ的Fanout交换器将消息广播给所有绑定的队列。通过RabbitMQ的消息广播机制可以实现实时、可靠的消息推送服务提高用户体验。
6、任务调度
RabbitMQ可以与其他任务调度框架如Quartz结合使用实现定时任务、延迟任务等复杂任务调度需求。通过RabbitMQ的任务调度功能可以灵活地控制任务的执行时间和频率提高系统的自动化程度和运行效率。
三、python如何使用Rabbitmq
Rabbitmq网址https://www.rabbitmq.com/tutorials
1、安装依赖
安装第三方库pikapip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika
2、基础使用
生产者模型代码如下
import pika# 生产者模型connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接本地的rabbitmq
channel connection.channel() # 连接通道
channel.queue_declare(queuelist) # 创建一个名叫list的队列
channel.basic_publish(exchange,routing_keylist, # 向那个队列发布信息bodylol,# 发布的信息)
connection.close() # 关闭连接
消费者代码如下
import pika# 消费者模型connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接本地的rabbitmq
channel connection.channel() # 连接通道
channel.queue_declare(queuelist) # 为确保队列存在创建一个list队列def callback(ch, method, properties, body):print(f [x] 消费者接收到了任务 {body})channel.basic_consume(queuelist,on_message_callbackcallback,auto_ackTrue) # 接收信息queue表示监听的队列on_message_callback表示接收到信息执行的函数auto_ack表示默认执行回复
channel.start_consuming() # 开启永无止境的循环监听该队列
3、消息确认
在队列中执行任务可能需要几秒钟您可能想知道如果 使用者启动一个长任务并在完成之前终止。 使用我们当前的代码一旦 RabbitMQ 将消息传递给消费者它就会 立即将其标记为删除。在这种情况下如果您终止 worker则它刚刚处理的消息丢失了。调度的消息 对于这个尚未处理的特定 worker 来说也会丢失。为了确保消息永远不会丢失RabbitMQ 支持消息确认。ack由 consumer 告诉 RabbitMQ 已收到特定消息 处理并且 RabbitMQ 可以自由删除它。
这是由消费者来进行改变的代码如下
需要将auto_ack改为False然后在回调函数里加入ch.basic_ack(delivery_tagmethod.delivery_tag)
import pika# 消费者模型connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接本地的rabbitmq
channel connection.channel() # 连接通道
channel.queue_declare(queuelist) # 创建一个队列def callback(ch, method, properties, body):print(f [x] 消费者接收到了任务 {body})ch.basic_ack(delivery_tagmethod.delivery_tag) # 给队列回复进行确认channel.basic_consume(queuelist2,on_message_callbackcallback,auto_ackFalse) # auto_ack改为False
channel.start_consuming() # 开启永无止境的循环监听该队列
4、消息持久化
如果 RabbitMQ 服务器 停止。
当 RabbitMQ 退出或崩溃时它会忘记队列和消息 除非你告诉它不要这样做。需要做两件事来确保 消息不会丢失我们需要将队列和消息都标记为 耐用。然和把队列和消息保存在磁盘里。
这是有生产者来进行改变的代码如下
durableTrue在创建队列时声明持久化delivery_modepika.DeliveryMode.Persistent让信息做持久化
import pika# 生产者模型connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接本地的rabbitmq
channel connection.channel() # 连接通道
channel.queue_declare(queuelist2,durableTrue) # 创建一个队列,durableTrue表示队列支持持久化
channel.basic_publish(exchange,routing_keylist2, # 向那个队列发布信息bodylol,# 发布的信息propertiespika.BasicProperties(delivery_modepika.DeliveryMode.Persistent # 让信息做持久化))
connection.close() # 关闭连接
5、公平调度
您可能已经注意到调度仍然没有完全工作 正如我们想要的那样。例如在有两个 worker 的情况下当所有 奇数消息很重偶数消息很轻一个 worker 将是 一直很忙另一个几乎不做任何工作。井 RabbitMQ 对此一无所知仍会 dispatch 消息均匀。
发生这种情况是因为 RabbitMQ 只是在消息 进入队列。它不看未确认的数量 消息。它只是盲目地调度每 n 条消息 到第 n 个消费者。
为了解决这个问题我们可以使用带有 setup 的 channel 方法。它使用协议方法告诉 RabbitMQ 不要一次向 worker 提供多条消息。或者换句话说不要调度 向 worker 发送新消息直到它处理并确认 上一个。相反它会将其分派给下一个仍然不忙的 worker。
这是由消费者来进行改变的代码如下
channel.basic_qos(prefetch_count1)进行闲置派发
import pika# 消费者模型connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接本地的rabbitmq
channel connection.channel() # 连接通道
channel.queue_declare(queuelist) # 为确保队列存在创建一个list队列def callback(ch, method, properties, body):print(f [x] 消费者接收到了任务 {body})channel.basic_qos(prefetch_count1) # 闲置派发
channel.basic_consume(queuelist,on_message_callbackcallback,auto_ackTrue) # 接收信息queue表示监听的队列on_message_callback表示接收到信息执行的函数auto_ack表示默认执行回复
channel.start_consuming() # 开启永无止境的循环监听该队列
6、发布订阅
发布订阅模式是将信息发布给所有的订阅者其特点就是有交换机。
channel.exchange_declare(exchangem1,exchange_typefanout)声明一个交换机类型为fanout是将信息发给所有的订阅者
发布者代码如下
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接rabbitmq
channel connection.channel() # 连接通道channel.exchange_declare(exchangem1,exchange_typefanout) # fanout将信息发给所有的队列
channel.queue_declare(queue) # 创建一个队列,durableTrue表示队列支持持久化
channel.basic_publish(exchangem1,routing_key, # 队列名称bodylol,)# 发布的数据
connection.close() # 关闭连接
订阅者代码如下
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接rabbitmq
channel connection.channel()channel.exchange_declare(exchangem1,exchange_typefanout,) # fanout将信息发给所有的队列
resultchannel.queue_declare(exclusiveTrue,queue) # 创建一个随机队列
queue_nameresult.method.queue # 拿到队列名字
print(queue_name)
channel.queue_bind(exchangem1,queuequeue_name) # 对exchange和queue进行绑定def callback(ch, method, properties, body):print(f [x] 消费者接收到了任务 {body})channel.basic_consume(queuequeue_name,on_message_callbackcallback,auto_ackTrue)
channel.start_consuming() # 开启永无止境的循环监听该队列
7、关键字发布
关键字发布就是在发布订阅模式基础上将不同信息发布给不同的订阅者。
channel.exchange_declare(exchangem2,exchange_typedirect) 声明一个交换机交换机的类型为direct将信息发布给指定的订阅者。
发布者代码如下
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接rabbitmq
channel connection.channel() # 连接通道channel.exchange_declare(exchangem2,exchange_typedirect) # direct将信息发布给指定的订阅者
channel.queue_declare(queue) # 创建一个队列,durableTrue表示队列支持持久化
channel.basic_publish(exchangem2,routing_keyhhq, # 队列名称bodycf,)# 发布的数据
connection.close() # 关闭连接
订阅者代码如下
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(localhost)) # 连接rabbitmq
channel connection.channel()channel.exchange_declare(exchangem2,exchange_typedirect) # exchange:交易所的名称fanout将信息发给所有的队列
resultchannel.queue_declare(exclusiveTrue,queue) # 创建一个随机队列
queue_nameresult.method.queue # 拿到队列名字
print(queue_name)
channel.queue_bind(exchangem2,queuequeue_name,routing_keyhhq) # 对exchange和queue进行绑定def callback(ch, method, properties, body):print(f [x] 消费者接收到了任务 {body})channel.basic_consume(queuequeue_name,on_message_callbackcallback,auto_ackTrue)
channel.start_consuming() # 开启永无止境的循环监听该队列