diff --git a/run.py b/run.py index 26e4104..52f7c76 100644 --- a/run.py +++ b/run.py @@ -480,7 +480,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal job.state = "error" return False - except CancelledError: + except (CancelledError, asyncio.exceptions.CancelledError): command.terminate() if not ignore_error: @@ -564,6 +564,7 @@ async def run_job(worker, job): stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + while not command.stdout.at_eof(): data = await command.stdout.readline() @@ -581,7 +582,7 @@ async def run_job(worker, job): "data": model_to_dict(job), }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) - except CancelledError: + except (CancelledError, asyncio.exceptions.CancelledError): command.terminate() job.log += "\n" job.state = "canceled" @@ -604,7 +605,7 @@ async def run_job(worker, job): job.state = "error" else: if command.returncode != 0 or not os.path.exists(result_json): - job.log += f"\nJob failed ? Return code is {comman.returncode} / Or maybe the json result doesnt exist...\n" + job.log += f"\nJob failed ? Return code is {command.returncode} / Or maybe the json result doesnt exist...\n" job.state = "error" else: job.log += f"\nPackage check completed\n" @@ -685,7 +686,8 @@ async def run_job(worker, job): job.log += "Exception:\n" job.log += traceback.format_exc() - await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) + #if job.state != "canceled": + # await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) # remove ourself from the state del jobs_in_memory_state[job.id] @@ -1083,8 +1085,6 @@ async def stop_job(job_id): "data": model_to_dict(job), }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) - await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) - return response.text("ok") if job.state in ("done", "canceled", "failure", "error"): @@ -1225,25 +1225,25 @@ async def html_index(request): return {'relative_path_to_root': '', 'path': request.path} -@always_relaunch(sleep=10) -async def number_of_tasks(): - print("Number of tasks: %s" % len(asyncio_all_tasks())) - - -@app.route('/monitor') -async def monitor(request): - snapshot = tracemalloc.take_snapshot() - top_stats = snapshot.statistics('lineno') - - tasks = asyncio_all_tasks() - - return response.json({ - "top_20_trace": [str(x) for x in top_stats[:20]], - "tasks": { - "number": len(tasks), - "array": [show_coro(t) for t in tasks], - } - }) +#@always_relaunch(sleep=10) +#async def number_of_tasks(): +# print("Number of tasks: %s" % len(asyncio_all_tasks())) +# +# +#@app.route('/monitor') +#async def monitor(request): +# snapshot = tracemalloc.take_snapshot() +# top_stats = snapshot.statistics('lineno') +# +# tasks = asyncio_all_tasks() +# +# return response.json({ +# "top_20_trace": [str(x) for x in top_stats[:20]], +# "tasks": { +# "number": len(tasks), +# "array": [show_coro(t) for t in tasks], +# } +# }) @app.route("/github", methods=["GET"]) @@ -1385,29 +1385,29 @@ async def github(request): return response.text("ok") -def show_coro(c): - data = { - 'txt': str(c), - 'type': str(type(c)), - 'done': c.done(), - 'cancelled': False, - 'stack': None, - 'exception': None, - } - if not c.done(): - data['stack'] = [format_frame(x) for x in c.get_stack()] - else: - if c.cancelled(): - data['cancelled'] = True - else: - data['exception'] = str(c.exception()) - - return data +#def show_coro(c): +# data = { +# 'txt': str(c), +# 'type': str(type(c)), +# 'done': c.done(), +# 'cancelled': False, +# 'stack': None, +# 'exception': None, +# } +# if not c.done(): +# data['stack'] = [format_frame(x) for x in c.get_stack()] +# else: +# if c.cancelled(): +# data['cancelled'] = True +# else: +# data['exception'] = str(c.exception()) +# +# return data -def format_frame(f): - keys = ['f_code', 'f_lineno'] - return dict([(k, str(getattr(f, k))) for k in keys]) +#def format_frame(f): +# keys = ['f_code', 'f_lineno'] +# return dict([(k, str(getattr(f, k))) for k in keys]) @app.listener("before_server_start") @@ -1482,7 +1482,7 @@ def main(config="./config.py"): app.add_task(launch_monthly_job()) app.add_task(jobs_dispatcher()) - app.add_task(number_of_tasks()) + #app.add_task(number_of_tasks()) app.run('localhost', port=app.config.PORT, debug=app.config.DEBUG)