mirror of
https://github.com/YunoHost/yunorunner.git
synced 2024-09-03 20:05:52 +02:00
[enh] move to a broadcast system for messages
This commit is contained in:
parent
20fc770db2
commit
08f94e45d1
1 changed files with 20 additions and 7 deletions
27
run.py
27
run.py
|
@ -8,6 +8,7 @@ import asyncio
|
||||||
import random
|
import random
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import aiofiles
|
import aiofiles
|
||||||
|
@ -81,8 +82,15 @@ async def run_jobs():
|
||||||
|
|
||||||
build_task.state = "running"
|
build_task.state = "running"
|
||||||
build_task.started_time = datetime.now()
|
build_task.started_time = datetime.now()
|
||||||
|
print(build_task)
|
||||||
build_task.save()
|
build_task.save()
|
||||||
|
|
||||||
|
await broadcast({
|
||||||
|
"target": "build_task",
|
||||||
|
"id": build_task.id,
|
||||||
|
"data": model_to_dict(build_task),
|
||||||
|
}, "build_tasks")
|
||||||
|
|
||||||
# fake stupid command, whould run CI instead
|
# fake stupid command, whould run CI instead
|
||||||
print(f"Starting job for {build_task.repo.name}...")
|
print(f"Starting job for {build_task.repo.name}...")
|
||||||
command = await asyncio.create_subprocess_shell("/usr/bin/tail /var/log/auth.log",
|
command = await asyncio.create_subprocess_shell("/usr/bin/tail /var/log/auth.log",
|
||||||
|
@ -103,16 +111,15 @@ async def run_jobs():
|
||||||
build_task.end_time = datetime.now()
|
build_task.end_time = datetime.now()
|
||||||
build_task.save()
|
build_task.save()
|
||||||
|
|
||||||
await broadcast_to_ws(all_index_ws, {
|
await broadcast({
|
||||||
"target": "build_task",
|
"target": "build_task",
|
||||||
"id": build_task.id,
|
"id": build_task.id,
|
||||||
"data": model_to_dict(build_task),
|
"data": model_to_dict(build_task),
|
||||||
})
|
}, "build_tasks")
|
||||||
|
|
||||||
await asyncio.sleep(3)
|
|
||||||
|
|
||||||
|
|
||||||
async def broadcast_to_ws(ws_list, message):
|
async def broadcast(message, channel):
|
||||||
|
ws_list = subscriptions[channel]
|
||||||
dead_ws = []
|
dead_ws = []
|
||||||
|
|
||||||
for ws in ws_list:
|
for ws in ws_list:
|
||||||
|
@ -125,9 +132,14 @@ async def broadcast_to_ws(ws_list, message):
|
||||||
ws_list.remove(to_remove)
|
ws_list.remove(to_remove)
|
||||||
|
|
||||||
|
|
||||||
|
def subscribe(ws, channel):
|
||||||
|
subscriptions[channel].append(ws)
|
||||||
|
|
||||||
|
|
||||||
@app.websocket('/index-ws')
|
@app.websocket('/index-ws')
|
||||||
async def index_ws(request, websocket):
|
async def index_ws(request, websocket):
|
||||||
all_index_ws.append(websocket)
|
subscribe(websocket, "build_tasks")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
data = await websocket.recv()
|
data = await websocket.recv()
|
||||||
print(f"websocket: {data}")
|
print(f"websocket: {data}")
|
||||||
|
@ -146,6 +158,7 @@ async def index(request):
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
all_index_ws = []
|
subscriptions = defaultdict(list)
|
||||||
|
|
||||||
app.add_task(initialize_app_list())
|
app.add_task(initialize_app_list())
|
||||||
app.run('localhost', port=5000, debug=True)
|
app.run('localhost', port=5000, debug=True)
|
||||||
|
|
Loading…
Add table
Reference in a new issue