mirror of
https://github.com/YunoHost/yunorunner.git
synced 2024-09-03 20:05:52 +02:00
[enh] add better logging
This commit is contained in:
parent
42d0c85956
commit
6636775501
1 changed files with 39 additions and 17 deletions
56
run.py
56
run.py
|
@ -3,6 +3,7 @@
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import ujson
|
import ujson
|
||||||
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
import random
|
import random
|
||||||
|
@ -17,11 +18,31 @@ from websockets.exceptions import ConnectionClosed
|
||||||
|
|
||||||
from sanic import Sanic, response
|
from sanic import Sanic, response
|
||||||
from sanic.exceptions import NotFound
|
from sanic.exceptions import NotFound
|
||||||
|
from sanic.log import LOGGING_CONFIG_DEFAULTS
|
||||||
|
|
||||||
from playhouse.shortcuts import model_to_dict, dict_to_model
|
from playhouse.shortcuts import model_to_dict, dict_to_model
|
||||||
|
|
||||||
from models import Repo, Job, db, Worker
|
from models import Repo, Job, db, Worker
|
||||||
|
|
||||||
|
LOGGING_CONFIG_DEFAULTS["loggers"]["task"] = {
|
||||||
|
"level": "INFO",
|
||||||
|
"handlers": ["task_console"],
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGING_CONFIG_DEFAULTS["handlers"]["task_console"] = {
|
||||||
|
"class": "logging.StreamHandler",
|
||||||
|
"formatter": "background",
|
||||||
|
"stream": sys.stdout,
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGING_CONFIG_DEFAULTS["formatters"]["background"] = {
|
||||||
|
"format": "%(asctime)s [%(process)d] [BACKGROUND] [%(funcName)s] %(message)s",
|
||||||
|
"datefmt": "[%Y-%m-%d %H:%M:%S %z]",
|
||||||
|
"class": "logging.Formatter",
|
||||||
|
}
|
||||||
|
|
||||||
|
task_logger = logging.getLogger("task")
|
||||||
|
|
||||||
app = Sanic()
|
app = Sanic()
|
||||||
|
|
||||||
APPS_LISTS = {
|
APPS_LISTS = {
|
||||||
|
@ -68,23 +89,21 @@ async def monitor_apps_lists():
|
||||||
for app_list_name, url in APPS_LISTS.items():
|
for app_list_name, url in APPS_LISTS.items():
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
app_list = "official"
|
app_list = "official"
|
||||||
sys.stdout.write(f"Downloading {app_list_name}.json...\r")
|
task_logger.info(f"Downloading {app_list_name}.json...")
|
||||||
sys.stdout.flush()
|
|
||||||
async with session.get(url) as resp:
|
async with session.get(url) as resp:
|
||||||
data = await resp.json()
|
data = await resp.json()
|
||||||
sys.stdout.write(f"Downloading {app_list_name}.json...done\n")
|
|
||||||
|
|
||||||
repos = {x.name: x for x in Repo.select().where(Repo.app_list == app_list_name)}
|
repos = {x.name: x for x in Repo.select().where(Repo.app_list == app_list_name)}
|
||||||
|
|
||||||
for app_id, app_data in data.items():
|
for app_id, app_data in data.items():
|
||||||
commit_sha = await get_master_commit_sha(app_id)
|
commit_sha = await get_master_commit_sha(app_id)
|
||||||
print(f"{app_id} → {commit_sha}")
|
|
||||||
|
|
||||||
# already known, look to see if there are new commits
|
# already known, look to see if there are new commits
|
||||||
if app_id in repos:
|
if app_id in repos:
|
||||||
repo = repos[app_id]
|
repo = repos[app_id]
|
||||||
if repo.revision != commit_sha:
|
if repo.revision != commit_sha:
|
||||||
print(f"Application {app_id} has new commits on github, schedule new job")
|
task_logger.info(f"Application {app_id} has new commits on github "
|
||||||
|
f"({repo.revision} → {commit_sha}), schedule new job")
|
||||||
repo.revision = commit_sha
|
repo.revision = commit_sha
|
||||||
repo.save()
|
repo.save()
|
||||||
|
|
||||||
|
@ -101,9 +120,12 @@ async def monitor_apps_lists():
|
||||||
"data": model_to_dict(job),
|
"data": model_to_dict(job),
|
||||||
}, "jobs")
|
}, "jobs")
|
||||||
|
|
||||||
|
else:
|
||||||
|
task_logger.info(f"Application {app_id} in {app_list_name} is still at the same revision {commit_sha}, do nothing")
|
||||||
|
|
||||||
# new app
|
# new app
|
||||||
else:
|
else:
|
||||||
print(f"New application detected: {app_id} in {app_list_name}")
|
task_logger.info(f"New application detected: {app_id} in {app_list_name}, scheduling a new job")
|
||||||
repo = Repo.create(
|
repo = Repo.create(
|
||||||
name=app_id,
|
name=app_id,
|
||||||
url=app_data["git"]["url"],
|
url=app_data["git"]["url"],
|
||||||
|
@ -111,7 +133,6 @@ async def monitor_apps_lists():
|
||||||
app_list=app_list_name,
|
app_list=app_list_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"Schedule a new build for {app_id}")
|
|
||||||
job = Job.create(
|
job = Job.create(
|
||||||
name=f"{app_id} ({app_list_name})",
|
name=f"{app_id} ({app_list_name})",
|
||||||
url_or_path=repo.url,
|
url_or_path=repo.url,
|
||||||
|
@ -176,11 +197,11 @@ async def run_job(worker, job):
|
||||||
}, ["jobs", f"job-{job.id}"])
|
}, ["jobs", f"job-{job.id}"])
|
||||||
|
|
||||||
# fake stupid command, would run CI instead
|
# fake stupid command, would run CI instead
|
||||||
print(f"Starting job {job.name}...")
|
task_logger.info(f"Starting job '{job.name}'...")
|
||||||
|
|
||||||
cwd = os.path.split(path_to_analyseCI)[0]
|
cwd = os.path.split(path_to_analyseCI)[0]
|
||||||
arguments = f' {job.url_or_path} "{job.name}"'
|
arguments = f' {job.url_or_path} "{job.name}"'
|
||||||
print("/bin/bash " + path_to_analyseCI + arguments)
|
task_logger.debug(f"Launch command: /bin/bash " + path_to_analyseCI + arguments)
|
||||||
command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments,
|
command = await asyncio.create_subprocess_shell("/bin/bash " + path_to_analyseCI + arguments,
|
||||||
cwd=cwd,
|
cwd=cwd,
|
||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE,
|
||||||
|
@ -205,7 +226,7 @@ async def run_job(worker, job):
|
||||||
# XXX stupid crap to simulate long jobs
|
# XXX stupid crap to simulate long jobs
|
||||||
await asyncio.sleep(random.randint(1, 15))
|
await asyncio.sleep(random.randint(1, 15))
|
||||||
# await asyncio.sleep(5)
|
# await asyncio.sleep(5)
|
||||||
print(f"Finished job {job.name}")
|
task_logger.info(f"Finished job '{job.name}'")
|
||||||
|
|
||||||
await command.wait()
|
await command.wait()
|
||||||
job.end_time = datetime.now()
|
job.end_time = datetime.now()
|
||||||
|
@ -257,9 +278,8 @@ async def index_ws(request, websocket):
|
||||||
}))
|
}))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
data = await websocket.recv()
|
# do nothing with input but wait
|
||||||
print(f"websocket: {data}")
|
await websocket.recv()
|
||||||
await websocket.send(f"echo {data}")
|
|
||||||
|
|
||||||
|
|
||||||
@app.websocket('/job-<job_id>-ws')
|
@app.websocket('/job-<job_id>-ws')
|
||||||
|
@ -279,9 +299,8 @@ async def job_ws(request, websocket, job_id):
|
||||||
}))
|
}))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
data = await websocket.recv()
|
# do nothing with input but wait
|
||||||
print(f"websocket: {data}")
|
await websocket.recv()
|
||||||
await websocket.send(f"echo {data}")
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/api/job", methods=['POST'])
|
@app.route("/api/job", methods=['POST'])
|
||||||
|
@ -304,7 +323,7 @@ async def api_new_job(request):
|
||||||
debian_version=request.json.get("debian_version", "stretch"),
|
debian_version=request.json.get("debian_version", "stretch"),
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"Request to add new job '{job.name}' {job}")
|
task_logger.info(f"Request to add new job '{job.name}' [{job.id}]")
|
||||||
|
|
||||||
await broadcast({
|
await broadcast({
|
||||||
"action": "new_job",
|
"action": "new_job",
|
||||||
|
@ -326,6 +345,7 @@ async def api_stop_job(request, job_id):
|
||||||
job = job[0]
|
job = job[0]
|
||||||
|
|
||||||
if job.state == "scheduled":
|
if job.state == "scheduled":
|
||||||
|
task_logger.info(f"Cancel scheduled job '{job.name}' [{job.id}] on request")
|
||||||
job.state = "canceled"
|
job.state = "canceled"
|
||||||
job.save()
|
job.save()
|
||||||
|
|
||||||
|
@ -337,6 +357,7 @@ async def api_stop_job(request, job_id):
|
||||||
return response.text("ok")
|
return response.text("ok")
|
||||||
|
|
||||||
if job.state == "running":
|
if job.state == "running":
|
||||||
|
task_logger.info(f"Cancel running job '{job.name}' [{job.id}] on request")
|
||||||
job.state = "canceled"
|
job.state = "canceled"
|
||||||
job.save()
|
job.save()
|
||||||
|
|
||||||
|
@ -354,6 +375,7 @@ async def api_stop_job(request, job_id):
|
||||||
return response.text("ok")
|
return response.text("ok")
|
||||||
|
|
||||||
if job.state in ("done", "canceled", "failure"):
|
if job.state in ("done", "canceled", "failure"):
|
||||||
|
task_logger.info(f"Request to cancel job '{job.name}' [{job.id}] but job is already in '{job.state}' state, do nothing")
|
||||||
# nothing to do, task is already done
|
# nothing to do, task is already done
|
||||||
return response.text("ok")
|
return response.text("ok")
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue