From a435d2973f7c344f6652010f74c1a657ae6ba6fd Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Fri, 27 Jan 2023 22:26:44 +0100 Subject: [PATCH 1/5] Moar refactoring x_x --- analyze_yunohost_app.sh | 239 ---------------------------------- maintenance/chat_notify.sh | 2 +- maintenance/finish_install.sh | 22 ++-- run.py | 213 ++++++++++++++++++++++++++---- 4 files changed, 197 insertions(+), 279 deletions(-) delete mode 100755 analyze_yunohost_app.sh diff --git a/analyze_yunohost_app.sh b/analyze_yunohost_app.sh deleted file mode 100755 index e8cf695..0000000 --- a/analyze_yunohost_app.sh +++ /dev/null @@ -1,239 +0,0 @@ -#!/bin/bash - -if [ "${0:0:1}" == "/" ]; then script_dir="$(dirname "$0")"; else script_dir="$(echo $PWD/$(dirname "$0" | cut -d '.' -f2) | sed 's@/$@@')"; fi - -cd $script_dir - -if [ $# -ne 4 ] -then - echo "This script need to take in argument the package which be tested, the name of the test, and the ID of the worker." - exit 1 -fi - -mkdir -p /var/run/yunorunner/locks/ - -worker_id="$4" -lock_yunorunner="/var/run/yunorunner/locks/${worker_id}.lock" -lock_package_check="./package_check/pcheck-${worker_id}.lock" - -# 10800 sec / 60 = 180 min = 3 hours -TIMEOUT="10800" - -BASE_URL="$(cat "./config.py" | tr -d ' "' | grep "^BASE_URL=" | cut --delimiter="=" --fields=2)" -ynh_branch="$(cat "./config.py" | tr -d ' "' | grep "^YNH_BRANCH=" | cut --delimiter="=" --fields=2)" -arch="$(dpkg --print-architecture)" -dist="$(cat "./config.py" | tr -d ' "' | grep "^DIST=" | cut --delimiter="=" --fields=2)" - -# Enable chat notifications only on main CI -if [[ "$ynh_branch" == "stable" ]] && [[ "$arch" == "amd64" ]] && [[ -e "./maintenance/chat_notify.sh" ]] -then - chat_notify="./maintenance/chat_notify.sh" -else - chat_notify="true" # 'true' is a dummy program that won't do anything -fi - -#================================================= -# Delay the beginning of this script, to prevent concurrent executions 
-#================================================= - -# Get 3 ramdom digit. To build a value between 001 and 999 -milli_sleep=$(head --lines=20 /dev/urandom | tr --complement --delete '0-9' | head --bytes=3) -# And wait for this value in millisecond -sleep "5.$milli_sleep" - -#============================ -# Check / take the lock -#============================= - -if [ -e $lock_yunorunner ] -then - lock_yunorunner_PID="$(cat $lock_yunorunner)" - if [ -n "$lock_yunorunner_PID" ] - then - # We check that the corresponding PID is still running AND that the PPid is not 1 .. - # If the PPid is 1, it tends to indicate that a previous analyseCI is still running and was not killed, and therefore got adopted by init. - # This typically happens when the job is cancelled / restarted .. though we should have a better way of handling cancellation from yunorunner directly :/ - if ps --pid $lock_yunorunner_PID | grep --quiet $lock_yunorunner_PID && [[ $(grep PPid /proc/${lock_yunorunner_PID}/status | awk '{print $2}') != "1" ]] - then - echo -e "\e[91m\e[1m!!! 
Another analyseCI process is currently using the lock $lock_yunorunner !!!\e[0m" - "$chat_notify" "CI miserably crashed because another process is using the lock" - sleep 10 - exit 1 - fi - fi - [[ $(grep PPid /proc/$(lock_yunorunner_PID)/status | awk '{print $2}') != "1" ]] && { echo "Killing stale analyseCI process ..."; kill -s SIGTERM $lock_yunorunner_PID; sleep 30; } - echo "Removing stale lock" - rm -f $lock_yunorunner -fi - -echo "$$" > $lock_yunorunner - -#============================ -# Cleanup after exit/kill -#============================= - -function cleanup() -{ - rm $lock_yunorunner - - if [ -n "$package_check_pid" ] - then - kill -s SIGTERM $package_check_pid - WORKER_ID="$worker_id" ARCH="$arch" DIST="$dist" YNH_BRANCH="$ynh_branch" "./package_check/package_check.sh" --force-stop - fi -} - -trap cleanup EXIT -trap 'exit 2' TERM - -#============================ -# Test parameters -#============================= - -repo="$1" -test_name="$2" -job_id="$3" - -# Keep only the repositery -repo=$(echo $repo | cut --delimiter=';' --fields=1) -app="$(echo $test_name | awk '{print $1}')" - -test_full_log=${app}_${arch}_${ynh_branch}_complete.log -test_json_results="./results/logs/${app}_${arch}_${ynh_branch}_results.json" -test_url="$BASE_URL/job/$job_id" - -# Make sure /usr/local/bin is in the path, because that's where the lxc/lxd bin lives -export PATH=$PATH:/usr/local/bin - -#================================================= -# Timeout handling utils -#================================================= - -function watchdog() { - local package_check_pid=$1 - # Start a loop while package check is working - while ps --pid $package_check_pid | grep --quiet $package_check_pid - do - sleep 10 - - if [ -e $lock_package_check ] - then - lock_timestamp="$(stat -c %Y $lock_package_check)" - current_timestamp="$(date +%s)" - if [[ "$(($current_timestamp - $lock_timestamp))" -gt "$TIMEOUT" ]] - then - kill -s SIGTERM $package_check_pid - rm -f $lock_package_check - 
force_stop "Package check aborted, timeout reached ($(( $TIMEOUT / 60 )) min)." - return 1 - fi - fi - done - - if [ ! -e "./package_check/results-$worker_id.json" ] - then - force_stop "It looks like package_check did not finish properly ... on $test_url" - return 1 - fi -} - -function force_stop() { - local message="$1" - - echo -e "\e[91m\e[1m!!! $message !!!\e[0m" - - "$chat_notify" "While testing $app: $message" - - WORKER_ID="$worker_id" ARCH="$arch" DIST="$dist" YNH_BRANCH="$ynh_branch" "./package_check/package_check.sh" --force-stop -} - -#================================================= -# The actual testing ... -#================================================= - -# Exec package check according to the architecture -echo "$(date) - Starting a test for $app on architecture $arch distribution $dist with yunohost $ynh_branch" - -rm -f "./package_check/Complete-$worker_id.log" -rm -f "./package_check/results-$worker_id.json" - -# Here we use a weird trick with 'script -qefc' -# The reason is that : -# if running the command in background (with &) the corresponding command *won't be in a tty* (not sure exactly) -# therefore later, the command lxc exec -t *won't be in a tty* (despite the -t) and command outputs will appear empty... -# Instead, with the magic of script -qefc we can pretend to be in a tty somehow... -# Adapted from https://stackoverflow.com/questions/32910661/pretend-to-be-a-tty-in-bash-for-any-command -cmd="WORKER_ID=$worker_id ARCH=$arch DIST=$dist YNH_BRANCH=$ynh_branch nice --adjustment=10 './package_check/package_check.sh' '$repo' 2>&1" -script -qefc "$cmd" & - -watchdog $! || exit 1 - -# Copy the complete log -cp "./package_check/Complete-$worker_id.log" "./results/logs/$test_full_log" -cp "./package_check/results-$worker_id.json" "$test_json_results" -rm -f "./package_check/Complete-$worker_id.log" -rm -f "./package_check/results-$worker_id.json" -[ ! 
-e "./package_check/summary.png" ] || cp "./package_check/summary.png" "./result/summary/${job_id}.png" - -if [ -n "$BASE_URL" ] -then - full_log_path="$BASE_URL/logs/$test_full_log" -else - full_log_path="$(pwd)/logs/$test_full_log" -fi - -echo "The complete log for this application was duplicated and is accessible at $full_log_path" - -echo "" -echo "-------------------------------------------" -echo "" - -#================================================= -# Check / update level of the app -#================================================= - -public_result_list="./results/logs/list_level_${ynh_branch}_$arch.json" -[ -s "$public_result_list" ] || echo "{}" > "$public_result_list" - -# Check that we have a valid json... -jq -e '' "$test_json_results" >/dev/null 2>/dev/null && bad_json="false" || bad_json="true" - -# Get new level and previous level -app_level="$(jq -r ".level" "$test_json_results")" -previous_level="$(jq -r ".$app" "$public_result_list")" - -# We post message on chat if we're running for tests on stable/amd64 -if [ "$bad_json" == "true" ] || [ "$app_level" -eq 0 ]; then - message="Application $app completely failed the continuous integration tests" -elif [ -z "$previous_level" ]; then - message="Application $app rises from level (unknown) to level $app_level" -elif [ $app_level -gt $previous_level ]; then - message="Application $app rises from level $previous_level to level $app_level" -elif [ $app_level -lt $previous_level ]; then - message="Application $app goes down from level $previous_level to level $app_level" -elif [ $app_level -ge 6 ]; then - # Dont notify anything, reduce CI flood on app chatroom - message="" -else - message="Application $app stays at level $app_level" -fi - -# Send chat notification -if [[ -n "$message" ]] -then - message+=" on $test_url" - echo $message - "$chat_notify" "$message" -fi - -# Update/add the results from package_check in the public result list -if [ "$bad_json" == "false" ] -then - jq --argfile results 
"$test_json_results" ".\"$app\"=\$results" $public_result_list > $public_result_list.new - mv $public_result_list.new $public_result_list -fi - -# Annnd we're done ! -echo "$(date) - Test completed" - -[ "$app_level" -gt 5 ] && exit 0 || exit 1 diff --git a/maintenance/chat_notify.sh b/maintenance/chat_notify.sh index 38581a7..0eace53 100755 --- a/maintenance/chat_notify.sh +++ b/maintenance/chat_notify.sh @@ -21,4 +21,4 @@ MCHOME="/opt/matrix-commander/" MCARGS="-c $MCHOME/credentials.json --store $MCHOME/store" -timeout 10 "$MCHOME/venv/bin/matrix-commander" $MCARGS -m "$@" --room 'yunohost-apps' +timeout 10 "$MCHOME/venv/bin/matrix-commander" $MCARGS -m "$@" --room 'yunohost-apps' --markdown diff --git a/maintenance/finish_install.sh b/maintenance/finish_install.sh index 7b543dd..6a1ed94 100755 --- a/maintenance/finish_install.sh +++ b/maintenance/finish_install.sh @@ -83,35 +83,33 @@ function tweak_yunorunner() { # Remove the original database, in order to rebuilt it with the new config. 
rm -f $YUNORUNNER_HOME/db.sqlite + cat >$YUNORUNNER_HOME/config.py <$YUNORUNNER_HOME/config.py <$YUNORUNNER_HOME/config.py < 4 else "failure" + + log.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n" + + os.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log") + os.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json") + os.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png") + + public_result_json_path = yunorunner_dir + f"/results/logs/list_level_{app.config.YNH_BRANCH}_{app.config.ARCH}.json" + if os.path.exists(public_result_json_path) or not open(public_result_json_path).read().strip(): + public_result = {} + else: + public_result = json.load(open(public_result_json_path)) + + public_result[job_app] = results + open(public_result_json_path, "w").write(json.dumps(public_result)) + finally: job.end_time = datetime.now() - job.state = "done" if command.returncode == 0 else "failure" + + now = datetime.datetime.now().strftime("%d/%m/%Y - %H:%M:%S") + msg = now + " - Finished job for {job.name}" + job.log += "=" * len(msg) + "\n" + job.log += msg + "\n" + job.log += "=" * len(msg) + "\n" + job.save() + if "ci-apps.yunohost.org" in app.config.BASE_URL: + async with aiohttp.ClientSession() as session: + async with session.get(APPS_LIST) as resp: + data = await resp.json() + data = data["apps"] + public_level = data.get(job_app, {}).get("level") + + job_url = app.config.BASE_URL + "/job/" + job.id + job_id_with_url = f"[#{job.id}]({job_url})" + if job.state == "error": + msg = f"Job {job_id_with_url} for {job_app} failed miserably :(" + elif not level == 0: + msg = f"App {job_app} failed all tests in job {job_id_with_url} :(" + elif public_level is None: + msg = f"App {job_app} rises from level (unknown) to {level} in job {job_id_with_url} !" 
+ elif level > public_level: + msg = f"App {job_app} rises from level {public_level} to {level} in job {job_id_with_url} !" + elif level < public_level: + msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}" + elif level < 6: + msg = f"App {job_app} stays at level {level} in job {job_id_with_url}" + else: + # Dont notify anything, reduce CI flood on app chatroom if app is already level 6+ + msg = "" + + if msg: + cmd = f"{yunorunner_dir}/maintenance/chat_notify.sh '{msg}'" + try: + command = await asyncio.create_subprocess_shell(cmd) + while not command.stdout.at_eof(): + await asyncio.sleep(1) + except: + pass + + await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) + # remove ourself from the state del jobs_in_memory_state[job.id] @@ -1107,8 +1257,7 @@ async def github(request): # Check the comment contains proper keyword trigger body = hook_infos["comment"]["body"].strip()[:100].lower() - triggers = ["!testme", "!gogogadgetoci", "By the power of systemd, I invoke The Great App CI to test this Pull Request!"] - if not any(trigger.lower() in body for trigger in triggers): + if not any(trigger.lower() in body for trigger in app.config.WEBHOOK_TRIGGERS): # Nothing to do but success anyway (204 = No content) return response.json({'msg': "Nothing to do"}, 204) @@ -1182,8 +1331,7 @@ async def github(request): respjson = await resp.json() api_logger.info("Added comment %s" % respjson["html_url"]) - catchphrases = ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:" ] - catchphrase = random.choice(catchphrases) + catchphrase = random.choice(app.config.WEBHOOK_CATCHPHRASES) # Dirty hack with BASE_URL passed from cmd argument because we can't use request.url_for because Sanic < 20.x job_url = app.config.BASE_URL + app.url_for("html_job", job_id=job.id) badge_url = app.config.BASE_URL + 
app.url_for("api_badge_job", job_id=job.id) @@ -1258,21 +1406,32 @@ def main(config="./config.py"): default_config = { "BASE_URL": "", "PORT": 4242, + "TIMEOUT": 10800, "DEBUG": False, - "PATH_TO_ANALYZER": "./analyze_yunohost_app.sh", "MONITOR_APPS_LIST": False, "MONITOR_GIT": False, "MONITOR_ONLY_GOOD_QUALITY_APPS": False, "MONTHLY_JOBS": False, "ANSWER_TO_AUTO_UPDATER": True, "WORKER_COUNT": 1, + "ARCH": "amd64", + "DIST": "bullseye", + "PACKAGE_CHECK_DIR": "./package_check/", + "WEBHOOK_TRIGGERS": ["!testme", "!gogogadgetoci", "By the power of systemd, I invoke The Great App CI to test this Pull Request!"], + "WEBHOOK_CATCHPHRASES": ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:"], } app.config.update_config(default_config) app.config.update_config(config) - if not os.path.exists(app.config.PATH_TO_ANALYZER): - print(f"Error: analyzer script doesn't exist at '{app.config.PATH_TO_ANALYZER}'. Please fix the configuration in {config}") + app.config.PACKAGE_CHECK_PATH = app.config.PACKAGE_CHECK_DIR + "package_check.sh" + app.config.PACKAGE_CHECK_LOCK_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "pcheck-{worker_id}.lock" + app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "full_log_{worker_id}.log" + app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "results_{worker_id}.json" + app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "summary_{worker_id}.png" + + if not os.path.exists(app.config.PACKAGE_CHECK_PATH): + print(f"Error: analyzer script doesn't exist at '{app.config.PACKAGE_CHECK_PATH}'. 
Please fix the configuration in {config}") sys.exit(1) if app.config.MONITOR_APPS_LIST: From 8f581099733e62be1411a394a40ae56b3d302330 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sat, 28 Jan 2023 04:12:06 +0100 Subject: [PATCH 2/5] Fixes after tests on the battlefield --- run.py | 162 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 101 insertions(+), 61 deletions(-) diff --git a/run.py b/run.py index 174281c..26e4104 100644 --- a/run.py +++ b/run.py @@ -11,6 +11,7 @@ import traceback import itertools import tracemalloc import string +import shutil import hmac import hashlib @@ -432,25 +433,32 @@ async def jobs_dispatcher(): async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=False): - await asyncio.sleep(3) + await asyncio.sleep(1) if not os.path.exists(app.config.PACKAGE_CHECK_LOCK_PER_WORKER.format(worker_id=worker.id)): return - job.log += f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ...\n" + job.log += f"Lock for worker {worker.id} still exist ... trying to cleanup the old package check still running ...\n" job.save() + await broadcast({ + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + task_logger.info(f"Lock for worker {worker.id} still exist ... 
trying to cleanup old check process ...") cwd = os.path.split(app.config.PACKAGE_CHECK_PATH)[0] env = { - "WORKER_ID": worker.id, + "IN_YUNORUNNER": "1", + "WORKER_ID": str(worker.id), "ARCH": app.config.ARCH, "DIST": app.config.DIST, "YNH_BRANCH": app.config.YNH_BRANCH, "PATH": os.environ["PATH"] + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin } - cmd = f"{app.config.PACKAGE_CHECK_PATH} --force-stop" + cmd = f"script -qefc '{app.config.PACKAGE_CHECK_PATH} --force-stop 2>&1'" try: command = await asyncio.create_subprocess_shell(cmd, cwd=cwd, @@ -458,6 +466,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) while not command.stdout.at_eof(): + data = await command.stdout.readline() await asyncio.sleep(1) except Exception: traceback.print_exc() @@ -475,7 +484,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal command.terminate() if not ignore_error: - job.log += "\nFailed to lill old check process?" + job.log += "\nFailed to kill old check process?" 
job.state = "canceled" task_logger.info(f"Job '{job.name} #{job.id}' has been canceled") @@ -486,6 +495,11 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal return True finally: job.save() + await broadcast({ + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) async def run_job(worker, job): @@ -495,6 +509,8 @@ async def run_job(worker, job): "data": model_to_dict(job), }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await asyncio.sleep(5) + cleanup_ret = await cleanup_old_package_check_if_lock_exists(worker, job) if cleanup_ret is False: return @@ -505,7 +521,8 @@ async def run_job(worker, job): cwd = os.path.split(app.config.PACKAGE_CHECK_PATH)[0] env = { - "WORKER_ID": worker.id, + "IN_YUNORUNNER": "1", + "WORKER_ID": str(worker.id), "ARCH": app.config.ARCH, "DIST": app.config.DIST, "YNH_BRANCH": app.config.YNH_BRANCH, @@ -513,24 +530,29 @@ async def run_job(worker, job): } now = datetime.now().strftime("%d/%m/%Y - %H:%M:%S") - msg = now + " - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}" + msg = now + f" - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}" job.log += "=" * len(msg) + "\n" job.log += msg + "\n" job.log += "=" * len(msg) + "\n" job.save() + await broadcast({ + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) result_json = app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER.format(worker_id=worker.id) full_log = app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER.format(worker_id=worker.id) summary_png = app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER.format(worker_id=worker.id) - if os.exists(result_json): + if os.path.exists(result_json): os.remove(result_json) - if os.exists(full_log): + if 
os.path.exists(full_log): os.remove(full_log) - if os.exists(summary_png): + if os.path.exists(summary_png): os.remove(summary_png) - cmd = f"timeout {app.config.TIMEOUT} --signal=TERM nice --adjustment=10 /bin/bash {app.config.PACKAGE_CHECK_PATH} {job.url_or_path}" + cmd = f"timeout --signal TERM {app.config.TIMEOUT} nice --adjustment=10 script -qefc '/bin/bash {app.config.PACKAGE_CHECK_PATH} {job.url_or_path} 2>&1'" task_logger.info(f"Launching command: {cmd}") try: @@ -585,16 +607,16 @@ async def run_job(worker, job): job.log += f"\nJob failed ? Return code is {comman.returncode} / Or maybe the json result doesnt exist...\n" job.state = "error" else: - job.log += f"\nPackage check completed" + job.log += f"\nPackage check completed\n" results = json.load(open(result_json)) level = results["level"] job.state = "done" if level > 4 else "failure" - log.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n" + job.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n" - os.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log") - os.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json") - os.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png") + shutil.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log") + shutil.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json") + shutil.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png") public_result_json_path = yunorunner_dir + f"/results/logs/list_level_{app.config.YNH_BRANCH}_{app.config.ARCH}.json" if os.path.exists(public_result_json_path) or not open(public_result_json_path).read().strip(): @@ -607,61 +629,75 @@ async def run_job(worker, job): finally: job.end_time = datetime.now() - now = datetime.datetime.now().strftime("%d/%m/%Y - %H:%M:%S") - msg = now + " - Finished job for {job.name}" + 
now = datetime.now().strftime("%d/%m/%Y - %H:%M:%S") + msg = now + f" - Finished job for {job.name} ({job.state})" job.log += "=" * len(msg) + "\n" job.log += msg + "\n" job.log += "=" * len(msg) + "\n" job.save() + await broadcast({ + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + if "ci-apps.yunohost.org" in app.config.BASE_URL: - async with aiohttp.ClientSession() as session: - async with session.get(APPS_LIST) as resp: - data = await resp.json() - data = data["apps"] - public_level = data.get(job_app, {}).get("level") + try: + async with aiohttp.ClientSession() as session: + async with session.get(APPS_LIST) as resp: + data = await resp.json() + data = data["apps"] + public_level = data.get(job_app, {}).get("level") - job_url = app.config.BASE_URL + "/job/" + job.id - job_id_with_url = f"[#{job.id}]({job_url})" - if job.state == "error": - msg = f"Job {job_id_with_url} for {job_app} failed miserably :(" - elif not level == 0: - msg = f"App {job_app} failed all tests in job {job_id_with_url} :(" - elif public_level is None: - msg = f"App {job_app} rises from level (unknown) to {level} in job {job_id_with_url} !" - elif level > public_level: - msg = f"App {job_app} rises from level {public_level} to {level} in job {job_id_with_url} !" 
- elif level < public_level: - msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}" - elif level < 6: - msg = f"App {job_app} stays at level {level} in job {job_id_with_url}" - else: - # Dont notify anything, reduce CI flood on app chatroom if app is already level 6+ - msg = "" + job_url = app.config.BASE_URL + "/job/" + job.id + job_id_with_url = f"[#{job.id}]({job_url})" + if job.state == "error": + msg = f"Job {job_id_with_url} for {job_app} failed miserably :(" + elif not level == 0: + msg = f"App {job_app} failed all tests in job {job_id_with_url} :(" + elif public_level is None: + msg = f"App {job_app} rises from level (unknown) to {level} in job {job_id_with_url} !" + elif level > public_level: + msg = f"App {job_app} rises from level {public_level} to {level} in job {job_id_with_url} !" + elif level < public_level: + msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}" + elif level < 6: + msg = f"App {job_app} stays at level {level} in job {job_id_with_url}" + else: + # Dont notify anything, reduce CI flood on app chatroom if app is already level 6+ + msg = "" - if msg: - cmd = f"{yunorunner_dir}/maintenance/chat_notify.sh '{msg}'" - try: - command = await asyncio.create_subprocess_shell(cmd) - while not command.stdout.at_eof(): - await asyncio.sleep(1) - except: - pass + if msg: + cmd = f"{yunorunner_dir}/maintenance/chat_notify.sh '{msg}'" + try: + command = await asyncio.create_subprocess_shell(cmd) + while not command.stdout.at_eof(): + await asyncio.sleep(1) + except: + pass + except: + traceback.print_exc() + task_logger.exception(f"ERROR in job '{job.name} #{job.id}'") + + job.log += "\n" + job.log += "Exception:\n" + job.log += traceback.format_exc() await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) - # remove ourself from the state - del jobs_in_memory_state[job.id] + # remove ourself from the state + del jobs_in_memory_state[job.id] - 
worker.state = "available" - worker.save() + worker.state = "available" + worker.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast({ + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) async def broadcast(message, channels): @@ -1030,6 +1066,7 @@ async def stop_job(job_id): if job.state == "running": api_logger.info(f"Cancel running job '{job.name}' [job.id] on request") + job.state = "canceled" job.end_time = datetime.now() job.save() @@ -1037,6 +1074,7 @@ async def stop_job(job_id): jobs_in_memory_state[job.id]["task"].cancel() worker = Worker.select().where(Worker.id == jobs_in_memory_state[job.id]["worker"])[0] + worker.state = "available" worker.save() @@ -1045,6 +1083,8 @@ async def stop_job(job_id): "data": model_to_dict(job), }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) + return response.text("ok") if job.state in ("done", "canceled", "failure", "error"): @@ -1061,7 +1101,7 @@ async def stop_job(job_id): @app.route("/api/job//stop", methods=['POST']) async def api_stop_job(request, job_id): # TODO auth or some kind - await stop_job(job_id) + return await stop_job(job_id) @app.route("/api/job//restart", methods=['POST']) @@ -1185,7 +1225,7 @@ async def html_index(request): return {'relative_path_to_root': '', 'path': request.path} -@always_relaunch(sleep=2) +@always_relaunch(sleep=10) async def number_of_tasks(): print("Number of tasks: %s" % len(asyncio_all_tasks())) @@ -1201,7 +1241,7 @@ async def monitor(request): "top_20_trace": [str(x) for x in top_stats[:20]], "tasks": { "number": len(tasks), - "array": map(show_coro, tasks), + "array": [show_coro(t) for t in tasks], } }) @@ -1416,7 +1456,7 @@ def main(config="./config.py"): "WORKER_COUNT": 1, 
"ARCH": "amd64", "DIST": "bullseye", - "PACKAGE_CHECK_DIR": "./package_check/", + "PACKAGE_CHECK_DIR": yunorunner_dir + "/package_check/", "WEBHOOK_TRIGGERS": ["!testme", "!gogogadgetoci", "By the power of systemd, I invoke The Great App CI to test this Pull Request!"], "WEBHOOK_CATCHPHRASES": ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:"], } From 4992ffe4f6acc8a889df0ddb5c66dc2e31ec6343 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sun, 29 Jan 2023 18:34:07 +0100 Subject: [PATCH 3/5] Moar fixes from the battlefield --- run.py | 94 +++++++++++++++++++++++++++++----------------------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/run.py b/run.py index 26e4104..52f7c76 100644 --- a/run.py +++ b/run.py @@ -480,7 +480,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal job.state = "error" return False - except CancelledError: + except (CancelledError, asyncio.exceptions.CancelledError): command.terminate() if not ignore_error: @@ -564,6 +564,7 @@ async def run_job(worker, job): stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + while not command.stdout.at_eof(): data = await command.stdout.readline() @@ -581,7 +582,7 @@ async def run_job(worker, job): "data": model_to_dict(job), }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) - except CancelledError: + except (CancelledError, asyncio.exceptions.CancelledError): command.terminate() job.log += "\n" job.state = "canceled" @@ -604,7 +605,7 @@ async def run_job(worker, job): job.state = "error" else: if command.returncode != 0 or not os.path.exists(result_json): - job.log += f"\nJob failed ? Return code is {comman.returncode} / Or maybe the json result doesnt exist...\n" + job.log += f"\nJob failed ? 
Return code is {command.returncode} / Or maybe the json result doesnt exist...\n" job.state = "error" else: job.log += f"\nPackage check completed\n" @@ -685,7 +686,8 @@ async def run_job(worker, job): job.log += "Exception:\n" job.log += traceback.format_exc() - await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) + #if job.state != "canceled": + # await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) # remove ourself from the state del jobs_in_memory_state[job.id] @@ -1083,8 +1085,6 @@ async def stop_job(job_id): "data": model_to_dict(job), }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) - await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) - return response.text("ok") if job.state in ("done", "canceled", "failure", "error"): @@ -1225,25 +1225,25 @@ async def html_index(request): return {'relative_path_to_root': '', 'path': request.path} -@always_relaunch(sleep=10) -async def number_of_tasks(): - print("Number of tasks: %s" % len(asyncio_all_tasks())) - - -@app.route('/monitor') -async def monitor(request): - snapshot = tracemalloc.take_snapshot() - top_stats = snapshot.statistics('lineno') - - tasks = asyncio_all_tasks() - - return response.json({ - "top_20_trace": [str(x) for x in top_stats[:20]], - "tasks": { - "number": len(tasks), - "array": [show_coro(t) for t in tasks], - } - }) +#@always_relaunch(sleep=10) +#async def number_of_tasks(): +# print("Number of tasks: %s" % len(asyncio_all_tasks())) +# +# +#@app.route('/monitor') +#async def monitor(request): +# snapshot = tracemalloc.take_snapshot() +# top_stats = snapshot.statistics('lineno') +# +# tasks = asyncio_all_tasks() +# +# return response.json({ +# "top_20_trace": [str(x) for x in top_stats[:20]], +# "tasks": { +# "number": len(tasks), +# "array": [show_coro(t) for t in tasks], +# } +# }) @app.route("/github", methods=["GET"]) @@ -1385,29 +1385,29 @@ async def github(request): return response.text("ok") -def 
show_coro(c): - data = { - 'txt': str(c), - 'type': str(type(c)), - 'done': c.done(), - 'cancelled': False, - 'stack': None, - 'exception': None, - } - if not c.done(): - data['stack'] = [format_frame(x) for x in c.get_stack()] - else: - if c.cancelled(): - data['cancelled'] = True - else: - data['exception'] = str(c.exception()) - - return data +#def show_coro(c): +# data = { +# 'txt': str(c), +# 'type': str(type(c)), +# 'done': c.done(), +# 'cancelled': False, +# 'stack': None, +# 'exception': None, +# } +# if not c.done(): +# data['stack'] = [format_frame(x) for x in c.get_stack()] +# else: +# if c.cancelled(): +# data['cancelled'] = True +# else: +# data['exception'] = str(c.exception()) +# +# return data -def format_frame(f): - keys = ['f_code', 'f_lineno'] - return dict([(k, str(getattr(f, k))) for k in keys]) +#def format_frame(f): +# keys = ['f_code', 'f_lineno'] +# return dict([(k, str(getattr(f, k))) for k in keys]) @app.listener("before_server_start") @@ -1482,7 +1482,7 @@ def main(config="./config.py"): app.add_task(launch_monthly_job()) app.add_task(jobs_dispatcher()) - app.add_task(number_of_tasks()) + #app.add_task(number_of_tasks()) app.run('localhost', port=app.config.PORT, debug=app.config.DEBUG) From 6eb3075d6bdebe1b01e6398b04b9492ea2dfa75e Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sun, 29 Jan 2023 19:01:13 +0100 Subject: [PATCH 4/5] Black --- run.py | 850 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 551 insertions(+), 299 deletions(-) diff --git a/run.py b/run.py index 52f7c76..e061c6c 100644 --- a/run.py +++ b/run.py @@ -43,7 +43,7 @@ from models import Repo, Job, db, Worker from schedule import always_relaunch, once_per_day # This is used by ciclic -admin_token = ''.join(random.choices(string.ascii_lowercase + string.digits, k=32)) +admin_token = "".join(random.choices(string.ascii_lowercase + string.digits, k=32)) open(".admin_token", "w").write(admin_token) try: @@ -72,7 +72,7 @@ 
LOGGING_CONFIG_DEFAULTS["handlers"] = { "class": "logging.StreamHandler", "formatter": "background", "stream": sys.stdout, - } + }, } LOGGING_CONFIG_DEFAULTS["formatters"] = { @@ -91,7 +91,8 @@ LOGGING_CONFIG_DEFAULTS["formatters"] = { def datetime_to_epoch_json_converter(o): if isinstance(o, datetime): - return o.strftime('%s') + return o.strftime("%s") + # define a custom json dumps to convert datetime def my_json_dumps(o): @@ -102,19 +103,19 @@ task_logger = logging.getLogger("task") api_logger = logging.getLogger("api") app = Sanic(__name__, dumps=my_json_dumps) -app.static('/static', './static/') +app.static("/static", "./static/") yunorunner_dir = os.path.abspath(os.path.dirname(__file__)) -loader = FileSystemLoader(yunorunner_dir + '/templates', encoding='utf8') +loader = FileSystemLoader(yunorunner_dir + "/templates", encoding="utf8") jinja = SanicJinja2(app, loader=loader) # to avoid conflict with vue.js -jinja.env.block_start_string = '<%' -jinja.env.block_end_string = '%>' -jinja.env.variable_start_string = '<{' -jinja.env.variable_end_string = '}>' -jinja.env.comment_start_string = '<#' -jinja.env.comment_end_string = '#>' +jinja.env.block_start_string = "<%" +jinja.env.block_end_string = "%>" +jinja.env.variable_start_string = "<{" +jinja.env.variable_end_string = "}>" +jinja.env.comment_start_string = "<#" +jinja.env.comment_end_string = "#>" APPS_LIST = "https://app.yunohost.org/default/v2/apps.json" @@ -178,7 +179,9 @@ def merge_jobs_on_startup(): def set_random_day_for_monthy_job(): for repo in Repo.select().where((Repo.random_job_day == None)): repo.random_job_day = random.randint(1, 28) - task_logger.info(f"set random day for monthly job of repo '{repo.name}' at '{repo.random_job_day}'") + task_logger.info( + f"set random day for monthly job of repo '{repo.name}' at '{repo.random_job_day}'" + ) repo.save() @@ -189,7 +192,9 @@ async def create_job(app_id, repo_url, job_comment=""): # avoid scheduling twice if Job.select().where(Job.name == 
job_name, Job.state == "scheduled").count() > 0: - task_logger.info(f"a job for '{job_name} is already scheduled, not adding another one") + task_logger.info( + f"a job for '{job_name} is already scheduled, not adding another one" + ) return job = Job.create( @@ -198,10 +203,13 @@ async def create_job(app_id, repo_url, job_comment=""): state="scheduled", ) - await broadcast({ - "action": "new_job", - "data": model_to_dict(job), - }, "jobs") + await broadcast( + { + "action": "new_job", + "data": model_to_dict(job), + }, + "jobs", + ) return job @@ -212,7 +220,11 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F # only support github for now :( async def get_master_commit_sha(url): - command = await asyncio.create_subprocess_shell(f"git ls-remote {url} master", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + command = await asyncio.create_subprocess_shell( + f"git ls-remote {url} master", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) data = await command.stdout.read() commit_sha = data.decode().strip().replace("\t", " ").split(" ")[0] return commit_sha @@ -237,37 +249,48 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F task_logger.debug(f"skip {app_id} because app is not good quality") continue - # already know, look to see if there is new commits if app_id in repos: repo = repos[app_id] # but first check if the URL has changed if repo.url != app_data["git"]["url"]: - task_logger.info(f"Application {app_id} has changed of url from {repo.url} to {app_data['git']['url']}") + task_logger.info( + f"Application {app_id} has changed of url from {repo.url} to {app_data['git']['url']}" + ) repo.url = app_data["git"]["url"] repo.save() - await broadcast({ - "action": "update_app", - "data": model_to_dict(repo), - }, "apps") + await broadcast( + { + "action": "update_app", + "data": model_to_dict(repo), + }, + "apps", + ) # change the url of all jobs that used to 
have this URL I # guess :/ # this isn't perfect because that could overwrite added by # hand jobs but well... - for job in Job.select().where(Job.url_or_path == repo.url, Job.state == "scheduled"): + for job in Job.select().where( + Job.url_or_path == repo.url, Job.state == "scheduled" + ): job.url_or_path = repo.url job.save() - task_logger.info(f"Updating job {job.name} #{job.id} for {app_id} to {repo.url} since the app has changed of url") + task_logger.info( + f"Updating job {job.name} #{job.id} for {app_id} to {repo.url} since the app has changed of url" + ) - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) # we don't want to do anything else if not monitor_git: @@ -275,15 +298,19 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F repo_is_updated = False if repo.revision != commit_sha: - task_logger.info(f"Application {app_id} has new commits on github " - f"({repo.revision} → {commit_sha}), schedule new job") + task_logger.info( + f"Application {app_id} has new commits on github " + f"({repo.revision} → {commit_sha}), schedule new job" + ) repo.revision = commit_sha repo.save() repo_is_updated = True await create_job(app_id, repo.url) - repo_state = "working" if app_data["state"] == "working" else "other_than_working" + repo_state = ( + "working" if app_data["state"] == "working" else "other_than_working" + ) if repo.state != repo_state: repo.state = repo_state @@ -296,26 +323,37 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F repo_is_updated = True if repo_is_updated: - await broadcast({ - "action": "update_app", - "data": model_to_dict(repo), - }, "apps") + await broadcast( + { + "action": "update_app", + "data": model_to_dict(repo), + }, + "apps", + ) # 
new app elif app_id not in repos: - task_logger.info(f"New application detected: {app_id} " + (", scheduling a new job" if monitor_git else "")) + task_logger.info( + f"New application detected: {app_id} " + + (", scheduling a new job" if monitor_git else "") + ) repo = Repo.create( name=app_id, url=app_data["git"]["url"], revision=commit_sha, - state="working" if app_data["state"] == "working" else "other_than_working", + state="working" + if app_data["state"] == "working" + else "other_than_working", random_job_day=random.randint(1, 28), ) - await broadcast({ - "action": "new_app", - "data": model_to_dict(repo), - }, "apps") + await broadcast( + { + "action": "new_app", + "data": model_to_dict(repo), + }, + "apps", + ) if monitor_git: await create_job(app_id, repo.url) @@ -329,29 +367,43 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F repo = repos[repo_name] # delete scheduled jobs first - task_logger.info(f"Application {repo_name} has been removed from the app list, start by removing its scheduled job if there are any...") - for job in Job.select().where(Job.url_or_path == repo.url, Job.state == "scheduled"): + task_logger.info( + f"Application {repo_name} has been removed from the app list, start by removing its scheduled job if there are any..." + ) + for job in Job.select().where( + Job.url_or_path == repo.url, Job.state == "scheduled" + ): await api_stop_job(None, job.id) # not sure this is going to work job_id = job.id - task_logger.info(f"Delete scheduled job {job.name} #{job.id} for application {repo_name} because the application is being deleted.") + task_logger.info( + f"Delete scheduled job {job.name} #{job.id} for application {repo_name} because the application is being deleted." 
+ ) data = model_to_dict(job) job.delete_instance() - await broadcast({ - "action": "delete_job", - "data": data, - }, ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "delete_job", + "data": data, + }, + ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"], + ) - task_logger.info(f"Delete application {repo_name} because it has been removed from the apps list.") + task_logger.info( + f"Delete application {repo_name} because it has been removed from the apps list." + ) data = model_to_dict(repo) repo.delete_instance() - await broadcast({ - "action": "delete_app", - "data": data, - }, "apps") + await broadcast( + { + "action": "delete_app", + "data": data, + }, + "apps", + ) @once_per_day @@ -359,7 +411,9 @@ async def launch_monthly_job(): today = date.today().day for repo in Repo.select().where(Repo.random_job_day == today): - task_logger.info(f"Launch monthly job for {repo.name} on day {today} of the month ") + task_logger.info( + f"Launch monthly job for {repo.name} on day {today} of the month " + ) await create_job(repo.name, repo.url) @@ -405,7 +459,7 @@ async def jobs_dispatcher(): if workers.count() == 0: return - with db.atomic('IMMEDIATE'): + with db.atomic("IMMEDIATE"): jobs = Job.select().where(Job.state == "scheduled") # no jobs to process, wait @@ -435,18 +489,25 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal await asyncio.sleep(1) - if not os.path.exists(app.config.PACKAGE_CHECK_LOCK_PER_WORKER.format(worker_id=worker.id)): + if not os.path.exists( + app.config.PACKAGE_CHECK_LOCK_PER_WORKER.format(worker_id=worker.id) + ): return job.log += f"Lock for worker {worker.id} still exist ... 
trying to cleanup the old package check still running ...\n" job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) - task_logger.info(f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ...") + task_logger.info( + f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ..." + ) cwd = os.path.split(app.config.PACKAGE_CHECK_PATH)[0] env = { @@ -455,16 +516,19 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal "ARCH": app.config.ARCH, "DIST": app.config.DIST, "YNH_BRANCH": app.config.YNH_BRANCH, - "PATH": os.environ["PATH"] + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin + "PATH": os.environ["PATH"] + + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin } cmd = f"script -qefc '{app.config.PACKAGE_CHECK_PATH} --force-stop 2>&1'" try: - command = await asyncio.create_subprocess_shell(cmd, - cwd=cwd, - env=env, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) + command = await asyncio.create_subprocess_shell( + cmd, + cwd=cwd, + env=env, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) while not command.stdout.at_eof(): data = await command.stdout.readline() await asyncio.sleep(1) @@ -495,19 +559,25 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal return True finally: job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) async def 
run_job(worker, job): - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) await asyncio.sleep(5) @@ -526,24 +596,35 @@ async def run_job(worker, job): "ARCH": app.config.ARCH, "DIST": app.config.DIST, "YNH_BRANCH": app.config.YNH_BRANCH, - "PATH": os.environ["PATH"] + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin + "PATH": os.environ["PATH"] + + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin } now = datetime.now().strftime("%d/%m/%Y - %H:%M:%S") - msg = now + f" - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}" + msg = ( + now + + f" - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}" + ) job.log += "=" * len(msg) + "\n" job.log += msg + "\n" job.log += "=" * len(msg) + "\n" job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) - result_json = app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER.format(worker_id=worker.id) + result_json = app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER.format( + worker_id=worker.id + ) full_log = app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER.format(worker_id=worker.id) - summary_png = app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER.format(worker_id=worker.id) + summary_png = app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER.format( + worker_id=worker.id + ) if os.path.exists(result_json): os.remove(result_json) @@ -556,31 +637,35 @@ async def 
run_job(worker, job): task_logger.info(f"Launching command: {cmd}") try: - command = await asyncio.create_subprocess_shell(cmd, - cwd=cwd, - env=env, - # default limit is not enough in some situations - limit=(2 ** 16) ** 10, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) - + command = await asyncio.create_subprocess_shell( + cmd, + cwd=cwd, + env=env, + # default limit is not enough in some situations + limit=(2 ** 16) ** 10, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) while not command.stdout.at_eof(): data = await command.stdout.readline() try: - job.log += data.decode('utf-8', 'replace') + job.log += data.decode("utf-8", "replace") except UnicodeDecodeError as e: job.log += "Uhoh ?! UnicodeDecodeError in yunorunner !?" job.log += str(e) job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) except (CancelledError, asyncio.exceptions.CancelledError): command.terminate() @@ -616,11 +701,23 @@ async def run_job(worker, job): job.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n" shutil.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log") - shutil.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json") - shutil.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png") + shutil.copy( + result_json, + yunorunner_dir + + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json", + ) + shutil.copy( + summary_png, yunorunner_dir + f"/results/summary/{job.id}.png" + ) - public_result_json_path = yunorunner_dir + f"/results/logs/list_level_{app.config.YNH_BRANCH}_{app.config.ARCH}.json" - if 
os.path.exists(public_result_json_path) or not open(public_result_json_path).read().strip(): + public_result_json_path = ( + yunorunner_dir + + f"/results/logs/list_level_{app.config.YNH_BRANCH}_{app.config.ARCH}.json" + ) + if ( + os.path.exists(public_result_json_path) + or not open(public_result_json_path).read().strip() + ): public_result = {} else: public_result = json.load(open(public_result_json_path)) @@ -637,12 +734,14 @@ async def run_job(worker, job): job.log += "=" * len(msg) + "\n" job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) - + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) if "ci-apps.yunohost.org" in app.config.BASE_URL: try: @@ -665,7 +764,9 @@ async def run_job(worker, job): elif level < public_level: msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}" elif level < 6: - msg = f"App {job_app} stays at level {level} in job {job_id_with_url}" + msg = ( + f"App {job_app} stays at level {level} in job {job_id_with_url}" + ) else: # Dont notify anything, reduce CI flood on app chatroom if app is already level 6+ msg = "" @@ -686,7 +787,7 @@ async def run_job(worker, job): job.log += "Exception:\n" job.log += traceback.format_exc() - #if job.state != "canceled": + # if job.state != "canceled": # await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) # remove ourself from the state @@ -695,11 +796,14 @@ async def run_job(worker, job): worker.state = "available" worker.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", 
f"app-jobs-{job.url_or_path}"], + ) async def broadcast(message, channels): @@ -722,6 +826,7 @@ async def broadcast(message, channels): except ValueError: pass + def subscribe(ws, channel): subscriptions[channel].append(ws) @@ -765,7 +870,7 @@ def chunks(l, n): yield chunk -@app.websocket('/index-ws') +@app.websocket("/index-ws") @clean_websocket async def ws_index(request, websocket): subscribe(websocket, "jobs") @@ -782,46 +887,67 @@ async def ws_index(request, websocket): ) JobAlias = Job.alias() - subquery = JobAlias.select(*selected_fields)\ - .where(JobAlias.state << ("done", "failure", "canceled", "error"))\ - .group_by(JobAlias.url_or_path)\ - .select(fn.Max(JobAlias.id).alias("max_id")) + subquery = ( + JobAlias.select(*selected_fields) + .where(JobAlias.state << ("done", "failure", "canceled", "error")) + .group_by(JobAlias.url_or_path) + .select(fn.Max(JobAlias.id).alias("max_id")) + ) - latest_done_jobs = Job.select(*selected_fields)\ - .join(subquery, on=(Job.id == subquery.c.max_id))\ - .order_by(-Job.id) + latest_done_jobs = ( + Job.select(*selected_fields) + .join(subquery, on=(Job.id == subquery.c.max_id)) + .order_by(-Job.id) + ) - subquery = JobAlias.select(*selected_fields)\ - .where(JobAlias.state == "scheduled")\ - .group_by(JobAlias.url_or_path)\ - .select(fn.Min(JobAlias.id).alias("min_id")) + subquery = ( + JobAlias.select(*selected_fields) + .where(JobAlias.state == "scheduled") + .group_by(JobAlias.url_or_path) + .select(fn.Min(JobAlias.id).alias("min_id")) + ) - next_scheduled_jobs = Job.select(*selected_fields)\ - .join(subquery, on=(Job.id == subquery.c.min_id))\ - .order_by(-Job.id) + next_scheduled_jobs = ( + Job.select(*selected_fields) + .join(subquery, on=(Job.id == subquery.c.min_id)) + .order_by(-Job.id) + ) # chunks initial data by batch of 30 to avoid killing firefox - data = chunks(itertools.chain(map(model_to_dict, next_scheduled_jobs.iterator()), - map(model_to_dict, Job.select().where(Job.state == "running").iterator()), - 
map(model_to_dict, latest_done_jobs.iterator())), 30) + data = chunks( + itertools.chain( + map(model_to_dict, next_scheduled_jobs.iterator()), + map(model_to_dict, Job.select().where(Job.state == "running").iterator()), + map(model_to_dict, latest_done_jobs.iterator()), + ), + 30, + ) first_chunck = next(data) - await websocket.send(my_json_dumps({ - "action": "init_jobs", - "data": first_chunck, # send first chunk - })) + await websocket.send( + my_json_dumps( + { + "action": "init_jobs", + "data": first_chunck, # send first chunk + } + ) + ) for chunk in data: - await websocket.send(my_json_dumps({ - "action": "init_jobs_stream", - "data": chunk, - })) + await websocket.send( + my_json_dumps( + { + "action": "init_jobs_stream", + "data": chunk, + } + ) + ) await websocket.wait_for_connection_lost() -@app.websocket('/job-ws/') +@app.websocket("/job-ws/") @clean_websocket async def ws_job(request, websocket, job_id): job = Job.select().where(Job.id == job_id) @@ -833,15 +959,19 @@ async def ws_job(request, websocket, job_id): subscribe(websocket, f"job-{job.id}") - await websocket.send(my_json_dumps({ - "action": "init_job", - "data": model_to_dict(job), - })) + await websocket.send( + my_json_dumps( + { + "action": "init_job", + "data": model_to_dict(job), + } + ) + ) await websocket.wait_for_connection_lost() -@app.websocket('/apps-ws') +@app.websocket("/apps-ws") @clean_websocket async def ws_apps(request, websocket): subscribe(websocket, "jobs") @@ -849,7 +979,8 @@ async def ws_apps(request, websocket): # I need to do this because peewee strangely fuck up on join and remove the # subquery fields which breaks everything - repos = Repo.raw(''' + repos = Repo.raw( + """ SELECT "id", "name", @@ -894,7 +1025,8 @@ async def ws_apps(request, websocket): ("t5"."url_or_path" = "t1"."url") ORDER BY "name" - ''') + """ + ) repos = [ { @@ -907,41 +1039,58 @@ async def ws_apps(request, websocket): "job_id": x.job_id, "job_name": x.job_name, "job_state": x.job_state, - 
"created_time": datetime.strptime(x.created_time.split(".")[0], '%Y-%m-%d %H:%M:%S') if x.created_time else None, - "started_time": datetime.strptime(x.started_time.split(".")[0], '%Y-%m-%d %H:%M:%S') if x.started_time else None, - "end_time": datetime.strptime(x.end_time.split(".")[0], '%Y-%m-%d %H:%M:%S') if x.end_time else None, - } for x in repos + "created_time": datetime.strptime( + x.created_time.split(".")[0], "%Y-%m-%d %H:%M:%S" + ) + if x.created_time + else None, + "started_time": datetime.strptime( + x.started_time.split(".")[0], "%Y-%m-%d %H:%M:%S" + ) + if x.started_time + else None, + "end_time": datetime.strptime(x.end_time.split(".")[0], "%Y-%m-%d %H:%M:%S") + if x.end_time + else None, + } + for x in repos ] # add apps without jobs selected_repos = {x["id"] for x in repos} for repo in Repo.select().where(Repo.id.not_in(selected_repos)): - repos.append({ - "id": repo.id, - "name": repo.name, - "url": repo.url, - "revision": repo.revision, - "state": repo.state, - "random_job_day": repo.random_job_day, - "job_id": None, - "job_name": None, - "job_state": None, - "created_time": None, - "started_time": None, - "end_time": None, - }) + repos.append( + { + "id": repo.id, + "name": repo.name, + "url": repo.url, + "revision": repo.revision, + "state": repo.state, + "random_job_day": repo.random_job_day, + "job_id": None, + "job_name": None, + "job_state": None, + "created_time": None, + "started_time": None, + "end_time": None, + } + ) repos = sorted(repos, key=lambda x: x["name"]) - await websocket.send(my_json_dumps({ - "action": "init_apps", - "data": repos, - })) + await websocket.send( + my_json_dumps( + { + "action": "init_apps", + "data": repos, + } + ) + ) await websocket.wait_for_connection_lost() -@app.websocket('/app-ws/') +@app.websocket("/app-ws/") @clean_websocket async def ws_app(request, websocket, app_name): # XXX I don't check if the app exists because this websocket is supposed to @@ -950,11 +1099,21 @@ async def ws_app(request, 
websocket, app_name): subscribe(websocket, f"app-jobs-{app.url}") - job = list(Job.select().where(Job.url_or_path == app.url).order_by(-Job.id).limit(10).dicts()) - await websocket.send(my_json_dumps({ - "action": "init_jobs", - "data": job, - })) + job = list( + Job.select() + .where(Job.url_or_path == app.url) + .order_by(-Job.id) + .limit(10) + .dicts() + ) + await websocket.send( + my_json_dumps( + { + "action": "init_jobs", + "data": job, + } + ) + ) await websocket.wait_for_connection_lost() @@ -966,23 +1125,32 @@ def require_token(): # run some method that checks the request # for the client's authorization status if "X-Token" not in request.headers: - return response.json({'status': 'you need to provide a token ' - 'to access the API, please ' - 'refer to the README'}, 403) + return response.json( + { + "status": "you need to provide a token " + "to access the API, please " + "refer to the README" + }, + 403, + ) token = request.headers["X-Token"].strip() if not hmac.compare_digest(token, admin_token): - api_logger.warning("someone tried to access the API using an invalid admin token") - return response.json({'status': 'invalid token'}, 403) + api_logger.warning( + "someone tried to access the API using an invalid admin token" + ) + return response.json({"status": "invalid token"}, 403) result = await f(request, *args, **kwargs) return result + return decorated_function + return decorator -@app.route("/api/job", methods=['POST']) +@app.route("/api/job", methods=["POST"]) @require_token() async def api_new_job(request): job = Job.create( @@ -993,26 +1161,29 @@ async def api_new_job(request): api_logger.info(f"Request to add new job '{job.name}' [{job.id}]") - await broadcast({ - "action": "new_job", - "data": model_to_dict(job), - }, ["jobs", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "new_job", + "data": model_to_dict(job), + }, + ["jobs", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") -@app.route("/api/job", 
methods=['GET']) +@app.route("/api/job", methods=["GET"]) @require_token() async def api_list_job(request): query = Job.select() if not all: - query.where(Job.state in ('scheduled', 'running')) + query.where(Job.state in ("scheduled", "running")) return response.json([model_to_dict(x) for x in query.order_by(-Job.id)]) -@app.route("/api/app", methods=['GET']) +@app.route("/api/app", methods=["GET"]) @require_token() async def api_list_app(request): query = Repo.select() @@ -1020,7 +1191,7 @@ async def api_list_app(request): return response.json([model_to_dict(x) for x in query.order_by(Repo.name)]) -@app.route("/api/job/", methods=['DELETE']) +@app.route("/api/job/", methods=["DELETE"]) @require_token() async def api_delete_job(request, job_id): api_logger.info(f"Request to restart job {job_id}") @@ -1035,10 +1206,13 @@ async def api_delete_job(request, job_id): data = model_to_dict(job) job.delete_instance() - await broadcast({ - "action": "delete_job", - "data": data, - }, ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "delete_job", + "data": data, + }, + ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") @@ -1054,15 +1228,17 @@ async def stop_job(job_id): api_logger.info(f"Request to stop job '{job.name}' [{job.id}]") if job.state == "scheduled": - api_logger.info(f"Cancel scheduled job '{job.name}' [job.id] " - f"on request") + api_logger.info(f"Cancel scheduled job '{job.name}' [job.id] " f"on request") job.state = "canceled" job.save() - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") @@ -1075,36 +1251,42 @@ async def stop_job(job_id): jobs_in_memory_state[job.id]["task"].cancel() - worker = 
Worker.select().where(Worker.id == jobs_in_memory_state[job.id]["worker"])[0] + worker = Worker.select().where( + Worker.id == jobs_in_memory_state[job.id]["worker"] + )[0] worker.state = "available" worker.save() - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") if job.state in ("done", "canceled", "failure", "error"): - api_logger.info(f"Request to cancel job '{job.name}' " - f"[job.id] but job is already in '{job.state}' state, " - f"do nothing") + api_logger.info( + f"Request to cancel job '{job.name}' " + f"[job.id] but job is already in '{job.state}' state, " + f"do nothing" + ) # nothing to do, task is already done return response.text("ok") - raise Exception(f"Tryed to cancel a job with an unknown state: " - f"{job.state}") + raise Exception(f"Tryed to cancel a job with an unknown state: " f"{job.state}") -@app.route("/api/job//stop", methods=['POST']) +@app.route("/api/job//stop", methods=["POST"]) async def api_stop_job(request, job_id): # TODO auth or some kind return await stop_job(job_id) -@app.route("/api/job//restart", methods=['POST']) +@app.route("/api/job//restart", methods=["POST"]) async def api_restart_job(request, job_id): api_logger.info(f"Request to restart job {job_id}") # Calling a route (eg api_stop_job) doesn't work anymore @@ -1116,16 +1298,19 @@ async def api_restart_job(request, job_id): job.log = "" job.save() - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") # Meant to interface with https://shields.io/endpoint 
-@app.route("/api/job//badge", methods=['GET']) +@app.route("/api/job//badge", methods=["GET"]) async def api_badge_job(request, job_id): job = Job.select().where(Job.id == job_id) @@ -1136,24 +1321,26 @@ async def api_badge_job(request, job_id): job = job[0] state_to_color = { - 'scheduled': 'lightgrey', - 'running': 'blue', - 'done': 'brightgreen', - 'failure': 'red', - 'error': 'red', - 'canceled': 'yellow', + "scheduled": "lightgrey", + "running": "blue", + "done": "brightgreen", + "failure": "red", + "error": "red", + "canceled": "yellow", } - return response.json({ - "schemaVersion": 1, - "label": "tests", - "message": job.state, - "color": state_to_color[job.state] - }) + return response.json( + { + "schemaVersion": 1, + "label": "tests", + "message": job.state, + "color": state_to_color[job.state], + } + ) -@app.route('/job/') -@jinja.template('job.html') +@app.route("/job/") +@jinja.template("job.html") async def html_job(request, job_id): job = Job.select().where(Job.id == job_id) @@ -1172,41 +1359,45 @@ async def html_job(request, job_id): return { "job": job, - 'app': application, - 'job_url': job_url, - 'badge_url': badge_url, - 'shield_badge_url': shield_badge_url, - 'summary_url': summary_url, - 'relative_path_to_root': '../', - 'path': request.path + "app": application, + "job_url": job_url, + "badge_url": badge_url, + "shield_badge_url": shield_badge_url, + "summary_url": summary_url, + "relative_path_to_root": "../", + "path": request.path, } -@app.route('/apps/', strict_slashes=True) # To avoid reaching the route "/apps//" with an empty string -@jinja.template('apps.html') +@app.route( + "/apps/", strict_slashes=True +) # To avoid reaching the route "/apps//" with an empty string +@jinja.template("apps.html") async def html_apps(request): - return {'relative_path_to_root': '../', 'path': request.path} + return {"relative_path_to_root": "../", "path": request.path} -@app.route('/apps//') -@jinja.template('app.html') +@app.route("/apps//") 
+@jinja.template("app.html") async def html_app(request, app_name): app = Repo.select().where(Repo.name == app_name) if app.count() == 0: raise NotFound() - return {"app": app[0], 'relative_path_to_root': '../../', 'path': request.path} + return {"app": app[0], "relative_path_to_root": "../../", "path": request.path} -@app.route('/apps//latestjob') +@app.route("/apps//latestjob") async def html_app_latestjob(request, app_name): _app = Repo.select().where(Repo.name == app_name) if _app.count() == 0: raise NotFound() - jobs = Job.select(fn.MAX(Job.id)).where(Job.url_or_path == _app[0].url, Job.state != 'scheduled') + jobs = Job.select(fn.MAX(Job.id)).where( + Job.url_or_path == _app[0].url, Job.state != "scheduled" + ) if jobs.count() == 0: jobs = Job.select(fn.MAX(Job.id)).where(Job.url_or_path == _app[0].url) @@ -1219,19 +1410,19 @@ async def html_app_latestjob(request, app_name): return response.redirect(job_url) -@app.route('/') -@jinja.template('index.html') +@app.route("/") +@jinja.template("index.html") async def html_index(request): - return {'relative_path_to_root': '', 'path': request.path} + return {"relative_path_to_root": "", "path": request.path} -#@always_relaunch(sleep=10) -#async def number_of_tasks(): +# @always_relaunch(sleep=10) +# async def number_of_tasks(): # print("Number of tasks: %s" % len(asyncio_all_tasks())) # # -#@app.route('/monitor') -#async def monitor(request): +# @app.route('/monitor') +# async def monitor(request): # snapshot = tracemalloc.take_snapshot() # top_stats = snapshot.statistics('lineno') # @@ -1259,28 +1450,36 @@ async def github(request): # Abort directly if no secret opened # (which also allows to only enable this feature if # we define the webhook secret) - if not os.path.exists("./github_webhook_secret") or not os.path.exists("./github_bot_token"): - api_logger.info(f"Received a webhook but no ./github_webhook_secret or ./github_bot_token file exists ... 
ignoring") - return response.json({'error': 'GitHub hooks not configured'}, 403) + if not os.path.exists("./github_webhook_secret") or not os.path.exists( + "./github_bot_token" + ): + api_logger.info( + f"Received a webhook but no ./github_webhook_secret or ./github_bot_token file exists ... ignoring" + ) + return response.json({"error": "GitHub hooks not configured"}, 403) # Only SHA1 is supported header_signature = request.headers.get("X-Hub-Signature") if header_signature is None: api_logger.info("Received a webhook but there's no header X-Hub-Signature") - return response.json({'error': 'No X-Hub-Signature'}, 403) + return response.json({"error": "No X-Hub-Signature"}, 403) sha_name, signature = header_signature.split("=") if sha_name != "sha1": - api_logger.info("Received a webhook but signing algo isn't sha1, it's '%s'" % sha_name) - return response.json({'error': "Signing algorightm is not sha1 ?!"}, 501) + api_logger.info( + "Received a webhook but signing algo isn't sha1, it's '%s'" % sha_name + ) + return response.json({"error": "Signing algorightm is not sha1 ?!"}, 501) secret = open("./github_webhook_secret", "r").read().strip() # HMAC requires the key to be bytes, but data is string mac = hmac.new(secret.encode(), msg=request.body, digestmod=hashlib.sha1) if not hmac.compare_digest(str(mac.hexdigest()), str(signature)): - api_logger.info(f"Received a webhook but signature authentication failed (is the secret properly configured?)") - return response.json({'error': "Bad signature ?!"}, 403) + api_logger.info( + f"Received a webhook but signature authentication failed (is the secret properly configured?)" + ) + return response.json({"error": "Bad signature ?!"}, 403) hook_type = request.headers.get("X-Github-Event") hook_infos = request.json @@ -1289,17 +1488,19 @@ async def github(request): # - *New* comments # - On issue/PRs which are still open if hook_type == "issue_comment": - if hook_infos["action"] != "created" \ - or hook_infos["issue"]["state"] 
!= "open" \ - or "pull_request" not in hook_infos["issue"]: - # Nothing to do but success anyway (204 = No content) - return response.json({'msg': "Nothing to do"}, 204) + if ( + hook_infos["action"] != "created" + or hook_infos["issue"]["state"] != "open" + or "pull_request" not in hook_infos["issue"] + ): + # Nothing to do but success anyway (204 = No content) + return response.json({"msg": "Nothing to do"}, 204) # Check the comment contains proper keyword trigger body = hook_infos["comment"]["body"].strip()[:100].lower() if not any(trigger.lower() in body for trigger in app.config.WEBHOOK_TRIGGERS): # Nothing to do but success anyway (204 = No content) - return response.json({'msg': "Nothing to do"}, 204) + return response.json({"msg": "Nothing to do"}, 204) # We only accept this from people which are member of the org # https://docs.github.com/en/rest/reference/orgs#check-organization-membership-for-a-user @@ -1307,34 +1508,49 @@ async def github(request): # which is not represented in the original webhook async def is_user_in_organization(user): token = open("./github_bot_token").read().strip() - async with aiohttp.ClientSession(headers={"Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json"}) as session: - resp = await session.get(f"https://api.github.com/orgs/YunoHost-Apps/members/{user}") + async with aiohttp.ClientSession( + headers={ + "Authorization": f"token {token}", + "Accept": "application/vnd.github.v3+json", + } + ) as session: + resp = await session.get( + f"https://api.github.com/orgs/YunoHost-Apps/members/{user}" + ) return resp.status == 204 if not await is_user_in_organization(hook_infos["comment"]["user"]["login"]): # Unauthorized - return response.json({'error': "Unauthorized"}, 403) + return response.json({"error": "Unauthorized"}, 403) # Fetch the PR infos (yeah they ain't in the initial infos we get @_@) pr_infos_url = hook_infos["issue"]["pull_request"]["url"] elif hook_type == "pull_request": if 
hook_infos["action"] != "opened": # Nothing to do but success anyway (204 = No content) - return response.json({'msg': "Nothing to do"}, 204) + return response.json({"msg": "Nothing to do"}, 204) # We only accept PRs that are created by github-action bot - if hook_infos["pull_request"]["user"]["login"] != "github-actions[bot]" \ - or not hook_infos["pull_request"]["head"]["ref"].startswith("ci-auto-update-"): + if hook_infos["pull_request"]["user"][ + "login" + ] != "github-actions[bot]" or not hook_infos["pull_request"]["head"][ + "ref" + ].startswith( + "ci-auto-update-" + ): # Unauthorized - return response.json({'msg': "Nothing to do"}, 204) + return response.json({"msg": "Nothing to do"}, 204) if not app.config.ANSWER_TO_AUTO_UPDATER: # Unauthorized - return response.json({'msg': "Nothing to do, I am configured to ignore the auto-updater"}, 204) + return response.json( + {"msg": "Nothing to do, I am configured to ignore the auto-updater"}, + 204, + ) # Fetch the PR infos (yeah they ain't in the initial infos we get @_@) pr_infos_url = hook_infos["pull_request"]["url"] else: # Nothing to do but success anyway (204 = No content) - return response.json({'msg': "Nothing to do"}, 204) + return response.json({"msg": "Nothing to do"}, 204) async with aiohttp.ClientSession() as session: async with session.get(pr_infos_url) as resp: @@ -1345,17 +1561,21 @@ async def github(request): url_to_test = f"{repo}/tree/{branch_name}" app_id = pr_infos["base"]["repo"]["name"].rstrip("") if app_id.endswith("_ynh"): - app_id = app_id[:-len("_ynh")] + app_id = app_id[: -len("_ynh")] pr_id = str(pr_infos["number"]) # Create the job for the corresponding app (with the branch url) api_logger.info("Scheduling a new job from comment on a PR") - job = await create_job(app_id, url_to_test, job_comment=f"PR #{pr_id}, {branch_name}") + job = await create_job( + app_id, url_to_test, job_comment=f"PR #{pr_id}, {branch_name}" + ) if not job: - return response.json({'msg': "Nothing to do, 
corresponding job already scheduled"}, 204) + return response.json( + {"msg": "Nothing to do, corresponding job already scheduled"}, 204 + ) # Answer with comment with link+badge for the job @@ -1366,8 +1586,12 @@ async def github(request): comments_url = hook_infos["pull_request"]["comments_url"] token = open("./github_bot_token").read().strip() - async with aiohttp.ClientSession(headers={"Authorization": f"token {token}"}) as session: - async with session.post(comments_url, data=my_json_dumps({"body": body})) as resp: + async with aiohttp.ClientSession( + headers={"Authorization": f"token {token}"} + ) as session: + async with session.post( + comments_url, data=my_json_dumps({"body": body}) + ) as resp: respjson = await resp.json() api_logger.info("Added comment %s" % respjson["html_url"]) @@ -1385,7 +1609,7 @@ async def github(request): return response.text("ok") -#def show_coro(c): +# def show_coro(c): # data = { # 'txt': str(c), # 'type': str(type(c)), @@ -1405,7 +1629,7 @@ async def github(request): # return data -#def format_frame(f): +# def format_frame(f): # keys = ['f_code', 'f_lineno'] # return dict([(k, str(getattr(f, k))) for k in keys]) @@ -1457,33 +1681,61 @@ def main(config="./config.py"): "ARCH": "amd64", "DIST": "bullseye", "PACKAGE_CHECK_DIR": yunorunner_dir + "/package_check/", - "WEBHOOK_TRIGGERS": ["!testme", "!gogogadgetoci", "By the power of systemd, I invoke The Great App CI to test this Pull Request!"], - "WEBHOOK_CATCHPHRASES": ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:"], + "WEBHOOK_TRIGGERS": [ + "!testme", + "!gogogadgetoci", + "By the power of systemd, I invoke The Great App CI to test this Pull Request!", + ], + "WEBHOOK_CATCHPHRASES": [ + "Alrighty!", + "Fingers crossed!", + "May the CI gods be with you!", + ":carousel_horse:", + ":rocket:", + ":sunflower:", + "Meow :cat2:", + ":v:", + 
":stuck_out_tongue_winking_eye:", + ], } app.config.update_config(default_config) app.config.update_config(config) app.config.PACKAGE_CHECK_PATH = app.config.PACKAGE_CHECK_DIR + "package_check.sh" - app.config.PACKAGE_CHECK_LOCK_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "pcheck-{worker_id}.lock" - app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "full_log_{worker_id}.log" - app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "results_{worker_id}.json" - app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "summary_{worker_id}.png" + app.config.PACKAGE_CHECK_LOCK_PER_WORKER = ( + app.config.PACKAGE_CHECK_DIR + "pcheck-{worker_id}.lock" + ) + app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER = ( + app.config.PACKAGE_CHECK_DIR + "full_log_{worker_id}.log" + ) + app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER = ( + app.config.PACKAGE_CHECK_DIR + "results_{worker_id}.json" + ) + app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER = ( + app.config.PACKAGE_CHECK_DIR + "summary_{worker_id}.png" + ) if not os.path.exists(app.config.PACKAGE_CHECK_PATH): - print(f"Error: analyzer script doesn't exist at '{app.config.PACKAGE_CHECK_PATH}'. Please fix the configuration in {config}") + print( + f"Error: analyzer script doesn't exist at '{app.config.PACKAGE_CHECK_PATH}'. 
Please fix the configuration in {config}"
+        )
         sys.exit(1)
 
     if app.config.MONITOR_APPS_LIST:
