diff --git a/run.py b/run.py
index a76a4a9..140f944 100644
--- a/run.py
+++ b/run.py
@@ -82,10 +82,20 @@ LOGGING_CONFIG_DEFAULTS["formatters"] = {
     },
 }
 
+
+def datetime_to_epoch_json_converter(o):
+    if isinstance(o, datetime):
+        return o.strftime('%s')
+
+# define a custom json dumps to convert datetime
+def my_json_dumps(o):
+    return json.dumps(o, default=datetime_to_epoch_json_converter)
+
+
 task_logger = logging.getLogger("task")
 api_logger = logging.getLogger("api")
 
-app = Sanic(__name__)
+app = Sanic(__name__, dumps=my_json_dumps)
 app.static('/static', './static/')
 
 loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)) + '/templates', encoding='utf8')
@@ -110,11 +120,6 @@ subscriptions = defaultdict(list)
 jobs_in_memory_state = {}
 
 
-def datetime_to_epoch_json_converter(o):
-    if isinstance(o, datetime):
-        return o.strftime('%s')
-
-
 async def wait_closed(self):
     """
     Wait until the connection is closed.
@@ -351,11 +356,41 @@ async def launch_monthly_job():
         await create_job(repo.name, repo.url)
 
 
+async def ensure_workers_count():
+    if Worker.select().count() < app.config.WORKER_COUNT:
+        for _ in range(app.config.WORKER_COUNT - Worker.select().count()):
+            Worker.create(state="available")
+    elif Worker.select().count() > app.config.WORKER_COUNT:
+        workers_to_remove = Worker.select().count() - app.config.WORKER_COUNT
+        workers = Worker.select().where(Worker.state == "available")
+        for worker in workers:
+            if workers_to_remove == 0:
+                break
+            worker.delete_instance()
+            workers_to_remove -= 1
+
+        jobs_to_stop = workers_to_remove
+        for job_id in jobs_in_memory_state:
+            if jobs_to_stop == 0:
+                break
+            await stop_job(job_id)
+            jobs_to_stop -= 1
+            job = Job.select().where(Job.id == job_id)[0]
+            job.state = "scheduled"
+            job.log = ""
+            job.save()
+
+        workers = Worker.select().where(Worker.state == "available")
+        for worker in workers:
+            if workers_to_remove == 0:
+                break
+            worker.delete_instance()
+            workers_to_remove -= 1
+
+
 @always_relaunch(sleep=3)
 async def jobs_dispatcher():
-    if Worker.select().count() == 0:
-        for i in range(1):
-            Worker.create(state="available")
+    await ensure_workers_count()
 
     workers = Worker.select().where(Worker.state == "available")
 
@@ -400,7 +435,7 @@ async def run_job(worker, job):
     task_logger.info(f"Starting job '{job.name}' #{job.id}...")
 
     cwd = os.path.split(path_to_analyseCI)[0]
-    arguments = f' {job.url_or_path} "{job.name}" {job.id}'
+    arguments = f' {job.url_or_path} "{job.name}" {job.id} {worker.id}'
     task_logger.info(f"Launch command: /bin/bash " + path_to_analyseCI + arguments)
     try:
         command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments,
@@ -483,12 +518,15 @@ async def broadcast(message, channels):
         for ws in ws_list:
             try:
-                await ws.send(json.dumps(message, default=datetime_to_epoch_json_converter))
+                await ws.send(my_json_dumps(message))
             except ConnectionClosed:
                 dead_ws.append(ws)
 
     for to_remove in dead_ws:
-        ws_list.remove(to_remove)
+        try:
+            ws_list.remove(to_remove)
+        except ValueError:
+            pass
 
 
 def subscribe(ws, channel):
     subscriptions[channel].append(ws)
@@ -575,16 +613,16 @@ async def ws_index(request, websocket):
 
     first_chunck = next(data)
 
-    await websocket.send(json.dumps({
+    await websocket.send(my_json_dumps({
         "action": "init_jobs",
         "data": first_chunck,  # send first chunk
-    }, default=datetime_to_epoch_json_converter))
+    }))
 
     for chunk in data:
-        await websocket.send(json.dumps({
+        await websocket.send(my_json_dumps({
             "action": "init_jobs_stream",
             "data": chunk,
-        }, default=datetime_to_epoch_json_converter))
+        }))
 
     await websocket.wait_closed()
 
@@ -601,10 +639,10 @@ async def ws_job(request, websocket, job_id):
 
     subscribe(websocket, f"job-{job.id}")
 
-    await websocket.send(json.dumps({
+    await websocket.send(my_json_dumps({
         "action": "init_job",
         "data": model_to_dict(job),
-    }, default=datetime_to_epoch_json_converter))
+    }))
 
     await websocket.wait_closed()
 
@@ -701,10 +739,10 @@ async def ws_apps(request, websocket):
 
     repos = sorted(repos, key=lambda x: x["name"])
 
-    await websocket.send(json.dumps({
+    await websocket.send(my_json_dumps({
         "action": "init_apps",
         "data": repos,
-    }, default=datetime_to_epoch_json_converter))
+    }))
 
     await websocket.wait_closed()
 
@@ -719,10 +757,10 @@ async def ws_app(request, websocket, app_name):
     subscribe(websocket, f"app-jobs-{app.url}")
 
     job = list(Job.select().where(Job.url_or_path == app.url).order_by(-Job.id).dicts())
-    await websocket.send(json.dumps({
+    await websocket.send(my_json_dumps({
         "action": "init_jobs",
         "data": job,
-    }, default=datetime_to_epoch_json_converter))
+    }))
 
     await websocket.wait_closed()
 
@@ -1122,7 +1160,7 @@ async def github(request):
 
     token = open("./github_bot_token").read().strip()
     async with aiohttp.ClientSession(headers={"Authorization": f"token {token}"}) as session:
-        async with session.post(comments_url, data=json.dumps({"body": body}, default=datetime_to_epoch_json_converter)) as resp:
+        async with session.post(comments_url, data=my_json_dumps({"body": body})) as resp:
            api_logger.info("Added comment %s" % resp.json()["html_url"])
 
     catchphrases = ["Alrighty!", "Fingers crossed!", "May the CI gods be with you!", ":carousel_horse:", ":rocket:", ":sunflower:", "Meow :cat2:", ":v:", ":stuck_out_tongue_winking_eye:" ]
@@ -1164,6 +1202,37 @@ def format_frame(f):
     return dict([(k, str(getattr(f, k))) for k in keys])
 
 
+@app.listener("before_server_start")
+async def listener_before_server_start(*args, **kwargs):
+    task_logger.info("before_server_start")
+    reset_pending_jobs()
+    reset_busy_workers()
+    merge_jobs_on_startup()
+
+    set_random_day_for_monthy_job()
+
+
+@app.listener("after_server_start")
+async def listener_after_server_start(*args, **kwargs):
+    task_logger.info("after_server_start")
+
+
+@app.listener("before_server_stop")
+async def listener_before_server_stop(*args, **kwargs):
+    task_logger.info("before_server_stop")
+
+
+@app.listener("after_server_stop")
+async def listener_after_server_stop(*args, **kwargs):
+    task_logger.info("after_server_stop")
+    for job_id in jobs_in_memory_state:
+        await stop_job(job_id)
+        job = Job.select().where(Job.id == job_id)[0]
+        job.state = "scheduled"
+        job.log = ""
+        job.save()
+
+
 def main(config="./config.py"):
 
     default_config = {
@@ -1175,6 +1244,7 @@ def main(config="./config.py"):
         "MONITOR_GIT": False,
         "MONITOR_ONLY_GOOD_QUALITY_APPS": False,
        "MONTHLY_JOBS": False,
+        "WORKER_COUNT": 1,
     }
 
     app.config.update_config(default_config)
@@ -1184,12 +1254,6 @@ def main(config="./config.py"):
         print(f"Error: analyzer script doesn't exist at '{app.config.PATH_TO_ANALYZER}'. Please fix the configuration in {config}")
         sys.exit(1)
 
-    reset_pending_jobs()
-    reset_busy_workers()
-    merge_jobs_on_startup()
-
-    set_random_day_for_monthy_job()
-
     if app.config.MONITOR_APPS_LIST:
         app.add_task(monitor_apps_lists(monitor_git=app.config.MONITOR_GIT,
                                         monitor_only_good_quality_apps=app.config.MONITOR_ONLY_GOOD_QUALITY_APPS))
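
The new WORKER_COUNT option is the target that ensure_workers_count() converges on: jobs_dispatcher() awaits it on every pass, creating missing Worker rows or retiring surplus ones (stopping and re-scheduling running jobs when too few workers are idle). A minimal config.py sketch; the analyzer path is illustrative, and only WORKER_COUNT is introduced by this diff:

    # config.py -- read by main(config="./config.py"); keys override default_config
    PATH_TO_ANALYZER = "/path/to/analyseCI.sh"  # illustrative path, adjust locally
    WORKER_COUNT = 4                            # run up to 4 jobs in parallel (default: 1)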
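
A note on the relocated datetime_to_epoch_json_converter(): strftime('%s') is a glibc extension rather than a portable format code, so it misbehaves on some platforms (notably Windows) and serializes the epoch as a string. A sketch of a portable equivalent, with hypothetical names not part of this diff, emitting a numeric epoch instead:

    from datetime import datetime
    import json

    def portable_datetime_to_epoch(o):
        # timestamp() exists on every platform, unlike strftime('%s')
        if isinstance(o, datetime):
            return int(o.timestamp())

    def portable_json_dumps(o):
        return json.dumps(o, default=portable_datetime_to_epoch)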
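
One caveat: ensure_workers_count() and listener_after_server_stop() both iterate jobs_in_memory_state directly while awaiting stop_job(). If stop_job() deletes the stopped job's entry from that dict, the loop raises RuntimeError for mutating a dict during iteration. A defensive variant, assuming stop_job() does mutate it:

    # snapshot the keys first so stop_job() can safely delete entries
    for job_id in list(jobs_in_memory_state):
        await stop_job(job_id)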