Moar fixes from the battlefield

This commit is contained in:
Alexandre Aubin 2023-01-29 18:34:07 +01:00
parent 8f58109973
commit 4992ffe4f6

94
run.py
View file

@ -480,7 +480,7 @@ async def cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=Fal
job.state = "error" job.state = "error"
return False return False
except CancelledError: except (CancelledError, asyncio.exceptions.CancelledError):
command.terminate() command.terminate()
if not ignore_error: if not ignore_error:
@ -564,6 +564,7 @@ async def run_job(worker, job):
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE) stderr=asyncio.subprocess.PIPE)
while not command.stdout.at_eof(): while not command.stdout.at_eof():
data = await command.stdout.readline() data = await command.stdout.readline()
@ -581,7 +582,7 @@ async def run_job(worker, job):
"data": model_to_dict(job), "data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
except CancelledError: except (CancelledError, asyncio.exceptions.CancelledError):
command.terminate() command.terminate()
job.log += "\n" job.log += "\n"
job.state = "canceled" job.state = "canceled"
@ -604,7 +605,7 @@ async def run_job(worker, job):
job.state = "error" job.state = "error"
else: else:
if command.returncode != 0 or not os.path.exists(result_json): if command.returncode != 0 or not os.path.exists(result_json):
job.log += f"\nJob failed ? Return code is {comman.returncode} / Or maybe the json result doesnt exist...\n" job.log += f"\nJob failed ? Return code is {command.returncode} / Or maybe the json result doesnt exist...\n"
job.state = "error" job.state = "error"
else: else:
job.log += f"\nPackage check completed\n" job.log += f"\nPackage check completed\n"
@ -685,7 +686,8 @@ async def run_job(worker, job):
job.log += "Exception:\n" job.log += "Exception:\n"
job.log += traceback.format_exc() job.log += traceback.format_exc()
await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True) #if job.state != "canceled":
# await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True)
# remove ourself from the state # remove ourself from the state
del jobs_in_memory_state[job.id] del jobs_in_memory_state[job.id]
@ -1083,8 +1085,6 @@ async def stop_job(job_id):
"data": model_to_dict(job), "data": model_to_dict(job),
}, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"]) }, ["jobs", f"job-{job.id}", f"app-jobs-{job.url_or_path}"])
await cleanup_old_package_check_if_lock_exists(worker, job, ignore_error=True)
return response.text("ok") return response.text("ok")
if job.state in ("done", "canceled", "failure", "error"): if job.state in ("done", "canceled", "failure", "error"):
@ -1225,25 +1225,25 @@ async def html_index(request):
return {'relative_path_to_root': '', 'path': request.path} return {'relative_path_to_root': '', 'path': request.path}
@always_relaunch(sleep=10) #@always_relaunch(sleep=10)
async def number_of_tasks(): #async def number_of_tasks():
print("Number of tasks: %s" % len(asyncio_all_tasks())) # print("Number of tasks: %s" % len(asyncio_all_tasks()))
#
#
@app.route('/monitor') #@app.route('/monitor')
async def monitor(request): #async def monitor(request):
snapshot = tracemalloc.take_snapshot() # snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno') # top_stats = snapshot.statistics('lineno')
#
tasks = asyncio_all_tasks() # tasks = asyncio_all_tasks()
#
return response.json({ # return response.json({
"top_20_trace": [str(x) for x in top_stats[:20]], # "top_20_trace": [str(x) for x in top_stats[:20]],
"tasks": { # "tasks": {
"number": len(tasks), # "number": len(tasks),
"array": [show_coro(t) for t in tasks], # "array": [show_coro(t) for t in tasks],
} # }
}) # })
@app.route("/github", methods=["GET"]) @app.route("/github", methods=["GET"])
@ -1385,29 +1385,29 @@ async def github(request):
return response.text("ok") return response.text("ok")
def show_coro(c): #def show_coro(c):
data = { # data = {
'txt': str(c), # 'txt': str(c),
'type': str(type(c)), # 'type': str(type(c)),
'done': c.done(), # 'done': c.done(),
'cancelled': False, # 'cancelled': False,
'stack': None, # 'stack': None,
'exception': None, # 'exception': None,
} # }
if not c.done(): # if not c.done():
data['stack'] = [format_frame(x) for x in c.get_stack()] # data['stack'] = [format_frame(x) for x in c.get_stack()]
else: # else:
if c.cancelled(): # if c.cancelled():
data['cancelled'] = True # data['cancelled'] = True
else: # else:
data['exception'] = str(c.exception()) # data['exception'] = str(c.exception())
#
return data # return data
def format_frame(f): #def format_frame(f):
keys = ['f_code', 'f_lineno'] # keys = ['f_code', 'f_lineno']
return dict([(k, str(getattr(f, k))) for k in keys]) # return dict([(k, str(getattr(f, k))) for k in keys])
@app.listener("before_server_start") @app.listener("before_server_start")
@ -1482,7 +1482,7 @@ def main(config="./config.py"):
app.add_task(launch_monthly_job()) app.add_task(launch_monthly_job())
app.add_task(jobs_dispatcher()) app.add_task(jobs_dispatcher())
app.add_task(number_of_tasks()) #app.add_task(number_of_tasks())
app.run('localhost', port=app.config.PORT, debug=app.config.DEBUG) app.run('localhost', port=app.config.PORT, debug=app.config.DEBUG)