diff --git a/run.py b/run.py index 403a3db..2f666a8 100644 --- a/run.py +++ b/run.py @@ -343,42 +343,41 @@ async def launch_monthly_job(type): await create_job(repo.name, repo.app_list, repo, job_command_last_part) +@always_relaunch(sleep=3) async def jobs_dispatcher(): if Worker.select().count() == 0: for i in range(1): Worker.create(state="available") - while True: - workers = Worker.select().where(Worker.state == "available") + workers = Worker.select().where(Worker.state == "available") - # no available workers, wait - if workers.count() == 0: + # no available workers, wait + if workers.count() == 0: + return + + with db.atomic('IMMEDIATE'): + jobs = Job.select().where(Job.state == "scheduled") + + # no jobs to process, wait + if jobs.count() == 0: await asyncio.sleep(3) - continue + return - with db.atomic('IMMEDIATE'): - jobs = Job.select().where(Job.state == "scheduled") + for i in range(min(workers.count(), jobs.count())): + job = jobs[i] + worker = workers[i] - # no jobs to process, wait - if jobs.count() == 0: - await asyncio.sleep(3) - continue + job.state = "running" + job.started_time = datetime.now() + job.save() - for i in range(min(workers.count(), jobs.count())): - job = jobs[i] - worker = workers[i] + worker.state = "busy" + worker.save() - job.state = "running" - job.started_time = datetime.now() - job.save() - - worker.state = "busy" - worker.save() - - jobs_in_memory_state[job.id] = { - "worker": worker.id, - "task": asyncio.ensure_future(run_job(worker, job)), - } + jobs_in_memory_state[job.id] = { + "worker": worker.id, + "task": asyncio.ensure_future(run_job(worker, job)), + } async def run_job(worker, job):