yunorunner/ciclic
tituspijean 71d6b4b30e
Make ciclic executable
Best served with `yunohost app shell yunorunner`
2024-05-02 23:47:16 +02:00

131 lines
2.8 KiB
Python
Executable file

#!/usr/bin/env python
import os
import sys
import argh
import requests
from argh.decorators import named
try:
from config import *
except ImportError:
PORT="4242"
DOMAIN = "localhost:" + str(PORT)
def request_api(path, domain, verb, data={}, check_return_code=True):
assert verb in ("get", "post", "put", "delete")
https = False
if domain.split(":")[0] not in ("localhost", "127.0.0.1", "0.0.0.0"):
https = True
response = getattr(requests, verb)(
"http%s://%s/api/%s" % ("s" if https else "", domain, path),
headers={"X-Token": open(".admin_token", "r").read().strip()},
json=data,
)
if response.status_code == 403:
print(f"Error: access refused because '{response.json()['status']}'")
sys.exit(1)
if check_return_code:
# TODO: real error message
assert response.status_code == 200, response.content
return response
def shell():
import ipdb; ipdb.set_trace()
def add(name, url_or_path, domain=DOMAIN):
request_api(
path="job",
verb="post",
domain=domain,
data={
"name": name,
"url_or_path": url_or_path,
},
)
@named("list")
def list_(all=False, domain=DOMAIN):
response = request_api(
path="job",
verb="get",
domain=domain,
data={
"all": all,
},
)
for i in response.json():
print(f"{i['id']:4d} - {i['name']} [{i['state']}]")
def app_list(all=False, domain=DOMAIN):
response = request_api(
path="app",
verb="get",
domain=domain,
)
for i in response.json():
print(f"{i['name']} - {i['url']}")
def delete(job_id, domain=DOMAIN):
response = request_api(
path=f"job/{job_id}",
verb="delete",
domain=domain,
check_return_code=False
)
if response.status_code == 404:
print(f"Error: no job with the id '{job_id}'")
sys.exit(1)
assert response.status_code == 200, response.content
def stop(job_id, domain=DOMAIN):
response = request_api(
path=f"job/{job_id}/stop",
verb="post",
domain=domain,
check_return_code=False
)
if response.status_code == 404:
print(f"Error: no job with the id '{job_id}'")
sys.exit(1)
assert response.status_code == 200, response.content
def restart(job_id, domain=DOMAIN):
response = request_api(
path=f"job/{job_id}/restart",
verb="post",
domain=domain,
check_return_code=False
)
if response.status_code == 404:
print(f"Error: no job with the id '{job_id}'")
sys.exit(1)
assert response.status_code == 200, response.content
if __name__ == '__main__':
argh.dispatch_commands([add, list_, delete, stop, restart, app_list, shell])