网站建设及编辑岗位职责十大外包公司
1 pipeline操作 Redis数据库
Redis 的 C/S 架构:
- 基于客户端-服务端模型以及请求/响应协议的 TCP服务。
 - 客户端向服务端发送⼀个查询请求,并监听Socket返回。
 - 通常是以 阻塞模式,等待服务端响应。
 - 服务端处理命令,并将结果返回给客户端。
 
存在的问题:
- 如果Redis服务端 同时处理多个请求,加上⽹络延迟,那么服务端利⽤率不⾼,效率降低。
 
解决的办法:
- 管道pipeline
 
1.1 pipeline的介绍
管道pipeline
- 可以⼀次性发送多条命令并在执⾏完后⼀次性将结果返回。
 - pipeline通过 减少客户端与Redis的通信次数 来实现降低往返延时时间。
 
实现的原理
- 实现的原理是 队列。
 - Client可以将三个命令 放到⼀个tcp报⽂⼀起发送。
 - Server则可以 将三条命令的处理结果放到⼀个tcp报⽂ 返回。
 - 队列是先进先出,这样就保证数据的顺序性。
 
1.2 pipeline操作Redis数据库
1.2.1 实现步骤
1. 创建Redis管道
2. 将Redis请求添加到队列
3. 执⾏请求
1.2.2 代码实现
# 创建Redis管道
pl = redis_conn.pipeline()
# 将Redis请求添加到队列
pl.setex('sms_%s' % phone, 60, smscode)
pl.setex('is_send_%s' % phone, 60, 1)
# 执⾏请求
pl.execute() 
 
2 生产者消费者设计模式
存在的问题:

性能优化:

思考:如何 将发送短信从主业务中解耦出来。

⽣产者消费者设计模式介绍
- 为了将发送短信从主业务中解耦出来,我们 引⼊⽣产者消费者设计模式。
 - 它是最常⽤的解耦⽅式之⼀,寻找中间⼈(broker)搭桥,保证两个业务没有直接关联。
 
总结:
- ⽣产者⽣成消息,缓存到消息队列 中,消费者读取消息队列中的消息并执⾏。
 - 由芒果头条⽣成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执⾏。
 
3 RabbitMQ介绍和使用
3.1 RabbitMQ介绍
- 开源AMQP实现,Erlang 语⾔编写,⽀持多种客户端
 - 分布式、⾼可⽤、持久化、可靠、安全
 - ⽀持多种协议:AMQP、STOMP、MQTT、HTTP
 - 适⽤于多系统之间 的业务解耦的消息中间件
 

3.2 消息队列 选择建议
3.2.1 Kafka
Kafka主要特点是基于Pull的模式来处理消息消费,追求⾼吞吐量,⼀开始的⽬的就 是⽤于⽇志收集和传输,适合产⽣⼤量数据的互联⽹服务的数据收集业务。
⼤型公司建议可以选⽤,如果有⽇志采集功能,肯定是⾸选kafka 了。
3.2.2 RocketMQ
天⽣为⾦融互联⽹领域⽽⽣,对于可靠性要求很⾼的场景,尤其是电商⾥⾯的订单 扣款,以及业务削峰,在⼤量交易涌⼊时,后端可能⽆法及时处理的情况。
RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿⾥双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ。
3.2.3 RabbitMQ
RabbitMQ: 结合erlang语⾔本身的并发优势,性能较好,社区活跃度也⽐较⾼,但是 不利于做⼆次开发和维护。不过,RabbitMQ的社区⼗分活跃,可以解决开发过程 中遇到的bug。
如果你的数据量没有那么⼤,⼩公司优先选择功能⽐较完备的RabbitMQ。
3.3 安装RabbitMQ(ubuntu 18.04)
安装⽅式1(推荐):
- 安装Erlang 参考安装Erlang版本
 - 安装RabbitMQ 参考官⽹安装步骤
 - rabbitmq-server 安装包下载链接
 
安装⽅式2:
# 1. 安装erlang
#由于rabbitmq需要erlang语⾔的⽀持,在安装rabbitmq之前需要安装erlang
sudo apt-get install erlang-nox
# 2. 安装Rabbitmq
#更新源
sudo apt-get update
#安装
sudo apt-get install rabbitmq-server 
服务器操作:
# 重启服务器
$ sudo systemctl restart rabbitmq-server
# 启动服务器
$ sudo systemctl start rabbitmq-server
# 关闭服务器
$ sudo systemctl stop rabbitmq-server
# 查看服务器状态
sudo service rabbitmq-server status
# 查看rabbitmq 基本信息
sudo rabbitmqctl status 
3.4 添加admin,并赋予administrator权限
# 添加admin⽤户,密码设置为admin。
sudo rabbitmqctl add_user admin admin 
# 赋予权限
sudo rabbitmqctl set_user_tags admin administrator
# 赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
# 查看⽤户列表
sudo rabbitmqctl list_users
# 删除⽤户
$ sudo rabbitmqctl delete_user admin 
3.5 启动服务器测试
# 安装了Rabbitmq后,默认也安装了该管理⼯具,执⾏命令即可启动
sudo rabbitmq-plugins enable rabbitmq_management(先定位到rabbitmq安装⽬录)
# 浏览器访问
http://localhost:15672/ 
3.6 Python访问RabbitMQ
- RabbitMQ 提供默认的administrator账户
 - ⽤户名和密码:guest、guest
 - 协议:amqp
 - 地址:localhost
 - 端⼝:15672
 - 查看队列中的消息:sudo rabbitctl list_queues
 
