Как запустить отправку сообщения через веб-сокет Fastapi за пределами приложения Fastapi ⇐ Python
Как запустить отправку сообщения через веб-сокет Fastapi за пределами приложения Fastapi
У меня есть такой менеджер соединений WebSocket:
класс ConnectionManager: def __init__(self) -> Нет: self.connections = {} асинхронное определение подключения (self, user_id: str, websocket: WebSocket): ожидайте websocket.accept() self.connections[user_id] = веб-сокет асинхронное отключение по определению (self, user_id): веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.close() del self.connections[user_id] асинхронная защита send_messages(self, user_ids, message): для user_id в user_ids: веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.send_json(сообщение и маршрут WebSocket:
@router.websocket("/ws/{token}") async def ws(websocket: WebSocket, token: str, redis: Annotated [Redis, Depends(get_redis)]): user_id = redis.get(токен) если user_id: redis.expire(user_id) еще: поднять redis_error пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) Я хочу сохранить соединения пользователей, и когда придет сообщение Redis pubsub, обработать это сообщение, а затем отправить сообщение WebSocket некоторым пользователям. модуль, обрабатывающий сообщение, не является частью приложения Fastapi.
Я пытался реализовать это внутри приложения Fastapi, реализовав threading и asyncio, но они прервали работу Fastapi само приложение.
как я могу инициировать отправку сообщения объектов WebSocket за пределами приложения Fastapi?
Что я пробовал:
redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") @router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) но я получаю сообщение об ошибке pubsub от Redis и говорит, что я еще не подписался, если не сделаю это следующим образом:
@router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id)
но при этом создается соединение Redis для каждого пользователя, подключающегося к веб-сокету, можно ли как-нибудь глобально определить соединение Redis для всех пользователей?
У меня есть такой менеджер соединений WebSocket:
класс ConnectionManager: def __init__(self) -> Нет: self.connections = {} асинхронное определение подключения (self, user_id: str, websocket: WebSocket): ожидайте websocket.accept() self.connections[user_id] = веб-сокет асинхронное отключение по определению (self, user_id): веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.close() del self.connections[user_id] асинхронная защита send_messages(self, user_ids, message): для user_id в user_ids: веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.send_json(сообщение и маршрут WebSocket:
@router.websocket("/ws/{token}") async def ws(websocket: WebSocket, token: str, redis: Annotated [Redis, Depends(get_redis)]): user_id = redis.get(токен) если user_id: redis.expire(user_id) еще: поднять redis_error пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) Я хочу сохранить соединения пользователей, и когда придет сообщение Redis pubsub, обработать это сообщение, а затем отправить сообщение WebSocket некоторым пользователям. модуль, обрабатывающий сообщение, не является частью приложения Fastapi.
Я пытался реализовать это внутри приложения Fastapi, реализовав threading и asyncio, но они прервали работу Fastapi само приложение.
как я могу инициировать отправку сообщения объектов WebSocket за пределами приложения Fastapi?
Что я пробовал:
redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") @router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) но я получаю сообщение об ошибке pubsub от Redis и говорит, что я еще не подписался, если не сделаю это следующим образом:
@router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id)
но при этом создается соединение Redis для каждого пользователя, подключающегося к веб-сокету, можно ли как-нибудь глобально определить соединение Redis для всех пользователей?
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Проблема с получением сообщения через веб-сокет (невозможно прослушивать)
Гость » » в форуме Android - 0 Ответы
- 14 Просмотры
-
Последнее сообщение Гость
-