41 lines
1.3 KiB
Python
41 lines
1.3 KiB
Python
from typing import Dict
|
|
|
|
from fastapi import WebSocket
|
|
|
|
|
|
class ConnectionManager:
|
|
def __init__(self):
|
|
self.active_connections: Dict[str, list[WebSocket]] = {}
|
|
|
|
async def connect(self, websocket: WebSocket, id: str):
|
|
print("connecting...", id)
|
|
if id not in self.active_connections:
|
|
self.active_connections[id] = []
|
|
await websocket.accept()
|
|
self.active_connections[id].append(websocket)
|
|
|
|
def disconnect(self, websocket: WebSocket, id: str):
|
|
if id not in self.active_connections:
|
|
return
|
|
self.active_connections[id].remove(websocket)
|
|
|
|
async def send_personal_message(self, message: str, websocket: WebSocket):
|
|
await websocket.send_text(message)
|
|
|
|
async def broadcast(self, message: str, id: str):
|
|
if id not in self.active_connections:
|
|
return
|
|
print(f"i go this message {message}")
|
|
print(
|
|
f"I am going to send it to {len(self.active_connections[id])} connections"
|
|
)
|
|
for connection in self.active_connections[id]:
|
|
print("sending it to ", connection)
|
|
try:
|
|
await connection.send_text(message)
|
|
except Exception:
|
|
self.disconnect(connection, id)
|
|
print("sent it to ", connection)
|
|
|
|
|
|
manager = ConnectionManager()
|