From f87fcb560f7b7a6afc2935a9ba979c0474876562 Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Sun, 15 Jul 2018 09:47:39 +0200 Subject: [PATCH] [enh] can cancel job --- models.py | 1 + run.py | 60 +++++++++++++++++++++++++++++++++++++++++++++- templates/job.html | 8 +++++++ 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/models.py b/models.py index 4b1fcaa..87c4421 100644 --- a/models.py +++ b/models.py @@ -24,6 +24,7 @@ class Job(peewee.Model): ('runnning', 'Running'), ('done', 'Done'), ('failure', 'Failure'), + ('canceled', 'Canceled'), ), default="scheduled") log = peewee.TextField(default="") diff --git a/run.py b/run.py index 740041f..98e80e5 100644 --- a/run.py +++ b/run.py @@ -32,6 +32,12 @@ APPS_LIST = [OFFICAL_APPS_LIST, COMMUNITY_APPS_LIST] subscriptions = defaultdict(list) +# this will have the form: +# jobs_in_memory_state = { +# some_job_id: {"worker": some_worker_id, "task": some_aio_task}, +# } +jobs_in_memory_state = {} + def reset_pending_jobs(): Job.update(state="scheduled").where(Job.state == "running").execute() @@ -118,7 +124,10 @@ async def jobs_dispatcher(): worker.state = "busy" worker.save() - asyncio.ensure_future(run_job(worker, job)) + jobs_in_memory_state[job.id] = { + "worker": worker.id, + "task": asyncio.ensure_future(run_job(worker, job)), + } async def run_job(worker, job): @@ -159,6 +168,9 @@ async def run_job(worker, job): job.state = "done" job.save() + # remove ourself from the state + del jobs_in_memory_state[job.id] + worker.state = "available" worker.save() @@ -258,6 +270,52 @@ async def api_new_job(request): return response.text("ok") +@app.route("/api/job//stop", methods=['POST']) +async def api_stop_job(request, job_id): + # TODO auth or some kind + + job = Job.select().where(Job.id == job_id) + + if job.count == 0: + raise NotFound() + + job = job[0] + + if job.state == "scheduled": + job.state = "canceled" + job.save() + + await broadcast({ + "action": "update_job", + "data": model_to_dict(job), + }, ["jobs", f"job-{job.id}"]) + + return response.text("ok") + + if job.state == "running": + job.state = "canceled" + job.save() + + jobs_in_memory_state[job.id]["task"].cancel() + + worker = Worker.select().where(Worker.id == jobs_in_memory_state[job.id]["worker"])[0] + worker.state = "available" + worker.save() + + await broadcast({ + "action": "update_job", + "data": model_to_dict(job), + }, ["jobs", f"job-{job.id}"]) + + return response.text("ok") + + if job.state in ("done", "canceled", "failure"): + # nothing to do, task is already done + return response.text("ok") + + raise Exception(f"Tryed to cancel a job with an unknown state: {job.state}") + + @app.route('/job/') async def job(request, job_id): job = Job.select().where(Job.id == job_id) diff --git a/templates/job.html b/templates/job.html index 2b5eda2..4883add 100644 --- a/templates/job.html +++ b/templates/job.html @@ -7,6 +7,9 @@ + @@ -14,6 +17,8 @@

Job '{{job.name}}'

+ + @@ -37,6 +42,9 @@ methods: { timestampToDate: function (timestamp) { return new Date(timestamp * 1000).toLocaleString() + }, + cancelJob: function() { + $.post("/api/job/" + this.job.id + "/stop") } } })
State{{job.state}}
Target revision{{job.target_revision}}