diff --git a/models.py b/models.py index 0d5c023..9bd88b7 100644 --- a/models.py +++ b/models.py @@ -33,8 +33,15 @@ class BuildTask(peewee.Model): database = db +class Worker(peewee.Model): + state = peewee.CharField(choices=( + ('available', 'Available'), + ('busy', 'Busy'), + )) + + # peewee is a bit stupid and will crash if the table already exists -for i in [Repo, BuildTask]: +for i in [Repo, BuildTask, Worker]: try: i.create_table() except: diff --git a/run.py b/run.py index 7babb3f..fdee322 100644 --- a/run.py +++ b/run.py @@ -19,7 +19,7 @@ from sanic import Sanic, response from playhouse.shortcuts import model_to_dict, dict_to_model -from models import Repo, BuildTask, db +from models import Repo, BuildTask, db, Worker app = Sanic() @@ -65,58 +65,79 @@ async def initialize_app_list(): state="scheduled", ) - asyncio.ensure_future(run_jobs()) +async def tasks_dispatcher(): + if Worker.select().count() == 0: + for i in range(5): + Worker.create(state="available") -async def run_jobs(): - print("Run jobs...") while True: - with db.atomic(): - build_task = BuildTask.select().where(BuildTask.state == "scheduled").limit(1) + workers = Worker.select().where(Worker.state == "available") - if not build_task.count(): + # no available workers, wait + if workers.count() == 0: + await asyncio.sleep(3) + continue + + with db.atomic('IMMEDIATE'): + build_tasks = BuildTask.select().where(BuildTask.state == "scheduled") + + # no task to process, wait + if build_tasks.count() == 0: await asyncio.sleep(3) continue - build_task = build_task[0] + for i in range(min(workers.count(), build_tasks.count())): + build_task = build_tasks[i] + worker = workers[i] - build_task.state = "running" - build_task.started_time = datetime.now() - print(build_task) - build_task.save() + build_task.state = "running" + build_task.started_time = datetime.now() + print(build_task) + build_task.save() - await broadcast({ - "target": "build_task", - "id": build_task.id, - "data": model_to_dict(build_task), - }, "build_tasks") + worker.state = "busy" + worker.save() - # fake stupid command, whould run CI instead - print(f"Starting job for {build_task.repo.name}...") - command = await asyncio.create_subprocess_shell("/usr/bin/tail /var/log/auth.log", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) + asyncio.ensure_future(run_task(worker, build_task)) - while not command.stdout.at_eof(): - data = await command.stdout.readline() - line = data.decode().rstrip() - print(f">> {line}") - # XXX stupid crap to stimulate long jobs - # await asyncio.sleep(random.randint(30, 120)) - # await asyncio.sleep(5) - print(f"Finished task for {build_task.repo.name}") +async def run_task(worker, build_task): + await broadcast({ + "target": "build_task", + "id": build_task.id, + "data": model_to_dict(build_task), + }, "build_tasks") - await command.wait() - build_task.end_time = datetime.now() - build_task.state = "done" - build_task.save() + # fake stupid command, whould run CI instead + print(f"Starting job for {build_task.repo.name}...") + command = await asyncio.create_subprocess_shell("/usr/bin/tail /var/log/auth.log", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) - await broadcast({ - "target": "build_task", - "id": build_task.id, - "data": model_to_dict(build_task), - }, "build_tasks") + while not command.stdout.at_eof(): + data = await command.stdout.readline() + line = data.decode().rstrip() + print(f">> {line}") + + # XXX stupid crap to stimulate long jobs + await asyncio.sleep(random.randint(1, 15)) + # await asyncio.sleep(5) + print(f"Finished task for {build_task.repo.name}") + + await command.wait() + build_task.end_time = datetime.now() + build_task.state = "done" + build_task.save() + + worker.state = "available" + worker.save() + + await broadcast({ + "target": "build_task", + "id": build_task.id, + "data": model_to_dict(build_task), + }, "build_tasks") async def broadcast(message, channel): @@ -162,4 +183,5 @@ if __name__ == "__main__": subscriptions = defaultdict(list) app.add_task(initialize_app_list()) + app.add_task(tasks_dispatcher()) app.run('localhost', port=5000, debug=True)