# Python3虚拟环境下,安装pika
$ pip install pika
 
# ⽣产者代码:producer.py
import pika# 链接到RabbitMQ服务器
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
# 创建频道
channel = connection.channel()
# 声明消息队列
channel.queue_declare(queue='zhuozi')
# routing_key是队列名 body是要插⼊的内容
channel.basic_publish(exchange='',routing_key='zhuozi', body=b'Hello RabbitMQ!')
print("开始向 'zhuozi' 队列中发布消息 '汉堡做好啦!'")
# 关闭链接
connection.close()
# 消费者代码:consumer.py
import pika# 链接到rabbitmq服务器
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
# 创建频道,声明消息队列
channel = connection.channel()
# 和⽣产者声明同⼀个队列,如果⼀⽅挂掉,不会丢失数据
channel.queue_declare(queue='zhuozi')# 定义接受消息的回调函数
def callback(channel, method, properties, body):print(body)# 告诉RabbitMQ使⽤callback来接收信息
channel.basic_consume(on_message_callback=callback, queue='zhuozi',auto_ack=True)
# 开始接收信息
channel.start_consuming() 
3.7 RabbitMQ配置远程访问
直接使⽤新建的管理员⽤户访问即可远程访问。
4 Celery 介绍和使用
存在问题:
- 消费者取到消息之后,需要 异步处理。
 - 任务可能出现⾼并发的情况,需要多任务的⽅式执⾏。
 - 耗时任务很多种,每种耗时任务编写的⽣产者和消费者代码有重复。
 - 取到的消息什么时候执⾏,以什么样的⽅式执⾏。
 
结论:
- 实际开发中,我们可以 借助成熟的⼯具Celery 来完成。
 - 有了Celery,我们在使⽤⽣产者消费者模式时,只需要关注任务本身,极⼤的 简化了程序员的开发流程。
 
4.1 Celery介绍
Celery介绍:
- ⼀个 简单、灵活且可靠、处理⼤量消息的分布式系统,可以在⼀台或者多台机器上运⾏。
 - 单个 Celery进程每分钟可处理数以百万计 的任务。
 - 通过消息进⾏通信,使⽤消息队列(broker)在客户端和消费者之间进⾏协调。
 
安装Celery:
$ pip install -U Celery 
Celery官⽅⽂档
4.2 创建Celery实例并加载配置
4.2.1 定义Celery包
# mgproject/mgproject/celery_tasks
在项⽬包⽬录下创建celery_tasks(python package) 
4.2.2 创建Celery实例
在celery_tasks包⽬录下 创建main.py⽂件
# celery_tasks/main.py
# celery启动⽂件
from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo') 
4.2.3 加载Celery配置
在celery_tasks包⽬录下 创建config.py⽂件
# celery_tasks/config.py
# 指定消息队列的位置
broker_url= 'amqp://guest:guest@localhost:5672'
# 修改celery_tasks/main.py
# celery启动⽂件
from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config') 
4.3 定义发送短信任务
在celery_tasks包⽬录下创建 sms(python包)/tasks.py
4.3.1 注册任务
celery_tasks.main.py
from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# ⾃动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms']) 
4.3.2 定义任务
celery_tasks.sms.tasks.py
import os
import sys# 添加导包路径
B_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(1, B_DIR)
sys.path.insert(0, os.path.join(B_DIR, 'utils'))
import logging
from celery_tasks.main import celery_app# 为celery使⽤django配置⽂件进⾏设置
if not os.getenv('DJANGO_SETTINGS_MODULE'):os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.dev')
from huyi_sms.sms3 import send_sms_codelogger = logging.getLogger('django')@celery_app.task(name='huyi_send_sms_code')
def huyi_send_sms_code(phone, smscode_str):"""发送短信异步任务:param phone: ⼿机号:param smscode: 短信验证码:return: 成功 code=2 或 失败 smsid=0"""try:# 调⽤外部接⼝执⾏发送短信任务ret = send_sms_code(smscode_str, phone)except Exception as e:logger.error(e)if ret.get('code') != 2:logger.error(e)return ret.get('code', None) 
4.4 启动Celery服务
$ cd ~/Desktop/projects/mangguo/mgproject
$ celery -A celery_tasks.main worker -l info 
- -A指对应的应⽤程序, 其参数是项⽬中 Celery实例的位置。
 - worker指这⾥要启动的worker。
 - -l指⽇志等级,⽐如info等级。
 
4.5 调⽤发送短信任务
# verifications/views.py
from mgproject.celery_tasks.sms.tasks import huyi_send_sms_code# Celery异步发送短信验证码
ret = huyi_send_sms_code.delay(phone, smscode_str)
# 8. 根据外部接⼝返回值响应前端结果
if ret:  # 执⾏⼀个任务就返回⼀个taskid 689e889c-a607-49f3-9777-248a8dcce310return JsonResponse({'code': '200', 'errormsg': 'OK'})
return JsonResponse({'code': '5001', 'errormsg': '发送短信验证码错误'}) 
4.6 补充celery worker的⼯作模式
- 默认是 进程池⽅式,进程数以当前机器的CPU核数为参考,每个CPU开四个进 程。
 - 如何⾃⼰指定进程数:celery -A proj worker --concurrency=4
 - 如何改变进程池⽅式为协程⽅式:
 - celery -A proj worker --concurrency=1000 -P eventlet -c 1000
 
# 安装eventlet模块
$ pip install eventlet
# 启⽤ Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000 


