当前位置: 首页 > news >正文

荆州房地产网站建设怀远网站建设

荆州房地产网站建设,怀远网站建设,免费给我推广,南宁 网站建设 制作最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题 1.问题难点 在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分…

最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题

1.问题难点

在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分布在不同的进程中,当进行通信时,可能无法找到已连接的连接对象。

2.解决方案

使用使用redis的订阅发布机制,使所有的进程都能进行消息订阅。这样能保证每个进程收到消息后都会进行相关的信息处理了。

3.方案设计

  • 每个进程启动的时候都进行一个消息的订阅。
  • 通过http请求,进行消息发布。
  • 每个进程收到发布的消息后,进行判断是否由自己进行处理。

4.代码实现

①在服务启动时,进行消息订阅,并一直监听消息通道。当有消息发布时,进行消息处理。

# 初始化app
app = FastAPI(title="Ws Chat", description="测试", version="1.0.0")
app.openapi_version = "3.0.0"app.include_router(chat.app, prefix='/api/chat', tags=['Chat'])@app.on_event('startup')
async def on_startup():print(f"订阅初始化:{os.getpid()}")# 执行消息订阅机制https://aioredis.readthedocs.io/en/latest/examples/loop = asyncio.get_event_loop()loop.create_task(register_pubsub())async def reader(channel):# 进行消息的消费async for msg in channel.listen():  # 监听通道# print(msg)msg_data = msg.get("data")if msg_data and isinstance(msg_data, str):msg_data_dict = json.loads(msg_data)print(f"chat:{msg_data_dict}")sender = msg_data_dict.get("sender")# 进行消息处理await chat.cm.handle_websocket_message(msg_data_dict, sender)async def register_pubsub():pool = aioredis.from_url("redis://{}".format(host), db=db, password=password, port=port, encoding="utf-8", decode_responses=True)psub = pool.pubsub()async with psub as p:# 消息订阅await p.subscribe("chat")await reader(p)await p.unsubscribe("chat")

②websocket处理类

from fastapi import WebSocket, WebSocketDisconnectclass ConnectionManager:def __init__(self):# 保存当前所有的链接的websocket对象self.websocket_connections = {}async def connect(self, websocket: WebSocket, client_id):# 添加连接并发送欢迎消息await websocket.accept()self.websocket_connections[client_id] = websocketawait websocket.send_json({"type": "system","msg": "Welcome to the chat app!","sender": "system","recipient": client_id})try:# 处理消息while True:# 获取信息message = await websocket.receive_json()# 处理发送信息await self.handle_websocket_message(message, client_id)except WebSocketDisconnect:# 连接断开时移除连接del self.websocket_connections[client_id]async def handle_websocket_message(self, message: dict, client_id):# 处理私聊消息if message.get("type") == "private_message":recipient = message.get("recipient")msg = message.get("msg")recipient_conn = self.websocket_connections.get(recipient)if recipient_conn:# 在线await recipient_conn.send_json({"type": "private_message","sender": client_id,"msg": msg,"recipient": recipient})async def broadcast(self, message: dict):# 循环变量给所有在线激活的链接发送消息-全局广播for connection in self.websocket_connections:await connection.send_text(message)async def close(self, websocket: WebSocket, client_id):# 断开客户端的链接await websocket.close()del self.websocket_connections[client_id]async def disconnect(self, user_id):websocket: WebSocket = self.websocket_connections[user_id]await websocket.close()del self.websocket_connections[user_id]

③websocket连接

from app.chat_manager.server import ConnectionManagercm = ConnectionManager()@app.websocket("/connect_chat")
async def connect_chat(websocket: WebSocket, user_code: str):try:await cm.connect(websocket, user_code)except WebSocketDisconnect:# 连接断开时移除连接del cm.websocket_connections[user_code]

④http请求进行消息发布

@app.post("/create_chat", summary="发起聊天")
async def create_chat(param: DiagnosisChatSch, r=Depends(get_redis)):""""""ws_param = {"type": "private_message","msg": param.msg,"sender": param.sender,"recipient": param.recipient}# 进行消息发布await r.publish('diagnosis_chat', json.dumps(ws_param))return {'code': 200, 'msg': '成功', 'data': ''}

5.源码

github源码地址:https://github.com/zhangyukuo/fastapi_ws_chat

6.参考文章

https://www.cnblogs.com/a00ium/p/16931133.html

http://www.yayakq.cn/news/599185/

相关文章:

  • 中国做的比较好的网站设计公司有哪些两学一做专题网站用途
  • 定南建设银行网站点福永营销型网站多少钱
  • 深圳市建设工程交易中心网站北京王府井步行街上来往最多的是什么人
  • 网站开发什么语言好工作服厂家无锡 帛裳服饰专业
  • 公司网站排名中国水利建设网站
  • 常州网站建设公司信息杭州网站建设方案推广
  • ie8打不开建设银行网站桥的设计网站建设
  • 群艺馆网站建设方案拼多多seo怎么优化
  • 一站式做网站哪家好wordpress发送邮件功能未启用
  • 做一个回收网站怎么做如何新建一个网站
  • 旅游网络网站建设方案网站的优化哪个好
  • 建设网站 织梦市场调研报告word模板
  • 苏州网站设计选哪家南宁手机企业网站定制公司
  • 网站的管理权限有什么用衡阳县住房和城乡建设局网站
  • 百度怎么做开锁网站亳州市网站建设
  • 自己怎么做公司网站wordpress插件采集
  • 18岁以上站长统计男男做的视频网站
  • 网站推广费用一般多少钱国内工程机械行业网站建设现状
  • 小米路由器做网站网站后台编辑器不能正常显示
  • 广州外贸网站制作百度关键词推广方案
  • 新建网址网站编辑 seo
  • 谷城网站建设网页制作费用明细
  • 网站解析慢 优化网站友情链接查询
  • php网站开发技术背景网站开发包括网站设计
  • 食品电子商务网站建设方案windows系统怎么做ppt下载网站
  • 济南网站建设直播网站语言那种好
  • 企业做网站的crm是什么意思啊
  • 邢台企业网站建设咨询网站建设百度知道
  • 温州市建设小学学校网站通辽做网站建设
  • 漯河企业网站建设网站备案服务码口令是什么意思