diff --git a/run.py b/run.py
index 52f7c76..e061c6c 100644
--- a/run.py
+++ b/run.py
@@ -43,7 +43,7 @@ from models import Repo, Job, db, Worker
 from schedule import always_relaunch, once_per_day
 
 # This is used by ciclic
-admin_token = ''.join(random.choices(string.ascii_lowercase + string.digits, k=32))
+admin_token = "".join(random.choices(string.ascii_lowercase + string.digits, k=32))
 open(".admin_token", "w").write(admin_token)
 
 try:
@@ -72,7 +72,7 @@ LOGGING_CONFIG_DEFAULTS["handlers"] = {
         "class": "logging.StreamHandler",
         "formatter": "background",
         "stream": sys.stdout,
-    }
+    },
 }
 
 LOGGING_CONFIG_DEFAULTS["formatters"] = {
@@ -91,7 +91,8 @@ LOGGING_CONFIG_DEFAULTS["formatters"] = {
 
 def datetime_to_epoch_json_converter(o):
     if isinstance(o, datetime):
-        return o.strftime('%s')
+        return o.strftime("%s")
+
 
 # define a custom json dumps to convert datetime
 def my_json_dumps(o):
@@ -102,19 +103,19 @@ task_logger = logging.getLogger("task")
 api_logger = logging.getLogger("api")
 
 app = Sanic(__name__, dumps=my_json_dumps)
-app.static('/static', './static/')
+app.static("/static", "./static/")
 
 yunorunner_dir = os.path.abspath(os.path.dirname(__file__))
-loader = FileSystemLoader(yunorunner_dir + '/templates', encoding='utf8')
+loader = FileSystemLoader(yunorunner_dir + "/templates", encoding="utf8")
 jinja = SanicJinja2(app, loader=loader)
 
 # to avoid conflict with vue.js
-jinja.env.block_start_string = '<%'
-jinja.env.block_end_string = '%>'
-jinja.env.variable_start_string = '<{'
-jinja.env.variable_end_string = '}>'
-jinja.env.comment_start_string = '<#'
-jinja.env.comment_end_string = '#>'
+jinja.env.block_start_string = "<%"
+jinja.env.block_end_string = "%>"
+jinja.env.variable_start_string = "<{"
+jinja.env.variable_end_string = "}>"
+jinja.env.comment_start_string = "<#"
+jinja.env.comment_end_string = "#>"
 
 APPS_LIST = "https://app.yunohost.org/default/v2/apps.json"
 
@@ -178,7 +179,9 @@ def merge_jobs_on_startup():
 
 
 def set_random_day_for_monthy_job():
     for repo in Repo.select().where((Repo.random_job_day == None)):
         repo.random_job_day = random.randint(1, 28)
-        task_logger.info(f"set random day for monthly job of repo '{repo.name}' at '{repo.random_job_day}'")
+        task_logger.info(
+            f"set random day for monthly job of repo '{repo.name}' at '{repo.random_job_day}'"
+        )
         repo.save()
@@ -189,7 +192,9 @@ async def create_job(app_id, repo_url, job_comment=""):
 
     # avoid scheduling twice
     if Job.select().where(Job.name == job_name, Job.state == "scheduled").count() > 0:
-        task_logger.info(f"a job for '{job_name} is already scheduled, not adding another one")
+        task_logger.info(
+            f"a job for '{job_name} is already scheduled, not adding another one"
+        )
         return
 
     job = Job.create(
@@ -198,10 +203,13 @@ async def create_job(app_id, repo_url, job_comment=""):
         state="scheduled",
     )
 
-    await broadcast({
-        "action": "new_job",
-        "data": model_to_dict(job),
-    }, "jobs")
+    await broadcast(
+        {
+            "action": "new_job",
+            "data": model_to_dict(job),
+        },
+        "jobs",
+    )
 
     return job
 
@@ -212,7 +220,11 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F
 
     # only support github for now :(
     async def get_master_commit_sha(url):
-        command = await asyncio.create_subprocess_shell(f"git ls-remote {url} master", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
+        command = await asyncio.create_subprocess_shell(
+            f"git ls-remote {url} master",
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
         data = await command.stdout.read()
         commit_sha = data.decode().strip().replace("\t", " ").split(" ")[0]
         return commit_sha
@@ -237,37 +249,48 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F
                 task_logger.debug(f"skip {app_id} because app is not good quality")
                 continue
 
-
             # already know, look to see if there is new commits
             if app_id in repos:
                 repo = repos[app_id]
 
                 # but first check if the URL has changed
                 if repo.url != app_data["git"]["url"]:
-                    task_logger.info(f"Application {app_id} has changed of url from {repo.url} to {app_data['git']['url']}")
+                    task_logger.info(
+                        f"Application {app_id} has changed of url from {repo.url} to {app_data['git']['url']}"
+                    )
                     repo.url = app_data["git"]["url"]
                     repo.save()
 
-                    await broadcast({
-                        "action": "update_app",
-                        "data": model_to_dict(repo),
-                    }, "apps")
+                    await broadcast(
+                        {
+                            "action": "update_app",
+                            "data": model_to_dict(repo),
+                        },
+                        "apps",
+                    )
 
                     # change the url of all jobs that used to have this URL I
                     # guess :/
                     # this isn't perfect because that could overwrite added by
                     # hand jobs but well...
-                    for job in Job.select().where(Job.url_or_path == repo.url, Job.state == "scheduled"):
+                    for job in Job.select().where(
+                        Job.url_or_path == repo.url, Job.state == "scheduled"
+                    ):
                         job.url_or_path = repo.url
                         job.save()
 
-                        task_logger.info(f"Updating job {job.name} #{job.id} for {app_id} to {repo.url} since the app has changed of url")
+                        task_logger.info(
+                            f"Updating job {job.name} #{job.id} for {app_id} to {repo.url} since the app has changed of url"
+                        )
 
-                        await broadcast({
-                            "action": "update_job",
-                            "data": model_to_dict(job),
-                        }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
+                        await broadcast(
+                            {
+                                "action": "update_job",
+                                "data": model_to_dict(job),
+                            },
+                            ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"],
+                        )
 
             # we don't want to do anything else
             if not monitor_git:
@@ -275,15 +298,19 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F
 
             repo_is_updated = False
             if repo.revision != commit_sha:
-                task_logger.info(f"Application {app_id} has new commits on github "
-                                 f"({repo.revision} → {commit_sha}), schedule new job")
+                task_logger.info(
+                    f"Application {app_id} has new commits on github "
+                    f"({repo.revision} → {commit_sha}), schedule new job"
+                )
                 repo.revision = commit_sha
                 repo.save()
                 repo_is_updated = True
 
                 await create_job(app_id, repo.url)
 
-            repo_state = "working" if app_data["state"] == "working" else "other_than_working"
+            repo_state = (
+                "working" if app_data["state"] == "working" else "other_than_working"
+            )
 
             if repo.state != repo_state:
                 repo.state = repo_state
@@ -296,26 +323,37 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F
                 repo_is_updated = True
 
             if repo_is_updated:
-                await broadcast({
-                    "action": "update_app",
-                    "data": model_to_dict(repo),
-                }, "apps")
+                await broadcast(
+                    {
+                        "action": "update_app",
+                        "data": model_to_dict(repo),
+                    },
+                    "apps",
+                )
 
         # new app
         elif app_id not in repos:
-            task_logger.info(f"New application detected: {app_id} " + (", scheduling a new job" if monitor_git else ""))
+            task_logger.info(
+                f"New application detected: {app_id} "
+                + (", scheduling a new job" if monitor_git else "")
+            )
             repo = Repo.create(
                 name=app_id,
                 url=app_data["git"]["url"],
                 revision=commit_sha,
-                state="working" if app_data["state"] == "working" else "other_than_working",
+                state="working"
+                if app_data["state"] == "working"
+                else "other_than_working",
                 random_job_day=random.randint(1, 28),
             )
 
-            await broadcast({
-                "action": "new_app",
-                "data": model_to_dict(repo),
-            }, "apps")
+            await broadcast(
+                {
+                    "action": "new_app",
+                    "data": model_to_dict(repo),
+                },
+                "apps",
+            )
 
             if monitor_git:
                 await create_job(app_id, repo.url)
@@ -329,29 +367,43 @@ async def monitor_apps_lists(monitor_git=False, monitor_only_good_quality_apps=F
         repo = repos[repo_name]
 
         # delete scheduled jobs first
-        task_logger.info(f"Application {repo_name} has been removed from the app list, start by removing its scheduled job if there are any...")
-        for job in Job.select().where(Job.url_or_path == repo.url, Job.state == "scheduled"):
+        task_logger.info(
+            f"Application {repo_name} has been removed from the app list, start by removing its scheduled job if there are any..."
+        )
+        for job in Job.select().where(
+            Job.url_or_path == repo.url, Job.state == "scheduled"
+        ):
             await api_stop_job(None, job.id)  # not sure this is going to work
             job_id = job.id
 
-            task_logger.info(f"Delete scheduled job {job.name} #{job.id} for application {repo_name} because the application is being deleted.")
+            task_logger.info(
+                f"Delete scheduled job {job.name} #{job.id} for application {repo_name} because the application is being deleted."
+            )
             data = model_to_dict(job)
             job.delete_instance()
 
-            await broadcast({
-                "action": "delete_job",
-                "data": data,
-            }, ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"])
+            await broadcast(
+                {
+                    "action": "delete_job",
+                    "data": data,
+                },
+                ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"],
+            )
 
-        task_logger.info(f"Delete application {repo_name} because it has been removed from the apps list.")
+        task_logger.info(
+            f"Delete application {repo_name} because it has been removed from the apps list."
+        )
         data = model_to_dict(repo)
         repo.delete_instance()
 
-        await broadcast({
-            "action": "delete_app",
-            "data": data,
-        }, "apps")
+        await broadcast(
+            {
+                "action": "delete_app",
+                "data": data,
+            },
+            "apps",
+        )
 
 
 @once_per_day
@@ -359,7 +411,9 @@ async def launch_monthly_job():
 
     today = date.today().day
 
     for repo in Repo.select().where(Repo.random_job_day == today):
-        task_logger.info(f"Launch monthly job for {repo.name} on day {today} of the month ")
+        task_logger.info(
+            f"Launch monthly job for {repo.name} on day {today} of the month "
+        )
         await create_job(repo.name, repo.url)
 
@@ -405,7 +459,7 @@ async def jobs_dispatcher():
     if workers.count() == 0:
         return
 
-    with db.atomic('IMMEDIATE'):
+    with db.atomic("IMMEDIATE"):
         jobs = Job.select().where(Job.state == "scheduled")
 
         # no jobs to process, wait
@@ -435,18 +489,25 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal
     await asyncio.sleep(1)
 
-    if not os.path.exists(app.config.PACKAGE_CHECK_LOCK_PER_WORKER.format(worker_id=worker.id)):
+    if not os.path.exists(
+        app.config.PACKAGE_CHECK_LOCK_PER_WORKER.format(worker_id=worker.id)
+    ):
         return
 
    job.log += f"Lock for worker {worker.id} still exist ... trying to cleanup the old package check still running ...\n"
    job.save()
-    await broadcast({
-        "action": "update_job",
-        "id": job.id,
-        "data": model_to_dict(job),
-    }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
+    await broadcast(
+        {
+            "action": "update_job",
+            "id": job.id,
+            "data": model_to_dict(job),
+        },
+        ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"],
+    )
 
-    task_logger.info(f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ...")
+    task_logger.info(
+        f"Lock for worker {worker.id} still exist ... trying to cleanup old check process ..."
+ ) cwd = os.path.split(app.config.PACKAGE_CHECK_PATH)[0] env = { @@ -455,16 +516,19 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal "ARCH": app.config.ARCH, "DIST": app.config.DIST, "YNH_BRANCH": app.config.YNH_BRANCH, - "PATH": os.environ["PATH"] + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin + "PATH": os.environ["PATH"] + + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin } cmd = f"script -qefc '{app.config.PACKAGE_CHECK_PATH} --force-stop 2>&1'" try: - command = await asyncio.create_subprocess_shell(cmd, - cwd=cwd, - env=env, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) + command = await asyncio.create_subprocess_shell( + cmd, + cwd=cwd, + env=env, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) while not command.stdout.at_eof(): data = await command.stdout.readline() await asyncio.sleep(1) @@ -495,19 +559,25 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal return True finally: job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) async def run_job(worker, job): - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) await asyncio.sleep(5) @@ -526,24 +596,35 @@ async def run_job(worker, job): "ARCH": app.config.ARCH, "DIST": app.config.DIST, "YNH_BRANCH": app.config.YNH_BRANCH, - "PATH": os.environ["PATH"] + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin + "PATH": os.environ["PATH"] + + ":/usr/local/bin", # This is because lxc/lxd is in /usr/local/bin } now = datetime.now().strftime("%d/%m/%Y - %H:%M:%S") - msg = now + f" - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}" + msg = ( + now + + f" - Starting test for {job.name} on arch {app.config.ARCH}, distrib {app.config.DIST}, with YunoHost {app.config.YNH_BRANCH}" + ) job.log += "=" * len(msg) + "\n" job.log += msg + "\n" job.log += "=" * len(msg) + "\n" job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) - result_json = app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER.format(worker_id=worker.id) + result_json = app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER.format( + worker_id=worker.id + ) full_log = app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER.format(worker_id=worker.id) - summary_png = app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER.format(worker_id=worker.id) + summary_png = app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER.format( + worker_id=worker.id + ) if os.path.exists(result_json): os.remove(result_json) @@ -556,31 +637,35 @@ async def run_job(worker, job): task_logger.info(f"Launching command: {cmd}") try: - command = await asyncio.create_subprocess_shell(cmd, - cwd=cwd, - env=env, - # default limit is not 
enough in some situations - limit=(2 ** 16) ** 10, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) - + command = await asyncio.create_subprocess_shell( + cmd, + cwd=cwd, + env=env, + # default limit is not enough in some situations + limit=(2 ** 16) ** 10, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) while not command.stdout.at_eof(): data = await command.stdout.readline() try: - job.log += data.decode('utf-8', 'replace') + job.log += data.decode("utf-8", "replace") except UnicodeDecodeError as e: job.log += "Uhoh ?! UnicodeDecodeError in yunorunner !?" job.log += str(e) job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) except (CancelledError, asyncio.exceptions.CancelledError): command.terminate() @@ -616,11 +701,23 @@ async def run_job(worker, job): job.log += f"\nThe full log is available at {app.config.BASE_URL}/logs/{job.id}.log\n" shutil.copy(full_log, yunorunner_dir + f"/results/logs/{job.id}.log") - shutil.copy(result_json, yunorunner_dir + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json") - shutil.copy(summary_png, yunorunner_dir + f"/results/summary/{job.id}.png") + shutil.copy( + result_json, + yunorunner_dir + + f"/results/logs/{job_app}_{app.config.ARCH}_{app.config.YNH_BRANCH}_results.json", + ) + shutil.copy( + summary_png, yunorunner_dir + f"/results/summary/{job.id}.png" + ) - public_result_json_path = yunorunner_dir + f"/results/logs/list_level_{app.config.YNH_BRANCH}_{app.config.ARCH}.json" - if os.path.exists(public_result_json_path) or not open(public_result_json_path).read().strip(): + public_result_json_path = ( + yunorunner_dir + + f"/results/logs/list_level_{app.config.YNH_BRANCH}_{app.config.ARCH}.json" + ) + if ( + os.path.exists(public_result_json_path) + or not open(public_result_json_path).read().strip() + ): public_result = {} else: public_result = json.load(open(public_result_json_path)) @@ -637,12 +734,14 @@ async def run_job(worker, job): job.log += "=" * len(msg) + "\n" job.save() - await broadcast({ - "action": "update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) - + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) if "ci-apps.yunohost.org" in app.config.BASE_URL: try: @@ -665,7 +764,9 @@ async def run_job(worker, job): elif level < public_level: msg = f"App {job_app} goes down from level {public_level} to {level} in job {job_id_with_url}" elif level < 6: - msg = f"App {job_app} stays at level {level} in job {job_id_with_url}" + msg = ( + f"App {job_app} stays at level {level} in job {job_id_with_url}" + ) else: # Dont notify anything, reduce CI flood on app chatroom if app is already level 6+ msg = "" @@ -686,7 +787,7 @@ async def run_job(worker, job): job.log += "Exception:\n" job.log += traceback.format_exc() - #if job.state != "canceled": + # if job.state != "canceled": # await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) # remove ourself from the state @@ -695,11 +796,14 @@ async def run_job(worker, job): worker.state = "available" worker.save() - await broadcast({ - "action": 
"update_job", - "id": job.id, - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "id": job.id, + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) async def broadcast(message, channels): @@ -722,6 +826,7 @@ async def broadcast(message, channels): except ValueError: pass + def subscribe(ws, channel): subscriptions[channel].append(ws) @@ -765,7 +870,7 @@ def chunks(l, n): yield chunk -@app.websocket('/index-ws') +@app.websocket("/index-ws") @clean_websocket async def ws_index(request, websocket): subscribe(websocket, "jobs") @@ -782,46 +887,67 @@ async def ws_index(request, websocket): ) JobAlias = Job.alias() - subquery = JobAlias.select(*selected_fields)\ - .where(JobAlias.state << ("done", "failure", "canceled", "error"))\ - .group_by(JobAlias.url_or_path)\ - .select(fn.Max(JobAlias.id).alias("max_id")) + subquery = ( + JobAlias.select(*selected_fields) + .where(JobAlias.state << ("done", "failure", "canceled", "error")) + .group_by(JobAlias.url_or_path) + .select(fn.Max(JobAlias.id).alias("max_id")) + ) - latest_done_jobs = Job.select(*selected_fields)\ - .join(subquery, on=(Job.id == subquery.c.max_id))\ - .order_by(-Job.id) + latest_done_jobs = ( + Job.select(*selected_fields) + .join(subquery, on=(Job.id == subquery.c.max_id)) + .order_by(-Job.id) + ) - subquery = JobAlias.select(*selected_fields)\ - .where(JobAlias.state == "scheduled")\ - .group_by(JobAlias.url_or_path)\ - .select(fn.Min(JobAlias.id).alias("min_id")) + subquery = ( + JobAlias.select(*selected_fields) + .where(JobAlias.state == "scheduled") + .group_by(JobAlias.url_or_path) + .select(fn.Min(JobAlias.id).alias("min_id")) + ) - next_scheduled_jobs = Job.select(*selected_fields)\ - .join(subquery, on=(Job.id == subquery.c.min_id))\ - .order_by(-Job.id) + next_scheduled_jobs = ( + Job.select(*selected_fields) + .join(subquery, on=(Job.id == subquery.c.min_id)) + .order_by(-Job.id) + ) # chunks initial data by batch of 30 to avoid killing firefox - data = chunks(itertools.chain(map(model_to_dict, next_scheduled_jobs.iterator()), - map(model_to_dict, Job.select().where(Job.state == "running").iterator()), - map(model_to_dict, latest_done_jobs.iterator())), 30) + data = chunks( + itertools.chain( + map(model_to_dict, next_scheduled_jobs.iterator()), + map(model_to_dict, Job.select().where(Job.state == "running").iterator()), + map(model_to_dict, latest_done_jobs.iterator()), + ), + 30, + ) first_chunck = next(data) - await websocket.send(my_json_dumps({ - "action": "init_jobs", - "data": first_chunck, # send first chunk - })) + await websocket.send( + my_json_dumps( + { + "action": "init_jobs", + "data": first_chunck, # send first chunk + } + ) + ) for chunk in data: - await websocket.send(my_json_dumps({ - "action": "init_jobs_stream", - "data": chunk, - })) + await websocket.send( + my_json_dumps( + { + "action": "init_jobs_stream", + "data": chunk, + } + ) + ) await websocket.wait_for_connection_lost() -@app.websocket('/job-ws/') +@app.websocket("/job-ws/") @clean_websocket async def ws_job(request, websocket, job_id): job = Job.select().where(Job.id == job_id) @@ -833,15 +959,19 @@ async def ws_job(request, websocket, job_id): subscribe(websocket, f"job-{job.id}") - await websocket.send(my_json_dumps({ - "action": "init_job", - "data": model_to_dict(job), - })) + await websocket.send( + my_json_dumps( + { + "action": "init_job", + "data": model_to_dict(job), + } + ) + ) await 
websocket.wait_for_connection_lost() -@app.websocket('/apps-ws') +@app.websocket("/apps-ws") @clean_websocket async def ws_apps(request, websocket): subscribe(websocket, "jobs") @@ -849,7 +979,8 @@ async def ws_apps(request, websocket): # I need to do this because peewee strangely fuck up on join and remove the # subquery fields which breaks everything - repos = Repo.raw(''' + repos = Repo.raw( + """ SELECT "id", "name", @@ -894,7 +1025,8 @@ async def ws_apps(request, websocket): ("t5"."url_or_path" = "t1"."url") ORDER BY "name" - ''') + """ + ) repos = [ { @@ -907,41 +1039,58 @@ async def ws_apps(request, websocket): "job_id": x.job_id, "job_name": x.job_name, "job_state": x.job_state, - "created_time": datetime.strptime(x.created_time.split(".")[0], '%Y-%m-%d %H:%M:%S') if x.created_time else None, - "started_time": datetime.strptime(x.started_time.split(".")[0], '%Y-%m-%d %H:%M:%S') if x.started_time else None, - "end_time": datetime.strptime(x.end_time.split(".")[0], '%Y-%m-%d %H:%M:%S') if x.end_time else None, - } for x in repos + "created_time": datetime.strptime( + x.created_time.split(".")[0], "%Y-%m-%d %H:%M:%S" + ) + if x.created_time + else None, + "started_time": datetime.strptime( + x.started_time.split(".")[0], "%Y-%m-%d %H:%M:%S" + ) + if x.started_time + else None, + "end_time": datetime.strptime(x.end_time.split(".")[0], "%Y-%m-%d %H:%M:%S") + if x.end_time + else None, + } + for x in repos ] # add apps without jobs selected_repos = {x["id"] for x in repos} for repo in Repo.select().where(Repo.id.not_in(selected_repos)): - repos.append({ - "id": repo.id, - "name": repo.name, - "url": repo.url, - "revision": repo.revision, - "state": repo.state, - "random_job_day": repo.random_job_day, - "job_id": None, - "job_name": None, - "job_state": None, - "created_time": None, - "started_time": None, - "end_time": None, - }) + repos.append( + { + "id": repo.id, + "name": repo.name, + "url": repo.url, + "revision": repo.revision, + "state": repo.state, + "random_job_day": repo.random_job_day, + "job_id": None, + "job_name": None, + "job_state": None, + "created_time": None, + "started_time": None, + "end_time": None, + } + ) repos = sorted(repos, key=lambda x: x["name"]) - await websocket.send(my_json_dumps({ - "action": "init_apps", - "data": repos, - })) + await websocket.send( + my_json_dumps( + { + "action": "init_apps", + "data": repos, + } + ) + ) await websocket.wait_for_connection_lost() -@app.websocket('/app-ws/') +@app.websocket("/app-ws/") @clean_websocket async def ws_app(request, websocket, app_name): # XXX I don't check if the app exists because this websocket is supposed to @@ -950,11 +1099,21 @@ async def ws_app(request, websocket, app_name): subscribe(websocket, f"app-jobs-{app.url}") - job = list(Job.select().where(Job.url_or_path == app.url).order_by(-Job.id).limit(10).dicts()) - await websocket.send(my_json_dumps({ - "action": "init_jobs", - "data": job, - })) + job = list( + Job.select() + .where(Job.url_or_path == app.url) + .order_by(-Job.id) + .limit(10) + .dicts() + ) + await websocket.send( + my_json_dumps( + { + "action": "init_jobs", + "data": job, + } + ) + ) await websocket.wait_for_connection_lost() @@ -966,23 +1125,32 @@ def require_token(): # run some method that checks the request # for the client's authorization status if "X-Token" not in request.headers: - return response.json({'status': 'you need to provide a token ' - 'to access the API, please ' - 'refer to the README'}, 403) + return response.json( + { + "status": "you need to provide a token " 
+ "to access the API, please " + "refer to the README" + }, + 403, + ) token = request.headers["X-Token"].strip() if not hmac.compare_digest(token, admin_token): - api_logger.warning("someone tried to access the API using an invalid admin token") - return response.json({'status': 'invalid token'}, 403) + api_logger.warning( + "someone tried to access the API using an invalid admin token" + ) + return response.json({"status": "invalid token"}, 403) result = await f(request, *args, **kwargs) return result + return decorated_function + return decorator -@app.route("/api/job", methods=['POST']) +@app.route("/api/job", methods=["POST"]) @require_token() async def api_new_job(request): job = Job.create( @@ -993,26 +1161,29 @@ async def api_new_job(request): api_logger.info(f"Request to add new job '{job.name}' [{job.id}]") - await broadcast({ - "action": "new_job", - "data": model_to_dict(job), - }, ["jobs", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "new_job", + "data": model_to_dict(job), + }, + ["jobs", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") -@app.route("/api/job", methods=['GET']) +@app.route("/api/job", methods=["GET"]) @require_token() async def api_list_job(request): query = Job.select() if not all: - query.where(Job.state in ('scheduled', 'running')) + query.where(Job.state in ("scheduled", "running")) return response.json([model_to_dict(x) for x in query.order_by(-Job.id)]) -@app.route("/api/app", methods=['GET']) +@app.route("/api/app", methods=["GET"]) @require_token() async def api_list_app(request): query = Repo.select() @@ -1020,7 +1191,7 @@ async def api_list_app(request): return response.json([model_to_dict(x) for x in query.order_by(Repo.name)]) -@app.route("/api/job/", methods=['DELETE']) +@app.route("/api/job/", methods=["DELETE"]) @require_token() async def api_delete_job(request, job_id): api_logger.info(f"Request to restart job {job_id}") @@ -1035,10 +1206,13 @@ async def api_delete_job(request, job_id): data = model_to_dict(job) job.delete_instance() - await broadcast({ - "action": "delete_job", - "data": data, - }, ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "delete_job", + "data": data, + }, + ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") @@ -1054,15 +1228,17 @@ async def stop_job(job_id): api_logger.info(f"Request to stop job '{job.name}' [{job.id}]") if job.state == "scheduled": - api_logger.info(f"Cancel scheduled job '{job.name}' [job.id] " - f"on request") + api_logger.info(f"Cancel scheduled job '{job.name}' [job.id] " f"on request") job.state = "canceled" job.save() - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") @@ -1075,36 +1251,42 @@ async def stop_job(job_id): jobs_in_memory_state[job.id]["task"].cancel() - worker = Worker.select().where(Worker.id == jobs_in_memory_state[job.id]["worker"])[0] + worker = Worker.select().where( + Worker.id == jobs_in_memory_state[job.id]["worker"] + )[0] worker.state = "available" worker.save() - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", 
f"job-{job.id}", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") if job.state in ("done", "canceled", "failure", "error"): - api_logger.info(f"Request to cancel job '{job.name}' " - f"[job.id] but job is already in '{job.state}' state, " - f"do nothing") + api_logger.info( + f"Request to cancel job '{job.name}' " + f"[job.id] but job is already in '{job.state}' state, " + f"do nothing" + ) # nothing to do, task is already done return response.text("ok") - raise Exception(f"Tryed to cancel a job with an unknown state: " - f"{job.state}") + raise Exception(f"Tryed to cancel a job with an unknown state: " f"{job.state}") -@app.route("/api/job//stop", methods=['POST']) +@app.route("/api/job//stop", methods=["POST"]) async def api_stop_job(request, job_id): # TODO auth or some kind return await stop_job(job_id) -@app.route("/api/job//restart", methods=['POST']) +@app.route("/api/job//restart", methods=["POST"]) async def api_restart_job(request, job_id): api_logger.info(f"Request to restart job {job_id}") # Calling a route (eg api_stop_job) doesn't work anymore @@ -1116,16 +1298,19 @@ async def api_restart_job(request, job_id): job.log = "" job.save() - await broadcast({ - "action": "update_job", - "data": model_to_dict(job), - }, ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"]) + await broadcast( + { + "action": "update_job", + "data": model_to_dict(job), + }, + ["jobs", f"job-{job_id}", f"app-jobs-{job.url_or_path}"], + ) return response.text("ok") # Meant to interface with https://shields.io/endpoint -@app.route("/api/job//badge", methods=['GET']) +@app.route("/api/job//badge", methods=["GET"]) async def api_badge_job(request, job_id): job = Job.select().where(Job.id == job_id) @@ -1136,24 +1321,26 @@ async def api_badge_job(request, job_id): job = job[0] state_to_color = { - 'scheduled': 'lightgrey', - 'running': 'blue', - 'done': 'brightgreen', - 'failure': 'red', - 'error': 'red', - 'canceled': 'yellow', + "scheduled": "lightgrey", + "running": "blue", + "done": "brightgreen", + "failure": "red", + "error": "red", + "canceled": "yellow", } - return response.json({ - "schemaVersion": 1, - "label": "tests", - "message": job.state, - "color": state_to_color[job.state] - }) + return response.json( + { + "schemaVersion": 1, + "label": "tests", + "message": job.state, + "color": state_to_color[job.state], + } + ) -@app.route('/job/') -@jinja.template('job.html') +@app.route("/job/") +@jinja.template("job.html") async def html_job(request, job_id): job = Job.select().where(Job.id == job_id) @@ -1172,41 +1359,45 @@ async def html_job(request, job_id): return { "job": job, - 'app': application, - 'job_url': job_url, - 'badge_url': badge_url, - 'shield_badge_url': shield_badge_url, - 'summary_url': summary_url, - 'relative_path_to_root': '../', - 'path': request.path + "app": application, + "job_url": job_url, + "badge_url": badge_url, + "shield_badge_url": shield_badge_url, + "summary_url": summary_url, + "relative_path_to_root": "../", + "path": request.path, } -@app.route('/apps/', strict_slashes=True) # To avoid reaching the route "/apps//" with an empty string -@jinja.template('apps.html') +@app.route( + "/apps/", strict_slashes=True +) # To avoid reaching the route "/apps//" with an empty string +@jinja.template("apps.html") async def html_apps(request): - return {'relative_path_to_root': '../', 'path': request.path} + return {"relative_path_to_root": "../", "path": request.path} -@app.route('/apps//') -@jinja.template('app.html') +@app.route("/apps//") 
+@jinja.template("app.html") async def html_app(request, app_name): app = Repo.select().where(Repo.name == app_name) if app.count() == 0: raise NotFound() - return {"app": app[0], 'relative_path_to_root': '../../', 'path': request.path} + return {"app": app[0], "relative_path_to_root": "../../", "path": request.path} -@app.route('/apps//latestjob') +@app.route("/apps//latestjob") async def html_app_latestjob(request, app_name): _app = Repo.select().where(Repo.name == app_name) if _app.count() == 0: raise NotFound() - jobs = Job.select(fn.MAX(Job.id)).where(Job.url_or_path == _app[0].url, Job.state != 'scheduled') + jobs = Job.select(fn.MAX(Job.id)).where( + Job.url_or_path == _app[0].url, Job.state != "scheduled" + ) if jobs.count() == 0: jobs = Job.select(fn.MAX(Job.id)).where(Job.url_or_path == _app[0].url) @@ -1219,19 +1410,19 @@ async def html_app_latestjob(request, app_name): return response.redirect(job_url) -@app.route('/') -@jinja.template('index.html') +@app.route("/") +@jinja.template("index.html") async def html_index(request): - return {'relative_path_to_root': '', 'path': request.path} + return {"relative_path_to_root": "", "path": request.path} -#@always_relaunch(sleep=10) -#async def number_of_tasks(): +# @always_relaunch(sleep=10) +# async def number_of_tasks(): # print("Number of tasks: %s" % len(asyncio_all_tasks())) # # -#@app.route('/monitor') -#async def monitor(request): +# @app.route('/monitor') +# async def monitor(request): # snapshot = tracemalloc.take_snapshot() # top_stats = snapshot.statistics('lineno') # @@ -1259,28 +1450,36 @@ async def github(request): # Abort directly if no secret opened # (which also allows to only enable this feature if # we define the webhook secret) - if not os.path.exists("./github_webhook_secret") or not os.path.exists("./github_bot_token"): - api_logger.info(f"Received a webhook but no ./github_webhook_secret or ./github_bot_token file exists ... ignoring") - return response.json({'error': 'GitHub hooks not configured'}, 403) + if not os.path.exists("./github_webhook_secret") or not os.path.exists( + "./github_bot_token" + ): + api_logger.info( + f"Received a webhook but no ./github_webhook_secret or ./github_bot_token file exists ... 
ignoring" + ) + return response.json({"error": "GitHub hooks not configured"}, 403) # Only SHA1 is supported header_signature = request.headers.get("X-Hub-Signature") if header_signature is None: api_logger.info("Received a webhook but there's no header X-Hub-Signature") - return response.json({'error': 'No X-Hub-Signature'}, 403) + return response.json({"error": "No X-Hub-Signature"}, 403) sha_name, signature = header_signature.split("=") if sha_name != "sha1": - api_logger.info("Received a webhook but signing algo isn't sha1, it's '%s'" % sha_name) - return response.json({'error': "Signing algorightm is not sha1 ?!"}, 501) + api_logger.info( + "Received a webhook but signing algo isn't sha1, it's '%s'" % sha_name + ) + return response.json({"error": "Signing algorightm is not sha1 ?!"}, 501) secret = open("./github_webhook_secret", "r").read().strip() # HMAC requires the key to be bytes, but data is string mac = hmac.new(secret.encode(), msg=request.body, digestmod=hashlib.sha1) if not hmac.compare_digest(str(mac.hexdigest()), str(signature)): - api_logger.info(f"Received a webhook but signature authentication failed (is the secret properly configured?)") - return response.json({'error': "Bad signature ?!"}, 403) + api_logger.info( + f"Received a webhook but signature authentication failed (is the secret properly configured?)" + ) + return response.json({"error": "Bad signature ?!"}, 403) hook_type = request.headers.get("X-Github-Event") hook_infos = request.json @@ -1289,17 +1488,19 @@ async def github(request): # - *New* comments # - On issue/PRs which are still open if hook_type == "issue_comment": - if hook_infos["action"] != "created" \ - or hook_infos["issue"]["state"] != "open" \ - or "pull_request" not in hook_infos["issue"]: - # Nothing to do but success anyway (204 = No content) - return response.json({'msg': "Nothing to do"}, 204) + if ( + hook_infos["action"] != "created" + or hook_infos["issue"]["state"] != "open" + or "pull_request" not in hook_infos["issue"] + ): + # Nothing to do but success anyway (204 = No content) + return response.json({"msg": "Nothing to do"}, 204) # Check the comment contains proper keyword trigger body = hook_infos["comment"]["body"].strip()[:100].lower() if not any(trigger.lower() in body for trigger in app.config.WEBHOOK_TRIGGERS): # Nothing to do but success anyway (204 = No content) - return response.json({'msg': "Nothing to do"}, 204) + return response.json({"msg": "Nothing to do"}, 204) # We only accept this from people which are member of the org # https://docs.github.com/en/rest/reference/orgs#check-organization-membership-for-a-user @@ -1307,34 +1508,49 @@ async def github(request): # which is not represented in the original webhook async def is_user_in_organization(user): token = open("./github_bot_token").read().strip() - async with aiohttp.ClientSession(headers={"Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json"}) as session: - resp = await session.get(f"https://api.github.com/orgs/YunoHost-Apps/members/{user}") + async with aiohttp.ClientSession( + headers={ + "Authorization": f"token {token}", + "Accept": "application/vnd.github.v3+json", + } + ) as session: + resp = await session.get( + f"https://api.github.com/orgs/YunoHost-Apps/members/{user}" + ) return resp.status == 204 if not await is_user_in_organization(hook_infos["comment"]["user"]["login"]): # Unauthorized - return response.json({'error': "Unauthorized"}, 403) + return response.json({"error": "Unauthorized"}, 403) # Fetch the PR infos (yeah they 
ain't in the initial infos we get @_@) pr_infos_url = hook_infos["issue"]["pull_request"]["url"] elif hook_type == "pull_request": if hook_infos["action"] != "opened": # Nothing to do but success anyway (204 = No content) - return response.json({'msg': "Nothing to do"}, 204) + return response.json({"msg": "Nothing to do"}, 204) # We only accept PRs that are created by github-action bot - if hook_infos["pull_request"]["user"]["login"] != "github-actions[bot]" \ - or not hook_infos["pull_request"]["head"]["ref"].startswith("ci-auto-update-"): + if hook_infos["pull_request"]["user"][ + "login" + ] != "github-actions[bot]" or not hook_infos["pull_request"]["head"][ + "ref" + ].startswith( + "ci-auto-update-" + ): # Unauthorized - return response.json({'msg': "Nothing to do"}, 204) + return response.json({"msg": "Nothing to do"}, 204) if not app.config.ANSWER_TO_AUTO_UPDATER: # Unauthorized - return response.json({'msg': "Nothing to do, I am configured to ignore the auto-updater"}, 204) + return response.json( + {"msg": "Nothing to do, I am configured to ignore the auto-updater"}, + 204, + ) # Fetch the PR infos (yeah they ain't in the initial infos we get @_@) pr_infos_url = hook_infos["pull_request"]["url"] else: # Nothing to do but success anyway (204 = No content) - return response.json({'msg': "Nothing to do"}, 204) + return response.json({"msg": "Nothing to do"}, 204) async with aiohttp.ClientSession() as session: async with session.get(pr_infos_url) as resp: @@ -1345,17 +1561,21 @@ async def github(request): url_to_test = f"{repo}/tree/{branch_name}" app_id = pr_infos["base"]["repo"]["name"].rstrip("") if app_id.endswith("_ynh"): - app_id = app_id[:-len("_ynh")] + app_id = app_id[: -len("_ynh")] pr_id = str(pr_infos["number"]) # Create the job for the corresponding app (with the branch url) api_logger.info("Scheduling a new job from comment on a PR") - job = await create_job(app_id, url_to_test, job_comment=f"PR #{pr_id}, {branch_name}") + job = await create_job( + app_id, url_to_test, job_comment=f"PR #{pr_id}, {branch_name}" + ) if not job: - return response.json({'msg': "Nothing to do, corresponding job already scheduled"}, 204) + return response.json( + {"msg": "Nothing to do, corresponding job already scheduled"}, 204 + ) # Answer with comment with link+badge for the job @@ -1366,8 +1586,12 @@ async def github(request): comments_url = hook_infos["pull_request"]["comments_url"] token = open("./github_bot_token").read().strip() - async with aiohttp.ClientSession(headers={"Authorization": f"token {token}"}) as session: - async with session.post(comments_url, data=my_json_dumps({"body": body})) as resp: + async with aiohttp.ClientSession( + headers={"Authorization": f"token {token}"} + ) as session: + async with session.post( + comments_url, data=my_json_dumps({"body": body}) + ) as resp: respjson = await resp.json() api_logger.info("Added comment %s" % respjson["html_url"]) @@ -1385,7 +1609,7 @@ async def github(request): return response.text("ok") -#def show_coro(c): +# def show_coro(c): # data = { # 'txt': str(c), # 'type': str(type(c)), @@ -1405,7 +1629,7 @@ async def github(request): # return data -#def format_frame(f): +# def format_frame(f): # keys = ['f_code', 'f_lineno'] # return dict([(k, str(getattr(f, k))) for k in keys]) @@ -1457,33 +1681,61 @@ def main(config="./config.py"): "ARCH": "amd64", "DIST": "bullseye", "PACKAGE_CHECK_DIR": yunorunner_dir + "/package_check/", - "WEBHOOK_TRIGGERS": ["!testme", "!gogogadgetoci", "By the power of systemd, I invoke The Great App CI to 
test this Pull Request!"], - "WEBHOOK_CATCHPHRASES": ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:"], + "WEBHOOK_TRIGGERS": [ + "!testme", + "!gogogadgetoci", + "By the power of systemd, I invoke The Great App CI to test this Pull Request!", + ], + "WEBHOOK_CATCHPHRASES": [ + "Alrighty!", + "Fingers crossed!", + "May the CI gods be with you!", + ":carousel_horse:", + ":rocket:", + ":sunflower:", + "Meow :cat2:", + ":v:", + ":stuck_out_tongue_winking_eye:", + ], } app.config.update_config(default_config) app.config.update_config(config) app.config.PACKAGE_CHECK_PATH = app.config.PACKAGE_CHECK_DIR + "package_check.sh" - app.config.PACKAGE_CHECK_LOCK_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "pcheck-{worker_id}.lock" - app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "full_log_{worker_id}.log" - app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "results_{worker_id}.json" - app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER = app.config.PACKAGE_CHECK_DIR + "summary_{worker_id}.png" + app.config.PACKAGE_CHECK_LOCK_PER_WORKER = ( + app.config.PACKAGE_CHECK_DIR + "pcheck-{worker_id}.lock" + ) + app.config.PACKAGE_CHECK_FULL_LOG_PER_WORKER = ( + app.config.PACKAGE_CHECK_DIR + "full_log_{worker_id}.log" + ) + app.config.PACKAGE_CHECK_RESULT_JSON_PER_WORKER = ( + app.config.PACKAGE_CHECK_DIR + "results_{worker_id}.json" + ) + app.config.PACKAGE_CHECK_SUMMARY_PNG_PER_WORKER = ( + app.config.PACKAGE_CHECK_DIR + "summary_{worker_id}.png" + ) if not os.path.exists(app.config.PACKAGE_CHECK_PATH): - print(f"Error: analyzer script doesn't exist at '{app.config.PACKAGE_CHECK_PATH}'. Please fix the configuration in {config}") + print( + f"Error: analyzer script doesn't exist at '{app.config.PACKAGE_CHECK_PATH}'. Please fix the configuration in {config}" + ) sys.exit(1) if app.config.MONITOR_APPS_LIST: - app.add_task(monitor_apps_lists(monitor_git=app.config.MONITOR_GIT, - monitor_only_good_quality_apps=app.config.MONITOR_ONLY_GOOD_QUALITY_APPS)) + app.add_task( + monitor_apps_lists( + monitor_git=app.config.MONITOR_GIT, + monitor_only_good_quality_apps=app.config.MONITOR_ONLY_GOOD_QUALITY_APPS, + ) + ) if app.config.MONTHLY_JOBS: app.add_task(launch_monthly_job()) app.add_task(jobs_dispatcher()) - #app.add_task(number_of_tasks()) - app.run('localhost', port=app.config.PORT, debug=app.config.DEBUG) + # app.add_task(number_of_tasks()) + app.run("localhost", port=app.config.PORT, debug=app.config.DEBUG) if __name__ == "__main__":