From 97a4d2c44372a7ea626dbc9f9f1fc90fde9faaff Mon Sep 17 00:00:00 2001
From: Laurent Peuch
Date: Sat, 3 Nov 2018 05:24:44 +0100
Subject: [PATCH] [enh] don't fail on job error and display job as error

---
 models.py |  1 +
 run.py    | 61 +++++++++++++++++++++++++++++++++----------------------
 2 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/models.py b/models.py
index cdaaca0..39c13ee 100644
--- a/models.py
+++ b/models.py
@@ -29,6 +29,7 @@ class Job(peewee.Model):
         ('runnning', 'Running'),
         ('done', 'Done'),
         ('failure', 'Failure'),
+        ('error', 'Error'),
         ('canceled', 'Canceled'),
     ), default="scheduled")
 
diff --git a/run.py b/run.py
index 0551966..ed7d408 100644
--- a/run.py
+++ b/run.py
@@ -6,6 +6,7 @@ import argh
 import random
 import logging
 import asyncio
+import traceback
 import itertools
 
 from datetime import datetime, date
@@ -314,34 +315,46 @@ async def run_job(worker, job):
     cwd = os.path.split(path_to_analyseCI)[0]
     arguments = f' {job.url_or_path} "{job.name}"'
     task_logger.info(f"Launch command: /bin/bash " + path_to_analyseCI + arguments)
-    command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments,
-                                                    cwd=cwd,
-                                                    # default limit is not enough in some situations
-                                                    limit=(2 ** 16) ** 10,
-                                                    stdout=asyncio.subprocess.PIPE,
-                                                    stderr=asyncio.subprocess.PIPE)
+    try:
+        command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments,
+                                                        cwd=cwd,
+                                                        # default limit is not enough in some situations
+                                                        limit=(2 ** 16) ** 10,
+                                                        stdout=asyncio.subprocess.PIPE,
+                                                        stderr=asyncio.subprocess.PIPE)
 
-    while not command.stdout.at_eof():
-        data = await command.stdout.readline()
+        while not command.stdout.at_eof():
+            data = await command.stdout.readline()
 
-        job.log += data.decode()
-        # XXX seems to be okay performance wise but that's probably going to be
-        # a bottleneck at some point :/
-        # theoritically jobs are going to have slow output
+            job.log += data.decode()
+            # XXX seems to be okay performance wise but that's probably going to be
+            # a bottleneck at some point :/
+            # theoretically jobs are going to have slow output
+            job.save()
+
+            await broadcast({
+                "action": "update_job",
+                "id": job.id,
+                "data": model_to_dict(job),
+            }, ["jobs", f"job-{job.id}"])
+
+    except Exception as e:
+        traceback.print_exc()
+        task_logger.exception(f"ERROR in job '{job.name} #{job.id}'")
+
+        job.end_time = datetime.now()
+        job.state = "error"
         job.save()
 
-        await broadcast({
-            "action": "update_job",
-            "id": job.id,
-            "data": model_to_dict(job),
-        }, ["jobs", f"job-{job.id}"])
+        # XXX add mechanism to reschedule error jobs
 
-    task_logger.info(f"Finished job '{job.name}'")
+    else:
+        task_logger.info(f"Finished job '{job.name}'")
 
-    await command.wait()
-    job.end_time = datetime.now()
-    job.state = "done" if command.returncode == 0 else "failure"
-    job.save()
+        await command.wait()
+        job.end_time = datetime.now()
+        job.state = "done" if command.returncode == 0 else "failure"
+        job.save()
 
     # remove ourself from the state
     del jobs_in_memory_state[job.id]
@@ -384,7 +397,7 @@ async def ws_index(request, websocket):
 
     JobAlias = Job.alias()
     subquery = JobAlias.select()\
-                       .where(JobAlias.state << ("done", "failure", "canceled"))\
+                       .where(JobAlias.state << ("done", "failure", "canceled", "error"))\
                        .group_by(JobAlias.url_or_path)\
                        .select(fn.Max(JobAlias.id).alias("max_id"))
 
@@ -558,7 +571,7 @@ async def api_stop_job(request, job_id):
 
         return response.text("ok")
 
-    if job.state in ("done", "canceled", "failure"):
+    if job.state in ("done", "canceled", "failure", "error"):
         api_logger.info(f"Request to cancel job '{job.name}' [job.id] but job is already in '{job.state}' state, do nothing")
         # nothing to do, task is already done
         return response.text("ok")