Fixes after tests on the battlefield

This commit is contained in:
Alexandre Aubin 2023-01-28 04:12:06 +01:00
parent a435d2973f
commit 8f58109973

162
run.py
View file

@ -11,6 +11,7 @@ import traceback
import itertools
import tracemalloc
import string
import shutil
import hmac
import hashlib
@ -432,25 +433,32 @@ async def jobs_dispatcher():
async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=False):
await asyncio.sleep(3)
await asyncio.sleep(1)
if not os.path.exists(app.config.PACKAGE_CHECK_LOCK_PER_WORKER.format(worker_id=worker.id)):
return
job.log += f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ...\n"
job.log += f"Lock for worker {worker.id} still exist ... trying to cleanup the old package check still running ...\n"
job.save()
await broadcast({
"action": "update_job",
"id": job.id,
"data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
task_logger.info(f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ...")
cwd = os.path.split(app.config.PACKAGE_CHECK_PATH)[0]
env = {
"WORKER_ID": worker.id,
"IN_YUNORUNNER": "1",
"WORKER_ID": str(worker.id),
"ARCH": app.config.ARCH,
"DIST": app.config.DIST,
"YNH_BRANCH": app.config.YNH_BRANCH,
"PATH": os.environ["PATH"] + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin
}
cmd = f"{app.config.PACKAGE_CHECK_PATH} --force-stop"
cmd = f"script -qefc '{app.config.PACKAGE_CHECK_PATH} --force-stop 2>&1'"
try:
command = await asyncio.create_subprocess_shell(cmd,
cwd=cwd,
@ -458,6 +466,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
while not command.stdout.at_eof():
data = await command.stdout.readline()
await asyncio.sleep(1)
except Exception:
traceback.print_exc()
@ -475,7 +484,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal
command.terminate()
if not ignore_error:
job.log += "\nFailed to lill old check process?"
job.log += "\nFailed to kill old check process?"
job.state = "canceled"
task_logger.info(f"Job '{job.name} #{job.id}' has been canceled")
@ -486,6 +495,11 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal
return True
finally:
job.save()
await broadcast({
"action": "update_job",
"id": job.id,
"data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
async def run_job(worker, job):
@ -495,6 +509,8 @@ async def run_job(worker, job):
"data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
await asyncio.sleep(5)
cleanup_ret = await cleanup_old_package_check_if_lock_exists(worker, job)
if cleanup_ret is False:
return
@ -505,7 +521,8 @@ async def run_job(worker, job):
cwd = os.path.split(app.config.PACKAGE_CHECK_PATH)[0]
env = {
"WORKER_ID": worker.id,
"IN_YUNORUNNER": "1",
"WORKER_ID": str(worker.id),
"ARCH": app.config.ARCH,
"DIST": app.config.DIST,
"YNH_BRANCH": app.config.YNH_BRANCH,
@ -513,24 +530,29 @@ async def run_job(worker, job):
}
now = datetime.now().strftime("%d/%m/%Y - %H:%M:%S")
msg = now + " - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}"
msg = now + f" - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}"
job.log += "=" * len(msg) + "\n"
job.log += msg + "\n"
job.log += "=" * len(msg) + "\n"
job.save()
await broadcast({
"action": "update_job",
"id": job.id,
"data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
result_json = app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER.format(worker_id=worker.id)
full_log = app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER.format(worker_id=worker.id)
summary_png = app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER.format(worker_id=worker.id)
if os.exists(result_json):
if os.path.exists(result_json):
os.remove(result_json)
if os.exists(full_log):
if os.path.exists(full_log):
os.remove(full_log)
if os.exists(summary_png):
if os.path.exists(summary_png):
os.remove(summary_png)
cmd = f"timeout {app.config.TIMEOUT} --signal=TERM nice --adjustment=10 /bin/bash {app.config.PACKAGE_CHECK_PATH} {job.url_or_path}"
cmd = f"timeout --signal TERM {app.config.TIMEOUT} nice --adjustment=10 script -qefc '/bin/bash {app.config.PACKAGE_CHECK_PATH} {job.url_or_path} 2>&1'"
task_logger.info(f"Launching command: {cmd}")
try:
@ -585,16 +607,16 @@ async def run_job(worker, job):
job.log += f"\nJob failed ? Return code is {comman.returncode} / Or maybe the json result doesnt exist...\n"
job.state = "error"
else:
job.log += f"\nPackage check completed"
job.log += f"\nPackage check completed\n"
results = json.load(open(result_json))
level = results["level"]
job.state = "done" if level > 4 else "failure"
log.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n"
job.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n"
os.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log")
os.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json")
os.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png")
shutil.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log")
shutil.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json")
shutil.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png")
public_result_json_path = yunorunner_dir + f"/results/logs/list_level_{app.config.YNH_BRANCH}_{app.config.ARCH}.json"
if os.path.exists(public_result_json_path) or not open(public_result_json_path).read().strip():
@ -607,61 +629,75 @@ async def run_job(worker, job):
finally:
job.end_time = datetime.now()
now = datetime.datetime.now().strftime("%d/%m/%Y - %H:%M:%S")
msg = now + " - Finished job for {job.name}"
now = datetime.now().strftime("%d/%m/%Y - %H:%M:%S")
msg = now + f" - Finished job for {job.name} ({job.state})"
job.log += "=" * len(msg) + "\n"
job.log += msg + "\n"
job.log += "=" * len(msg) + "\n"
job.save()
await broadcast({
"action": "update_job",
"id": job.id,
"data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
if "ci-apps.yunohost.org" in app.config.BASE_URL:
async with aiohttp.ClientSession() as session:
async with session.get(APPS_LIST) as resp:
data = await resp.json()
data = data["apps"]
public_level = data.get(job_app, {}).get("level")
try:
async with aiohttp.ClientSession() as session:
async with session.get(APPS_LIST) as resp:
data = await resp.json()
data = data["apps"]
public_level = data.get(job_app, {}).get("level")
job_url = app.config.BASE_URL + "/job/" + job.id
job_id_with_url = f"[#{job.id}]({job_url})"
if job.state == "error":
msg = f"Job {job_id_with_url} for {job_app} failed miserably :("
elif not level == 0:
msg = f"App {job_app} failed all tests in job {job_id_with_url} :("
elif public_level is None:
msg = f"App {job_app} rises from level (unknown) to {level} in job {job_id_with_url} !"
elif level > public_level:
msg = f"App {job_app} rises from level {public_level} to {level} in job {job_id_with_url} !"
elif level < public_level:
msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}"
elif level < 6:
msg = f"App {job_app} stays at level {level} in job {job_id_with_url}"
else:
# Dont notify anything, reduce CI flood on app chatroom if app is already level 6+
msg = ""
job_url = app.config.BASE_URL + "/job/" + job.id
job_id_with_url = f"[#{job.id}]({job_url})"
if job.state == "error":
msg = f"Job {job_id_with_url} for {job_app} failed miserably :("
elif not level == 0:
msg = f"App {job_app} failed all tests in job {job_id_with_url} :("
elif public_level is None:
msg = f"App {job_app} rises from level (unknown) to {level} in job {job_id_with_url} !"
elif level > public_level:
msg = f"App {job_app} rises from level {public_level} to {level} in job {job_id_with_url} !"
elif level < public_level:
msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}"
elif level < 6:
msg = f"App {job_app} stays at level {level} in job {job_id_with_url}"
else:
# Dont notify anything, reduce CI flood on app chatroom if app is already level 6+
msg = ""
if msg:
cmd = f"{yunorunner_dir}/maintenance/chat_notify.sh '{msg}'"
try:
command = await asyncio.create_subprocess_shell(cmd)
while not command.stdout.at_eof():
await asyncio.sleep(1)
except:
pass
if msg:
cmd = f"{yunorunner_dir}/maintenance/chat_notify.sh '{msg}'"
try:
command = await asyncio.create_subprocess_shell(cmd)
while not command.stdout.at_eof():
await asyncio.sleep(1)
except:
pass
except:
traceback.print_exc()
task_logger.exception(f"ERROR in job '{job.name} #{job.id}'")
job.log += "\n"
job.log += "Exception:\n"
job.log += traceback.format_exc()
await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True)
# remove ourself from the state
del jobs_in_memory_state[job.id]
# remove ourself from the state
del jobs_in_memory_state[job.id]
worker.state = "available"
worker.save()
worker.state = "available"
worker.save()
await broadcast({
"action": "update_job",
"id": job.id,
"data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
await broadcast({
"action": "update_job",
"id": job.id,
"data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
async def broadcast(message, channels):
@ -1030,6 +1066,7 @@ async def stop_job(job_id):
if job.state == "running":
api_logger.info(f"Cancel running job '{job.name}' [job.id] on request")
job.state = "canceled"
job.end_time = datetime.now()
job.save()
@ -1037,6 +1074,7 @@ async def stop_job(job_id):
jobs_in_memory_state[job.id]["task"].cancel()
worker = Worker.select().where(Worker.id == jobs_in_memory_state[job.id]["worker"])[0]
worker.state = "available"
worker.save()
@ -1045,6 +1083,8 @@ async def stop_job(job_id):
"data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True)
return response.text("ok")
if job.state in ("done", "canceled", "failure", "error"):
@ -1061,7 +1101,7 @@ async def stop_job(job_id):
@app.route("/api/job/<job_id:int>/stop", methods=['POST'])
async def api_stop_job(request, job_id):
# TODO auth or some kind
await stop_job(job_id)
return await stop_job(job_id)
@app.route("/api/job/<job_id:int>/restart", methods=['POST'])
@ -1185,7 +1225,7 @@ async def html_index(request):
return {'relative_path_to_root': '', 'path': request.path}
@always_relaunch(sleep=2)
@always_relaunch(sleep=10)
async def number_of_tasks():
print("Number of tasks: %s" % len(asyncio_all_tasks()))
@ -1201,7 +1241,7 @@ async def monitor(request):
"top_20_trace": [str(x) for x in top_stats[:20]],
"tasks": {
"number": len(tasks),
"array": map(show_coro, tasks),
"array": [show_coro(t) for t in tasks],
}
})
@ -1416,7 +1456,7 @@ def main(config="./config.py"):
"WORKER_COUNT": 1,
"ARCH": "amd64",
"DIST": "bullseye",
"PACKAGE_CHECK_DIR": "./package_check/",
"PACKAGE_CHECK_DIR": yunorunner_dir + "/package_check/",
"WEBHOOK_TRIGGERS": ["!testme", "!gogogadgetoci", "By the power of systemd, I invoke The Great App CI to test this Pull Request!"],
"WEBHOOK_CATCHPHRASES": ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:"],
}