[enh] implement workers, move to a dispatcher pattern

This commit is contained in:
Laurent Peuch 2018-07-12 05:23:41 +02:00
parent b5ff6e241b
commit a6ee91c944
2 changed files with 69 additions and 40 deletions

View file

@ -33,8 +33,15 @@ class BuildTask(peewee.Model):
database = db
class Worker(peewee.Model):
state = peewee.CharField(choices=(
('available', 'Available'),
('busy', 'Busy'),
))
# peewee is a bit stupid and will crash if the table already exists
for i in [Repo, BuildTask]:
for i in [Repo, BuildTask, Worker]:
try:
i.create_table()
except:

100
run.py
View file

@ -19,7 +19,7 @@ from sanic import Sanic, response
from playhouse.shortcuts import model_to_dict, dict_to_model
from models import Repo, BuildTask, db
from models import Repo, BuildTask, db, Worker
app = Sanic()
@ -65,58 +65,79 @@ async def initialize_app_list():
state="scheduled",
)
asyncio.ensure_future(run_jobs())
async def tasks_dispatcher():
if Worker.select().count() == 0:
for i in range(5):
Worker.create(state="available")
async def run_jobs():
print("Run jobs...")
while True:
with db.atomic():
build_task = BuildTask.select().where(BuildTask.state == "scheduled").limit(1)
workers = Worker.select().where(Worker.state == "available")
if not build_task.count():
# no available workers, wait
if workers.count() == 0:
await asyncio.sleep(3)
continue
with db.atomic('IMMEDIATE'):
build_tasks = BuildTask.select().where(BuildTask.state == "scheduled")
# no task to process, wait
if build_tasks.count() == 0:
await asyncio.sleep(3)
continue
build_task = build_task[0]
for i in range(min(workers.count(), build_tasks.count())):
build_task = build_tasks[i]
worker = workers[i]
build_task.state = "running"
build_task.started_time = datetime.now()
print(build_task)
build_task.save()
build_task.state = "running"
build_task.started_time = datetime.now()
print(build_task)
build_task.save()
await broadcast({
"target": "build_task",
"id": build_task.id,
"data": model_to_dict(build_task),
}, "build_tasks")
worker.state = "busy"
worker.save()
# fake stupid command, whould run CI instead
print(f"Starting job for {build_task.repo.name}...")
command = await asyncio.create_subprocess_shell("/usr/bin/tail /var/log/auth.log",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
asyncio.ensure_future(run_task(worker, build_task))
while not command.stdout.at_eof():
data = await command.stdout.readline()
line = data.decode().rstrip()
print(f">> {line}")
# XXX stupid crap to stimulate long jobs
# await asyncio.sleep(random.randint(30, 120))
# await asyncio.sleep(5)
print(f"Finished task for {build_task.repo.name}")
async def run_task(worker, build_task):
await broadcast({
"target": "build_task",
"id": build_task.id,
"data": model_to_dict(build_task),
}, "build_tasks")
await command.wait()
build_task.end_time = datetime.now()
build_task.state = "done"
build_task.save()
# fake stupid command, whould run CI instead
print(f"Starting job for {build_task.repo.name}...")
command = await asyncio.create_subprocess_shell("/usr/bin/tail /var/log/auth.log",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
await broadcast({
"target": "build_task",
"id": build_task.id,
"data": model_to_dict(build_task),
}, "build_tasks")
while not command.stdout.at_eof():
data = await command.stdout.readline()
line = data.decode().rstrip()
print(f">> {line}")
# XXX stupid crap to stimulate long jobs
await asyncio.sleep(random.randint(1, 15))
# await asyncio.sleep(5)
print(f"Finished task for {build_task.repo.name}")
await command.wait()
build_task.end_time = datetime.now()
build_task.state = "done"
build_task.save()
worker.state = "available"
worker.save()
await broadcast({
"target": "build_task",
"id": build_task.id,
"data": model_to_dict(build_task),
}, "build_tasks")
async def broadcast(message, channel):
@ -162,4 +183,5 @@ if __name__ == "__main__":
subscriptions = defaultdict(list)
app.add_task(initialize_app_list())
app.add_task(tasks_dispatcher())
app.run('localhost', port=5000, debug=True)