diff --git a/run.py b/run.py
index 174281c..26e4104 100644
--- a/run.py
+++ b/run.py
@@ -11,6 +11,7 @@ import traceback
 import itertools
 import tracemalloc
 import string
+import shutil
 import hmac
 import hashlib
 
@@ -432,25 +433,32 @@ async def jobs_dispatcher():
 
 
 async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=False):
-    await asyncio.sleep(3)
+    await asyncio.sleep(1)
 
     if not os.path.exists(app.config.PACKAGE_CHECK_LOCK_PER_WORKER.format(worker_id=worker.id)):
         return
 
-    job.log += f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ...\n"
+    job.log += f"Lock for worker {worker.id} still exist ... trying to cleanup the old package check still running ...\n"
     job.save()
+    await broadcast({
+        "action": "update_job",
+        "id": job.id,
+        "data": model_to_dict(job),
+    }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
+
     task_logger.info(f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ...")
 
     cwd = os.path.split(app.config.PACKAGE_CHECK_PATH)[0]
     env = {
-        "WORKER_ID": worker.id,
+        "IN_YUNORUNNER": "1",
+        "WORKER_ID": str(worker.id),
         "ARCH": app.config.ARCH,
         "DIST": app.config.DIST,
         "YNH_BRANCH": app.config.YNH_BRANCH,
         "PATH": os.environ["PATH"] + ":/usr/local/bin",  # This is because lxc/lxd is in /usr/local/bin
     }
 
-    cmd = f"{app.config.PACKAGE_CHECK_PATH} --force-stop"
+    cmd = f"script -qefc '{app.config.PACKAGE_CHECK_PATH} --force-stop 2>&1'"
     try:
         command = await asyncio.create_subprocess_shell(cmd,
                                                         cwd=cwd,
@@ -458,6 +466,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal
                                                         stdout=asyncio.subprocess.PIPE,
                                                         stderr=asyncio.subprocess.PIPE)
         while not command.stdout.at_eof():
+            data = await command.stdout.readline()
             await asyncio.sleep(1)
     except Exception:
         traceback.print_exc()
@@ -475,7 +484,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal
         command.terminate()
 
         if not ignore_error:
-            job.log += "\nFailed to lill old check process?"
+            job.log += "\nFailed to kill old check process?"
             job.state = "canceled"
 
             task_logger.info(f"Job '{job.name} #{job.id}' has been canceled")
@@ -486,6 +495,11 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal
         return True
     finally:
         job.save()
+        await broadcast({
+            "action": "update_job",
+            "id": job.id,
+            "data": model_to_dict(job),
+        }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
 
 
 async def run_job(worker, job):
@@ -495,6 +509,8 @@ async def run_job(worker, job):
         "data": model_to_dict(job),
     }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
 
+    await asyncio.sleep(5)
+
     cleanup_ret = await cleanup_old_package_check_if_lock_exists(worker, job)
     if cleanup_ret is False:
         return
@@ -505,7 +521,8 @@ async def run_job(worker, job):
 
     cwd = os.path.split(app.config.PACKAGE_CHECK_PATH)[0]
     env = {
-        "WORKER_ID": worker.id,
+        "IN_YUNORUNNER": "1",
+        "WORKER_ID": str(worker.id),
         "ARCH": app.config.ARCH,
         "DIST": app.config.DIST,
         "YNH_BRANCH": app.config.YNH_BRANCH,
@@ -513,24 +530,29 @@ async def run_job(worker, job):
     }
 
     now = datetime.now().strftime("%d/%m/%Y - %H:%M:%S")
-    msg = now + " - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}"
+    msg = now + f" - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}"
     job.log += "=" * len(msg) + "\n"
     job.log += msg + "\n"
     job.log += "=" * len(msg) + "\n"
     job.save()
+    await broadcast({
+        "action": "update_job",
+        "id": job.id,
+        "data": model_to_dict(job),
+    }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
 
     result_json = app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER.format(worker_id=worker.id)
     full_log = app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER.format(worker_id=worker.id)
     summary_png = app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER.format(worker_id=worker.id)
 
-    if os.exists(result_json):
+    if os.path.exists(result_json):
         os.remove(result_json)
-    if os.exists(full_log):
+    if os.path.exists(full_log):
         os.remove(full_log)
-    if os.exists(summary_png):
+    if os.path.exists(summary_png):
         os.remove(summary_png)
 
-    cmd = f"timeout {app.config.TIMEOUT} --signal=TERM nice --adjustment=10 /bin/bash {app.config.PACKAGE_CHECK_PATH} {job.url_or_path}"
+    cmd = f"timeout --signal TERM {app.config.TIMEOUT} nice --adjustment=10 script -qefc '/bin/bash {app.config.PACKAGE_CHECK_PATH} {job.url_or_path} 2>&1'"
     task_logger.info(f"Launching command: {cmd}")
 
     try:
@@ -585,16 +607,16 @@ async def run_job(worker, job):
             job.log += f"\nJob failed ? Return code is {comman.returncode} / Or maybe the json result doesnt exist...\n"
             job.state = "error"
         else:
-            job.log += f"\nPackage check completed"
+            job.log += f"\nPackage check completed\n"
             results = json.load(open(result_json))
             level = results["level"]
             job.state = "done" if level > 4 else "failure"
 
-            log.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n"
+            job.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n"
 
-            os.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log")
-            os.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json")
-            os.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png")
+            shutil.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log")
+            shutil.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json")
+            shutil.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png")
 
             public_result_json_path = yunorunner_dir + f"/results/logs/list_level_{app.config.YNH_BRANCH}_{app.config.ARCH}.json"
             if os.path.exists(public_result_json_path) or not open(public_result_json_path).read().strip():
@@ -607,61 +629,75 @@ async def run_job(worker, job):
     finally:
         job.end_time = datetime.now()
 
-        now = datetime.datetime.now().strftime("%d/%m/%Y - %H:%M:%S")
-        msg = now + " - Finished job for {job.name}"
+        now = datetime.now().strftime("%d/%m/%Y - %H:%M:%S")
+        msg = now + f" - Finished job for {job.name} ({job.state})"
         job.log += "=" * len(msg) + "\n"
         job.log += msg + "\n"
        job.log += "=" * len(msg) + "\n"
         job.save()
+        await broadcast({
+            "action": "update_job",
+            "id": job.id,
+            "data": model_to_dict(job),
+        }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
+
 
         if "ci-apps.yunohost.org" in app.config.BASE_URL:
-            async with aiohttp.ClientSession() as session:
-                async with session.get(APPS_LIST) as resp:
-                    data = await resp.json()
-                    data = data["apps"]
-                    public_level = data.get(job_app, {}).get("level")
+            try:
+                async with aiohttp.ClientSession() as session:
+                    async with session.get(APPS_LIST) as resp:
+                        data = await resp.json()
+                        data = data["apps"]
+                        public_level = data.get(job_app, {}).get("level")
 
-            job_url = app.config.BASE_URL + "/job/" + job.id
-            job_id_with_url = f"[#{job.id}]({job_url})"
-            if job.state == "error":
-                msg = f"Job {job_id_with_url} for {job_app} failed miserably :("
-            elif not level == 0:
-                msg = f"App {job_app} failed all tests in job {job_id_with_url} :("
-            elif public_level is None:
-                msg = f"App {job_app} rises from level (unknown) to {level} in job {job_id_with_url} !"
-            elif level > public_level:
-                msg = f"App {job_app} rises from level {public_level} to {level} in job {job_id_with_url} !"
-            elif level < public_level:
-                msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}"
-            elif level < 6:
-                msg = f"App {job_app} stays at level {level} in job {job_id_with_url}"
-            else:
-                # Dont notify anything, reduce CI flood on app chatroom if app is already level 6+
-                msg = ""
+                job_url = app.config.BASE_URL + "/job/" + job.id
+                job_id_with_url = f"[#{job.id}]({job_url})"
+                if job.state == "error":
+                    msg = f"Job {job_id_with_url} for {job_app} failed miserably :("
+                elif not level == 0:
+                    msg = f"App {job_app} failed all tests in job {job_id_with_url} :("
+                elif public_level is None:
+                    msg = f"App {job_app} rises from level (unknown) to {level} in job {job_id_with_url} !"
+                elif level > public_level:
+                    msg = f"App {job_app} rises from level {public_level} to {level} in job {job_id_with_url} !"
+                elif level < public_level:
+                    msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}"
+                elif level < 6:
+                    msg = f"App {job_app} stays at level {level} in job {job_id_with_url}"
+                else:
+                    # Dont notify anything, reduce CI flood on app chatroom if app is already level 6+
+                    msg = ""
 
-            if msg:
-                cmd = f"{yunorunner_dir}/maintenance/chat_notify.sh '{msg}'"
-                try:
-                    command = await asyncio.create_subprocess_shell(cmd)
-                    while not command.stdout.at_eof():
-                        await asyncio.sleep(1)
-                except:
-                    pass
+                if msg:
+                    cmd = f"{yunorunner_dir}/maintenance/chat_notify.sh '{msg}'"
+                    try:
+                        command = await asyncio.create_subprocess_shell(cmd)
+                        while not command.stdout.at_eof():
+                            await asyncio.sleep(1)
+                    except:
+                        pass
+            except:
+                traceback.print_exc()
+                task_logger.exception(f"ERROR in job '{job.name} #{job.id}'")
+
+                job.log += "\n"
+                job.log += "Exception:\n"
+                job.log += traceback.format_exc()
 
         await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True)
 
 
-    # remove ourself from the state
-    del jobs_in_memory_state[job.id]
+        # remove ourself from the state
+        del jobs_in_memory_state[job.id]
 
-    worker.state = "available"
-    worker.save()
+        worker.state = "available"
+        worker.save()
 
-    await broadcast({
-        "action": "update_job",
-        "id": job.id,
-        "data": model_to_dict(job),
-    }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
+        await broadcast({
+            "action": "update_job",
+            "id": job.id,
+            "data": model_to_dict(job),
+        }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
 
 
 async def broadcast(message, channels):
@@ -1030,6 +1066,7 @@ async def stop_job(job_id):
     if job.state == "running":
         api_logger.info(f"Cancel running job '{job.name}' [job.id] on request")
 
+        job.state = "canceled"
        job.end_time = datetime.now()
         job.save()
 
@@ -1037,6 +1074,7 @@ async def stop_job(job_id):
         jobs_in_memory_state[job.id]["task"].cancel()
 
         worker = Worker.select().where(Worker.id == jobs_in_memory_state[job.id]["worker"])[0]
+        worker.state = "available"
         worker.save()
 
         await broadcast({
@@ -1045,6 +1083,8 @@ async def stop_job(job_id):
             "data": model_to_dict(job),
         }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
 
+        await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True)
+
         return response.text("ok")
 
     if job.state in ("done", "canceled", "failure", "error"):
@@ -1061,7 +1101,7 @@ async def stop_job(job_id):
 @app.route("/api/job/<job_id:int>/stop", methods=['POST'])
 async def api_stop_job(request, job_id):
     # TODO auth or some kind
-    await stop_job(job_id)
+    return await stop_job(job_id)
 
 
 @app.route("/api/job/<job_id:int>/restart", methods=['POST'])
@@ -1185,7 +1225,7 @@ async def html_index(request):
     return {'relative_path_to_root': '', 'path': request.path}
 
 
-@always_relaunch(sleep=2)
+@always_relaunch(sleep=10)
 async def number_of_tasks():
     print("Number of tasks: %s" % len(asyncio_all_tasks()))
 
@@ -1201,7 +1241,7 @@ async def monitor(request):
         "top_20_trace": [str(x) for x in top_stats[:20]],
         "tasks": {
             "number": len(tasks),
-            "array": map(show_coro, tasks),
+            "array": [show_coro(t) for t in tasks],
         }
     })
 
@@ -1416,7 +1456,7 @@ def main(config="./config.py"):
         "WORKER_COUNT": 1,
         "ARCH": "amd64",
         "DIST": "bullseye",
-        "PACKAGE_CHECK_DIR": "./package_check/",
+        "PACKAGE_CHECK_DIR": yunorunner_dir + "/package_check/",
         "WEBHOOK_TRIGGERS": ["!testme", "!gogogadgetoci", "By the power of systemd, I invoke The Great App CI to test this Pull Request!"],
         "WEBHOOK_CATCHPHRASES": ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:"],
     }