[enh] don't fail on job error and display job as error
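
Before this change, an unexpected exception inside run_job (while spawning the analyseCI subprocess or while streaming its output) escaped the coroutine, so the job never reached a final state and the failure was not visible in the web UI. run_job now wraps the whole execution in try/except/else: on an exception the traceback is logged, the job is marked with the new "error" state and the update is broadcast to the UI, while the normal "done"/"failure" completion path moves into the else branch. The new state is also treated as terminal in ws_index and api_stop_job. A runnable sketch of the pattern follows the run_job hunk below.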

Laurent Peuch 2018-11-03 05:24:44 +01:00
parent f6cd2bce54
commit 97a4d2c443
2 changed files with 38 additions and 24 deletions


@@ -29,6 +29,7 @@ class Job(peewee.Model):
         ('runnning', 'Running'),
         ('done', 'Done'),
         ('failure', 'Failure'),
+        ('error', 'Error'),
         ('canceled', 'Canceled'),
     ), default="scheduled")
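
One thing the new "error" choice opens up (the run_job hunk below leaves an XXX about rescheduling such jobs) is selecting them back out. A hypothetical helper, not part of this commit, assuming the Job model above:

    def jobs_to_reschedule():
        # plain peewee query over the new 'error' state; the helper name is
        # illustrative only, nothing in this commit defines it
        return Job.select().where(Job.state == "error")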

run.py

@@ -6,6 +6,7 @@ import argh
 import random
 import logging
 import asyncio
+import traceback
 import itertools
 from datetime import datetime, date
@@ -314,34 +315,46 @@ async def run_job(worker, job):
     cwd = os.path.split(path_to_analyseCI)[0]
     arguments = f' {job.url_or_path} "{job.name}"'
     task_logger.info(f"Launch command: /bin/bash " + path_to_analyseCI + arguments)
-    command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments,
-                                                    cwd=cwd,
-                                                    # default limit is not enough in some situations
-                                                    limit=(2 ** 16) ** 10,
-                                                    stdout=asyncio.subprocess.PIPE,
-                                                    stderr=asyncio.subprocess.PIPE)
-
-    while not command.stdout.at_eof():
-        data = await command.stdout.readline()
-
-        job.log += data.decode()
-        # XXX seems to be okay performance wise but that's probably going to be
-        # a bottleneck at some point :/
-        # theoritically jobs are going to have slow output
-        job.save()
-
-        await broadcast({
-            "action": "update_job",
-            "id": job.id,
-            "data": model_to_dict(job),
-        }, ["jobs", f"job-{job.id}"])
-
-    task_logger.info(f"Finished job '{job.name}'")
-
-    await command.wait()
-    job.end_time = datetime.now()
-    job.state = "done" if command.returncode == 0 else "failure"
-    job.save()
+    try:
+        command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments,
+                                                        cwd=cwd,
+                                                        # default limit is not enough in some situations
+                                                        limit=(2 ** 16) ** 10,
+                                                        stdout=asyncio.subprocess.PIPE,
+                                                        stderr=asyncio.subprocess.PIPE)
+
+        while not command.stdout.at_eof():
+            data = await command.stdout.readline()
+
+            job.log += data.decode()
+            # XXX seems to be okay performance wise but that's probably going to be
+            # a bottleneck at some point :/
+            # theoritically jobs are going to have slow output
+            job.save()
+
+            await broadcast({
+                "action": "update_job",
+                "id": job.id,
+                "data": model_to_dict(job),
+            }, ["jobs", f"job-{job.id}"])
+
+    except Exception as e:
+        traceback.print_exc()
+        task_logger.exception(f"ERROR in job '{job.name} #{job.id}'")
+
+        job.end_time = datetime.now()
+        job.state = "error"
+        job.save()
+
+        await broadcast({
+            "action": "update_job",
+            "id": job.id,
+            "data": model_to_dict(job),
+        }, ["jobs", f"job-{job.id}"])
+
+        # XXX add mechanism to reschedule error jobs
+
+    else:
+        task_logger.info(f"Finished job '{job.name}'")
+
+        await command.wait()
+        job.end_time = datetime.now()
+        job.state = "done" if command.returncode == 0 else "failure"
+        job.save()
 
     # remove ourself from the state
     del jobs_in_memory_state[job.id]
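
The try/except/else shape above is load-bearing: if create_subprocess_shell itself raises, command is never bound, so command.wait() may only run on the success path, which is exactly what the else branch guarantees. A minimal self-contained sketch of the pattern, simplified from the hunk above (the run_one name and the "true" command are illustrative, not yunorunner code):

    import asyncio
    import traceback

    async def run_one(cmd):
        try:
            proc = await asyncio.create_subprocess_shell(
                cmd, stdout=asyncio.subprocess.PIPE)
            while not proc.stdout.at_eof():
                await proc.stdout.readline()  # stream output, as run_job does
        except Exception:
            traceback.print_exc()             # the worker task survives
            return "error"
        else:
            await proc.wait()                 # 'proc' is bound only on this path
            return "done" if proc.returncode == 0 else "failure"

    print(asyncio.run(run_one("true")))       # Python 3.7+; prints "done"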
@@ -384,7 +397,7 @@ async def ws_index(request, websocket):
     JobAlias = Job.alias()
     subquery = JobAlias.select()\
-                       .where(JobAlias.state << ("done", "failure", "canceled"))\
+                       .where(JobAlias.state << ("done", "failure", "canceled", "error"))\
                        .group_by(JobAlias.url_or_path)\
                        .select(fn.Max(JobAlias.id).alias("max_id"))
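
For readers who don't know peewee: << is its operator spelling of SQL IN, so the updated clause compiles to WHERE state IN ('done', 'failure', 'canceled', 'error'), which keeps errored jobs counted as finished when picking the latest job per url_or_path. A standalone illustration against the Job model:

    # same filter, outside the subquery; Field.in_() is the equivalent
    # method spelling in recent peewee
    finished = Job.select().where(
        Job.state << ("done", "failure", "canceled", "error"))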
@@ -558,7 +571,7 @@ async def api_stop_job(request, job_id):
         return response.text("ok")
 
-    if job.state in ("done", "canceled", "failure"):
+    if job.state in ("done", "canceled", "failure", "error"):
         api_logger.info(f"Request to cancel job '{job.name}' [job.id] but job is already in '{job.state}' state, do nothing")
         # nothing to do, task is already done
         return response.text("ok")
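
The same list of terminal states is now spelled out in three places (the model choices, ws_index and api_stop_job). A possible follow-up, not part of this commit, is a single shared constant:

    # hypothetical cleanup, not in this commit
    TERMINAL_STATES = ("done", "failure", "canceled", "error")

    if job.state in TERMINAL_STATES:
        # nothing to do, the job already reached a final state
        return response.text("ok")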