From 6b70f46789e4f995ddf1b23c2ce7860fa3574882 Mon Sep 17 00:00:00 2001 From: Kay0u Date: Tue, 12 Oct 2021 18:06:59 +0200 Subject: [PATCH] handle multi worker --- run.py | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/run.py b/run.py index 8b41d32..622ab1f 100644 --- a/run.py +++ b/run.py @@ -351,11 +351,41 @@ async def launch_monthly_job(): await create_job(repo.name, repo.url) +async def ensure_workers_count(): + if Worker.select().count() < app.config.WORKER_COUNT: + for _ in range(app.config.WORKER_COUNT - Worker.select().count()): + Worker.create(state="available") + elif Worker.select().count() > app.config.WORKER_COUNT: + workers_to_remove = Worker.select().count() - app.config.WORKER_COUNT + workers = Worker.select().where(Worker.state == "available") + for worker in workers: + if workers_to_remove == 0: + break + worker.delete_instance() + workers_to_remove -= 1 + + jobs_to_stop = workers_to_remove + for job_id in jobs_in_memory_state: + if jobs_to_stop == 0: + break + await stop_job(job_id) + jobs_to_stop -= 1 + job = Job.select().where(Job.id == job_id)[0] + job.state = "scheduled" + job.log = "" + job.save() + + workers = Worker.select().where(Worker.state == "available") + for worker in workers: + if workers_to_remove == 0: + break + worker.delete_instance() + workers_to_remove -= 1 + + @always_relaunch(sleep=3) async def jobs_dispatcher(): - if Worker.select().count() == 0: - for i in range(1): - Worker.create(state="available") + await ensure_workers_count() workers = Worker.select().where(Worker.state == "available") @@ -400,7 +430,7 @@ async def run_job(worker, job): task_logger.info(f"Starting job '{job.name}' #{job.id}...") cwd = os.path.split(path_to_analyseCI)[0] - arguments = f' {job.url_or_path} "{job.name}" {job.id}' + arguments = f' {job.url_or_path} "{job.name}" {job.id} {worker.id}' task_logger.info(f"Launch command: /bin/bash " + path_to_analyseCI + arguments) try: command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments, @@ -1173,6 +1203,7 @@ def main(config="./config.py"): "MONITOR_GIT": False, "MONITOR_ONLY_GOOD_QUALITY_APPS": False, "MONTHLY_JOBS": False, + "WORKER_COUNT": 1, } app.config.update_config(default_config)