From 5b51fbfef6ec15b4ab45b89eb006d0217cb32f5a Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Sun, 15 Jul 2018 03:40:23 +0200 Subject: [PATCH] [enh] boardcasts logs --- models.py | 2 ++ run.py | 39 ++++++++++++++++++++++++++++----------- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/models.py b/models.py index f0cbf8c..4b1fcaa 100644 --- a/models.py +++ b/models.py @@ -26,6 +26,8 @@ class Job(peewee.Model): ('failure', 'Failure'), ), default="scheduled") + log = peewee.TextField(default="") + created_time = peewee.DateTimeField(constraints=[peewee.SQL("DEFAULT (datetime('now'))")]) started_time = peewee.DateTimeField(null=True) end_time = peewee.DateTimeField(null=True) diff --git a/run.py b/run.py index fd9c409..4996072 100644 --- a/run.py +++ b/run.py @@ -115,9 +115,9 @@ async def jobs_dispatcher(): async def run_job(worker, job): await broadcast({ - "action": "job", + "action": "update_job", "data": model_to_dict(job), - }, "jobs") + }, ["jobs", f"job-{job.id}"]) # fake stupid command, whould run CI instead print(f"Starting job {job.name}...") @@ -128,6 +128,19 @@ async def run_job(worker, job): while not command.stdout.at_eof(): data = await command.stdout.readline() line = data.decode().rstrip() + + job.log += data.decode() + # XXX seems to be okay performance wise but that's probably going to be + # a bottleneck at some point :/ + # theoritically jobs are going to have slow output + job.save() + + await broadcast({ + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, ["jobs", f"job-{job.id}"]) + print(f">> {line}") # XXX stupid crap to stimulate long jobs @@ -147,18 +160,22 @@ async def run_job(worker, job): "action": "update_job", "id": job.id, "data": model_to_dict(job), - }, "jobs") + }, ["jobs", f"job-{job.id}"]) -async def broadcast(message, channel): - ws_list = subscriptions[channel] - dead_ws = [] +async def broadcast(message, channels): + if not isinstance(channels, (list, tuple)): + channels = [channels] - for ws in ws_list: - try: - await ws.send(ujson.dumps(message)) - except ConnectionClosed: - dead_ws.append(ws) + for channel in channels: + ws_list = subscriptions[channel] + dead_ws = [] + + for ws in ws_list: + try: + await ws.send(ujson.dumps(message)) + except ConnectionClosed: + dead_ws.append(ws) for to_remove in dead_ws: ws_list.remove(to_remove)