diff --git a/run.py b/run.py index 72f4c80..3911a54 100644 --- a/run.py +++ b/run.py @@ -8,6 +8,7 @@ import asyncio import random from datetime import datetime +from collections import defaultdict import aiohttp import aiofiles @@ -81,8 +82,15 @@ async def run_jobs(): build_task.state = "running" build_task.started_time = datetime.now() + print(build_task) build_task.save() + await broadcast({ + "target": "build_task", + "id": build_task.id, + "data": model_to_dict(build_task), + }, "build_tasks") + # fake stupid command, whould run CI instead print(f"Starting job for {build_task.repo.name}...") command = await asyncio.create_subprocess_shell("/usr/bin/tail /var/log/auth.log", @@ -103,16 +111,15 @@ async def run_jobs(): build_task.end_time = datetime.now() build_task.save() - await broadcast_to_ws(all_index_ws, { + await broadcast({ "target": "build_task", "id": build_task.id, "data": model_to_dict(build_task), - }) - - await asyncio.sleep(3) + }, "build_tasks") -async def broadcast_to_ws(ws_list, message): +async def broadcast(message, channel): + ws_list = subscriptions[channel] dead_ws = [] for ws in ws_list: @@ -125,9 +132,14 @@ async def broadcast_to_ws(ws_list, message): ws_list.remove(to_remove) +def subscribe(ws, channel): + subscriptions[channel].append(ws) + + @app.websocket('/index-ws') async def index_ws(request, websocket): - all_index_ws.append(websocket) + subscribe(websocket, "build_tasks") + while True: data = await websocket.recv() print(f"websocket: {data}") @@ -146,6 +158,7 @@ async def index(request): if __name__ == "__main__": - all_index_ws = [] + subscriptions = defaultdict(list) + app.add_task(initialize_app_list()) app.run('localhost', port=5000, debug=True)