diff --git a/run.py b/run.py index 22de9d1..10ed6eb 100644 --- a/run.py +++ b/run.py @@ -3,6 +3,7 @@ import os import sys import ujson +import logging import asyncio import random @@ -17,11 +18,31 @@ from websockets.exceptions import ConnectionClosed from sanic import Sanic, response from sanic.exceptions import NotFound +from sanic.log import LOGGING_CONFIG_DEFAULTS from playhouse.shortcuts import model_to_dict, dict_to_model from models import Repo, Job, db, Worker +LOGGING_CONFIG_DEFAULTS["loggers"]["task"] = { + "level": "INFO", + "handlers": ["task_console"], +} + +LOGGING_CONFIG_DEFAULTS["handlers"]["task_console"] = { + "class": "logging.StreamHandler", + "formatter": "background", + "stream": sys.stdout, +} + +LOGGING_CONFIG_DEFAULTS["formatters"]["background"] = { + "format": "%(asctime)s [%(process)d] [BACKGROUND] [%(funcName)s] %(message)s", + "datefmt": "[%Y-%m-%d %H:%M:%S %z]", + "class": "logging.Formatter", +} + +task_logger = logging.getLogger("task") + app = Sanic() APPS_LISTS = { @@ -68,23 +89,21 @@ async def monitor_apps_lists(): for app_list_name, url in APPS_LISTS.items(): async with aiohttp.ClientSession() as session: app_list = "official" - sys.stdout.write(f"Downloading {app_list_name}.json...\r") - sys.stdout.flush() + task_logger.info(f"Downloading {app_list_name}.json...") async with session.get(url) as resp: data = await resp.json() - sys.stdout.write(f"Downloading {app_list_name}.json...done\n") repos = {x.name: x for x in Repo.select().where(Repo.app_list == app_list_name)} for app_id, app_data in data.items(): commit_sha = await get_master_commit_sha(app_id) - print(f"{app_id} → {commit_sha}") # already know, look to see if there is new commits if app_id in repos: repo = repos[app_id] if repo.revision != commit_sha: - print(f"Application {app_id} has new commits on github, schedule new job") + task_logger.info(f"Application {app_id} has new commits on github" + f"({repo.revision} → {commit_sha}), schedule new job") repo.revision = commit_sha repo.save() @@ -101,9 +120,12 @@ async def monitor_apps_lists(): "data": model_to_dict(job), }, "jobs") + else: + task_logger.info(f"Application {app_id} in {app_list_name} is still at the same revision {commit_sha}, do nothing") + # new app else: - print(f"New application detected: {app_id} in {app_list_name}") + task_logger.info(f"New application detected: {app_id} in {app_list_name}, scheduling a new job") repo = Repo.create( name=app_id, url=app_data["git"]["url"], @@ -111,7 +133,6 @@ async def monitor_apps_lists(): app_list=app_list_name, ) - print(f"Schedule a new build for {app_id}") job = Job.create( name=f"{app_id} ({app_list_name})", url_or_path=repo.url, @@ -176,11 +197,11 @@ async def run_job(worker, job): }, ["jobs", f"job-{job.id}"]) # fake stupid command, whould run CI instead - print(f"Starting job {job.name}...") + task_logger.info(f"Starting job '{job.name}'...") cwd = os.path.split(path_to_analyseCI)[0] arguments = f' {job.url_or_path} "{job.name}"' - print("/bin/bash " + path_to_analyseCI + arguments) + task_logger.debug(f"Launch command: /bin/bash " + path_to_analyseCI + arguments) command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments, cwd=cwd, stdout=asyncio.subprocess.PIPE, @@ -205,7 +226,7 @@ async def run_job(worker, job): # XXX stupid crap to stimulate long jobs await asyncio.sleep(random.randint(1, 15)) # await asyncio.sleep(5) - print(f"Finished job {job.name}") + task_logger.info(f"Finished job '{job.name}'") await command.wait() job.end_time = datetime.now() @@ -257,9 +278,8 @@ async def index_ws(request, websocket): })) while True: - data = await websocket.recv() - print(f"websocket: {data}") - await websocket.send(f"echo {data}") + # do nothing with input but wait + await websocket.recv() @app.websocket('/job--ws') @@ -279,9 +299,8 @@ async def job_ws(request, websocket, job_id): })) while True: - data = await websocket.recv() - print(f"websocket: {data}") - await websocket.send(f"echo {data}") + # do nothing with input but wait + await websocket.recv() @app.route("/api/job", methods=['POST']) @@ -304,7 +323,7 @@ async def api_new_job(request): debian_version=request.json.get("debian_version", "stretch"), ) - print(f"Request to add new job '{job.name}' {job}") + task_logger.info(f"Request to add new job '{job.name}' [{job.id}]") await broadcast({ "action": "new_job", @@ -326,6 +345,7 @@ async def api_stop_job(request, job_id): job = job[0] if job.state == "scheduled": + task_logger.info(f"Cancel scheduled job '{job.name}' [job.id] on request") job.state = "canceled" job.save() @@ -337,6 +357,7 @@ async def api_stop_job(request, job_id): return response.text("ok") if job.state == "running": + task_logger.info(f"Cancel running job '{job.name}' [job.id] on request") job.state = "canceled" job.save() @@ -354,6 +375,7 @@ async def api_stop_job(request, job_id): return response.text("ok") if job.state in ("done", "canceled", "failure"): + task_logger.info(f"Request to cancel job '{job.name}' [job.id] but job is already in '{job.state}' state, do nothing") # nothing to do, task is already done return response.text("ok")