Merge pull request #30 from YunoHost/handle-multi-worker

handle multi worker
This commit is contained in:
Alexandre Aubin 2021-10-19 22:55:26 +02:00 committed by GitHub
commit 266bad3ec0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

122
run.py
View file

@ -82,10 +82,20 @@ LOGGING_CONFIG_DEFAULTS["formatters"] = {
},
}
def datetime_to_epoch_json_converter(o):
if isinstance(o, datetime):
return o.strftime('%s')
# define a custom json dumps to convert datetime
def my_json_dumps(o):
return json.dumps(o, default=datetime_to_epoch_json_converter)
task_logger = logging.getLogger("task")
api_logger = logging.getLogger("api")
app = Sanic(__name__)
app = Sanic(__name__, dumps=my_json_dumps)
app.static('/static', './static/')
loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)) + '/templates', encoding='utf8')
@ -110,11 +120,6 @@ subscriptions = defaultdict(list)
jobs_in_memory_state = {}
def datetime_to_epoch_json_converter(o):
if isinstance(o, datetime):
return o.strftime('%s')
async def wait_closed(self):
"""
Wait until the connection is closed.
@ -351,11 +356,41 @@ async def launch_monthly_job():
await create_job(repo.name, repo.url)
async def ensure_workers_count():
if Worker.select().count() < app.config.WORKER_COUNT:
for _ in range(app.config.WORKER_COUNT - Worker.select().count()):
Worker.create(state="available")
elif Worker.select().count() > app.config.WORKER_COUNT:
workers_to_remove = Worker.select().count() - app.config.WORKER_COUNT
workers = Worker.select().where(Worker.state == "available")
for worker in workers:
if workers_to_remove == 0:
break
worker.delete_instance()
workers_to_remove -= 1
jobs_to_stop = workers_to_remove
for job_id in jobs_in_memory_state:
if jobs_to_stop == 0:
break
await stop_job(job_id)
jobs_to_stop -= 1
job = Job.select().where(Job.id == job_id)[0]
job.state = "scheduled"
job.log = ""
job.save()
workers = Worker.select().where(Worker.state == "available")
for worker in workers:
if workers_to_remove == 0:
break
worker.delete_instance()
workers_to_remove -= 1
@always_relaunch(sleep=3)
async def jobs_dispatcher():
if Worker.select().count() == 0:
for i in range(1):
Worker.create(state="available")
await ensure_workers_count()
workers = Worker.select().where(Worker.state == "available")
@ -400,7 +435,7 @@ async def run_job(worker, job):
task_logger.info(f"Starting job '{job.name}' #{job.id}...")
cwd = os.path.split(path_to_analyseCI)[0]
arguments = f' {job.url_or_path} "{job.name}" {job.id}'
arguments = f' {job.url_or_path} "{job.name}" {job.id} {worker.id}'
task_logger.info(f"Launch command: /bin/bash " + path_to_analyseCI + arguments)
try:
command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments,
@ -483,12 +518,15 @@ async def broadcast(message, channels):
for ws in ws_list:
try:
await ws.send(json.dumps(message, default=datetime_to_epoch_json_converter))
await ws.send(my_json_dumps(message))
except ConnectionClosed:
dead_ws.append(ws)
for to_remove in dead_ws:
ws_list.remove(to_remove)
try:
ws_list.remove(to_remove)
except ValueError:
pass
def subscribe(ws, channel):
subscriptions[channel].append(ws)
@ -575,16 +613,16 @@ async def ws_index(request, websocket):
first_chunck = next(data)
await websocket.send(json.dumps({
await websocket.send(my_json_dumps({
"action": "init_jobs",
"data": first_chunck, # send first chunk
}, default=datetime_to_epoch_json_converter))
}))
for chunk in data:
await websocket.send(json.dumps({
await websocket.send(my_json_dumps({
"action": "init_jobs_stream",
"data": chunk,
}, default=datetime_to_epoch_json_converter))
}))
await websocket.wait_closed()
@ -601,10 +639,10 @@ async def ws_job(request, websocket, job_id):
subscribe(websocket, f"job-{job.id}")
await websocket.send(json.dumps({
await websocket.send(my_json_dumps({
"action": "init_job",
"data": model_to_dict(job),
}, default=datetime_to_epoch_json_converter))
}))
await websocket.wait_closed()
@ -701,10 +739,10 @@ async def ws_apps(request, websocket):
repos = sorted(repos, key=lambda x: x["name"])
await websocket.send(json.dumps({
await websocket.send(my_json_dumps({
"action": "init_apps",
"data": repos,
}, default=datetime_to_epoch_json_converter))
}))
await websocket.wait_closed()
@ -719,10 +757,10 @@ async def ws_app(request, websocket, app_name):
subscribe(websocket, f"app-jobs-{app.url}")
job = list(Job.select().where(Job.url_or_path == app.url).order_by(-Job.id).dicts())
await websocket.send(json.dumps({
await websocket.send(my_json_dumps({
"action": "init_jobs",
"data": job,
}, default=datetime_to_epoch_json_converter))
}))
await websocket.wait_closed()
@ -1122,7 +1160,7 @@ async def github(request):
token = open("./github_bot_token").read().strip()
async with aiohttp.ClientSession(headers={"Authorization": f"token {token}"}) as session:
async with session.post(comments_url, data=json.dumps({"body": body}, default=datetime_to_epoch_json_converter)) as resp:
async with session.post(comments_url, data=my_json_dumps({"body": body})) as resp:
api_logger.info("Added comment %s" % resp.json()["html_url"])
catchphrases = ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:" ]
@ -1164,6 +1202,37 @@ def format_frame(f):
return dict([(k, str(getattr(f, k))) for k in keys])
@app.listener("before_server_start")
async def listener_before_server_start(*args, **kwargs):
task_logger.info("before_server_start")
reset_pending_jobs()
reset_busy_workers()
merge_jobs_on_startup()
set_random_day_for_monthy_job()
@app.listener("after_server_start")
async def listener_after_server_start(*args, **kwargs):
task_logger.info("after_server_start")
@app.listener("before_server_stop")
async def listener_before_server_stop(*args, **kwargs):
task_logger.info("before_server_stop")
@app.listener("after_server_stop")
async def listener_after_server_stop(*args, **kwargs):
task_logger.info("after_server_stop")
for job_id in jobs_in_memory_state:
await stop_job(job_id)
job = Job.select().where(Job.id == job_id)[0]
job.state = "scheduled"
job.log = ""
job.save()
def main(config="./config.py"):
default_config = {
@ -1175,6 +1244,7 @@ def main(config="./config.py"):
"MONITOR_GIT": False,
"MONITOR_ONLY_GOOD_QUALITY_APPS": False,
"MONTHLY_JOBS": False,
"WORKER_COUNT": 1,
}
app.config.update_config(default_config)
@ -1184,12 +1254,6 @@ def main(config="./config.py"):
print(f"Error: analyzer script doesn't exist at '{app.config.PATH_TO_ANALYZER}'. Please fix the configuration in {config}")
sys.exit(1)
reset_pending_jobs()
reset_busy_workers()
merge_jobs_on_startup()
set_random_day_for_monthy_job()
if app.config.MONITOR_APPS_LIST:
app.add_task(monitor_apps_lists(monitor_git=app.config.MONITOR_GIT,
monitor_only_good_quality_apps=app.config.MONITOR_ONLY_GOOD_QUALITY_APPS))