From fdaa462a84aaabc00bbbb84a15ba718699574a1c Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Mon, 8 Apr 2019 18:14:23 +0200 Subject: [PATCH] [fix] ensure that websockets are unsubscribes on closed connection --- run.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/run.py b/run.py index fb8dd6d..8a491c1 100644 --- a/run.py +++ b/run.py @@ -504,7 +504,30 @@ def subscribe(ws, channel): subscriptions[channel].append(ws) +def unsubscribe_all(ws): + for channel in subscriptions: + if ws in subscriptions[channel]: + if ws in subscriptions[channel]: + print(f"\033[1;36mUnsubscribe ws {ws} from {channel}\033[0m") + subscriptions[channel].remove(ws) + + +def clean_websocket(function): + @wraps(function) + async def _wrap(request, websocket, *args, **kwargs): + try: + to_return = await function(request, websocket, *args, **kwargs) + return to_return + except Exception: + print(function.__name__) + unsubscribe_all(websocket) + raise + + return _wrap + + @app.websocket('/index-ws') +@clean_websocket async def ws_index(request, websocket): subscribe(websocket, "jobs") @@ -538,6 +561,7 @@ async def ws_index(request, websocket): @app.websocket('/job--ws') +@clean_websocket async def ws_job(request, websocket, job_id): job = Job.select().where(Job.id == job_id) @@ -557,6 +581,7 @@ async def ws_job(request, websocket, job_id): @app.websocket('/apps-ws') +@clean_websocket async def ws_apps(request, websocket): subscribe(websocket, "jobs") subscribe(websocket, "apps") @@ -653,6 +678,7 @@ async def ws_apps(request, websocket): @app.websocket('/app--ws') +@clean_websocket async def ws_app(request, websocket, app_name): # XXX I don't check if the app exists because this websocket is supposed to # be only loaded from the app page which does this job already