-        app.add_task(monitor_apps_lists(monitor_git=app.config.MONITOR_GIT,
-                                        monitor_only_good_quality_apps=app.config.MONITOR_ONLY_GOOD_QUALITY_APPS))
+        app.add_task(
+            monitor_apps_lists(
+                monitor_git=app.config.MONITOR_GIT,
+                monitor_only_good_quality_apps=app.config.MONITOR_ONLY_GOOD_QUALITY_APPS,
+            )
+        )
 
     if app.config.MONTHLY_JOBS:
         app.add_task(launch_monthly_job())
 
     app.add_task(jobs_dispatcher())
-    #app.add_task(number_of_tasks())
-    app.run('localhost', port=app.config.PORT, debug=app.config.DEBUG)
+    # app.add_task(number_of_tasks())
+    app.run("localhost", port=app.config.PORT, debug=app.config.DEBUG)
 
 
 if __name__ == "__main__":

From aa2e196df682cb912915df2d7655f3925727cb4a Mon Sep 17 00:00:00 2001
From: Alexandre Aubin
Date: Mon, 6 Feb 2023 18:11:12 +0100
Subject: [PATCH 5/5] index page: Limit latest jobs display to 500 to prevent unnecessarily lagging the browser

---
 run.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/run.py b/run.py
index e061c6c..a9a25bd 100644
--- a/run.py
+++ b/run.py
@@ -898,6 +898,7 @@ async def ws_index(request, websocket):
         Job.select(*selected_fields)
         .join(subquery, on=(Job.id == subquery.c.max_id))
         .order_by(-Job.id)
+        .limit(500)
     )
 
     subquery = (