Merge pull request #169 from YunoHost/curl-tests

Add ability to define "curl tests" in tests.toml
Alexandre Aubin 2024-08-29 18:07:32 +02:00 committed by GitHub
commit f030296f96
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 309 additions and 143 deletions


@@ -203,6 +203,31 @@ test_format = 1.0
test_upgrade_from.00a1a6e7.name = "Upgrade from 5.4"
test_upgrade_from.00a1a6e7.args.foo = "bar"
# -------------------------------
# Curl tests to validate that the app works
# -------------------------------
[default.curl_tests]
#home.path = "/"
home.expect_title = "Login - Nextcloud"
#dash.path = "/"
dash.logged_on_sso = true
dash.expect_title = "Tableau de bord - Nextcloud"
admin.path = "/settings/admin"
admin.logged_on_sso = true
admin.expect_title = "Paramètres d'administration - Nextcloud"
asset.path = "/core/img/logo/logo.svg"
file.path = "/remote.php/dav/files/__USER__/Readme.md"
file.logged_on_sso = true
file.expect_content = "# Welcome to Nextcloud!"
caldav.base_url = "https://yolo.test"
caldav.path = "/.well-known/caldav"
caldav.logged_on_sso = true
caldav.expect_content = "This is the WebDAV interface."
# This is an additional test suite
[multisite]
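For context (not part of the diff): each key under `[default.curl_tests]` names one test, unspecified fields fall back to the script's defaults, `expect_title` and `expect_content` are matched as regular expression patterns, and the `__USER__` / `__DOMAIN__` placeholders are substituted before the request is made. A minimal sketch of that expansion, with hypothetical user and domain values, could look like this:

```python
# Sketch only (not in the commit): expand one curl_tests entry the way
# lib/curl_tests.py's run() does -- merge with defaults, substitute placeholders.
import toml

DEFAULTS = {
    "base_url": "https://sub.domain.tld/app",  # normally taken from $BASE_URL
    "path": "/",
    "logged_on_sso": False,
    "expect_title": None,
    "expect_content": None,
    "expect_effective_url": None,
    "auto_test_assets": False,
}

snippet = """
file.path = "/remote.php/dav/files/__USER__/Readme.md"
file.logged_on_sso = true
file.expect_content = "# Welcome to Nextcloud!"
"""

for name, params in toml.loads(snippet).items():
    full_params = {**DEFAULTS, **params}
    full_params = {
        k: v.replace("__USER__", "package_checker").replace("__DOMAIN__", "sub.domain.tld")
        if isinstance(v, str) else v
        for k, v in full_params.items()
    }
    # e.g. path becomes "/remote.php/dav/files/package_checker/Readme.md"
    print(name, full_params)
```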

lib/curl_tests.py (new file, 246 lines)

@@ -0,0 +1,246 @@
import os
import sys
import toml
import time
import re
import tempfile
import pycurl
from bs4 import BeautifulSoup
from urllib.parse import urlencode
from io import BytesIO
DOMAIN = os.environ["DOMAIN"]
DIST = os.environ["DIST"]
SUBDOMAIN = os.environ["SUBDOMAIN"]
USER = os.environ["USER"]
PASSWORD = os.environ["PASSWORD"]
LXC_IP = os.environ["LXC_IP"]
BASE_URL = os.environ["BASE_URL"].rstrip("/")
APP_DOMAIN = BASE_URL.replace("https://", "").replace("http://", "").split("/")[0]
DEFAULTS = {
    "base_url": BASE_URL,
    "path": "/",
    "logged_on_sso": False,
    "expect_title": None,
    "expect_content": None,
    "expect_effective_url": None,
    "auto_test_assets": False,
}
# Example of expected conf:
# ==============================================
# #home.path = "/"
# home.expect_title = "Login - Nextcloud"
#
# #dash.path = "/"
# dash.logged_on_sso = true
# dash.expect_title = "Tableau de bord - Nextcloud"
#
# admin.path = "/settings/admin"
# admin.logged_on_sso = true
# admin.expect_title = "Paramètres d'administration - Nextcloud"
#
# asset.path = "/core/img/logo/logo.svg"
#
# file.path = "/remote.php/dav/files/__USER__/Readme.md"
# file.logged_on_sso = true
# file.expect_content = "# Welcome to Nextcloud!"
#
# caldav.base_url = "https://yolo.test"
# caldav.path = "/.well-known/caldav"
# caldav.logged_on_sso = true
# caldav.expect_content = "This is the WebDAV interface."
# ==============================================
def curl(base_url, path, method="GET", use_cookies=None, save_cookies=None, post=None, referer=None):
domain = base_url.replace("https://", "").replace("http://", "").split("/")[0]
c = pycurl.Curl() # curl
c.setopt(c.URL, f"{base_url}{path}") # https://domain.tld/foo/bar
c.setopt(c.FOLLOWLOCATION, True) # --location
c.setopt(c.SSL_VERIFYPEER, False) # --insecure
c.setopt(c.RESOLVE, [f"{DOMAIN}:80:{LXC_IP}", f"{DOMAIN}:443:{LXC_IP}", f"{SUBDOMAIN}:80:{LXC_IP}", f"{SUBDOMAIN}:443:{LXC_IP}"]) # --resolve
c.setopt(c.HTTPHEADER, [f"Host: {domain}", "X-Requested-With: libcurl"]) # --header
if use_cookies:
c.setopt(c.COOKIEFILE, use_cookies)
if save_cookies:
c.setopt(c.COOKIEJAR, save_cookies)
if post:
c.setopt(c.POSTFIELDS, urlencode(post))
if referer:
c.setopt(c.REFERER, referer)
buffer = BytesIO()
c.setopt(c.WRITEDATA, buffer)
c.perform()
effective_url = c.getinfo(c.EFFECTIVE_URL)
return_code = c.getinfo(c.RESPONSE_CODE)
try:
return_content = buffer.getvalue().decode()
except UnicodeDecodeError:
return_content = "(Binary content?)"
c.close()
return (return_code, return_content, effective_url)
def test(base_url, path, post=None, logged_on_sso=False, expect_return_code=200, expect_content=None, expect_title=None, expect_effective_url=None, auto_test_assets=False):
domain = base_url.replace("https://", "").replace("http://", "").split("/")[0]
if logged_on_sso:
cookies = tempfile.NamedTemporaryFile().name
if DIST == "bullseye":
code, content, _ = curl(f"https://{domain}/yunohost/sso", "/", save_cookies=cookies, post={"user": USER, "password": PASSWORD}, referer=f"https://{domain}/yunohost/sso/")
assert code == 200 and os.system(f"grep -q '{domain}' {cookies}") == 0, f"Failed to log in: got code {code} or cookie file was empty?"
else:
code, content, _ = curl(f"https://{domain}/yunohost/portalapi", "/login", save_cookies=cookies, post={"credentials": f"{USER}:{PASSWORD}"})
assert code == 200 and content == "Logged in", f"Failed to log in: got code {code} and content: {content}"
else:
cookies = None
code = None
retried = 0
while code is None or code in {502, 503, 504}:
time.sleep(retried * 5)
code, content, effective_url = curl(base_url, path, post=post, use_cookies=cookies)
retried += 1
if retried > 3:
break
html = BeautifulSoup(content, features="lxml")
try:
title = html.find("title").string
title = title.strip().replace("\u2013", "-")
except Exception:
title = ""
content = html.find("body").get_text().strip()
content = re.sub(r"[\t\n\s]{3,}", "\n\n", content)
errors = []
if expect_effective_url is None and "/yunohost/sso" in effective_url:
errors.append(f"The request was redirected to yunohost's portal ({effective_url})")
if expect_effective_url and expect_effective_url != effective_url:
errors.append(f"Ended up on URL '{effective_url}', but was expecting '{expect_effective_url}'")
if expect_return_code and code != expect_return_code:
errors.append(f"Got return code {code}, but was expecting {expect_return_code}")
if expect_title is None and "Welcome to nginx" in title:
errors.append("The request ended up on the default nginx page?")
if expect_title and not re.search(expect_title, title):
errors.append(f"Got title '{title}', but was expecting something containing '{expect_title}'")
if expect_content and not re.search(expect_content, content):
errors.append(f"Did not find pattern '{expect_content}' in the page content: '{content[:50]}' (on URL {effective_url})")
assets = []
if auto_test_assets:
assets_to_check = []
stylesheets = html.find_all("link", rel="stylesheet", href=True)
stylesheets = [s for s in stylesheets if "ynh_portal" not in s["href"] and "ynhtheme" not in s["href"]]
if stylesheets:
assets_to_check.append(stylesheets[0]['href'])
js = html.find_all("script", src=True)
js = [s for s in js if "ynh_portal" not in s["src"] and "ynhtheme" not in s["src"]]
if js:
assets_to_check.append(js[0]['src'])
if not assets_to_check:
print("\033[1m\033[93mWARN\033[0m auto_test_assets set to true, but no js/css asset found in this page")
for asset in assets_to_check:
if asset.startswith(f"https://{domain}"):
asset = asset.replace(f"https://{domain}", "")
code, _, effective_url = curl(f"https://{domain}", asset, use_cookies=cookies)
if code != 200:
errors.append(f"Asset https://{domain}{asset} (automatically derived from the page's html) answered with code {code}, expected 200? Effective url: {effective_url}")
assets.append((domain + asset, code))
return {
"url": f"{base_url}{path}",
"effective_url": effective_url,
"code": code,
"title": title,
"content": content,
"assets": assets,
"errors": errors,
}
def run(tests):
results = {}
for name, params in tests.items():
full_params = DEFAULTS.copy()
full_params.update(params)
for key, value in full_params.items():
if isinstance(value, str):
full_params[key] = value.replace("__USER__", USER).replace("__DOMAIN__", APP_DOMAIN)
results[name] = test(**full_params)
display_result(results[name])
if full_params["path"] == "/":
full_params["path"] = ""
results[name + "_noslash"] = test(**full_params)
# Display this result too, but only if there's really a difference compared to the regular test
# because 99% of the time it's the same as the regular test
if results[name + "_noslash"]["effective_url"] != results[name]["effective_url"]:
display_result(results[name + "_noslash"])
return results
def display_result(result):
if result["effective_url"] != result["url"]:
print(f"URL : {result['url']} (redirected to: {result['effective_url']})")
else:
print(f"URL : {result['url']}")
if result['code'] != 200:
print(f"Code : {result['code']}")
if result["title"].strip():
print(f"Title : {result['title'].strip()}")
print(f"Content extract:\n{result['content'][:100].strip()}")
if result["assets"]:
print("Assets :")
for asset, code in result["assets"]:
if code == 200:
print(f" - {asset}")
else:
print(f" - \033[1m\033[91mFAIL\033[0m {asset} (code {code})")
if result["errors"]:
print("Errors :\n -" + "\n -".join(result['errors']))
print("\033[1m\033[91mFAIL\033[0m")
else:
print("\033[1m\033[92mOK\033[0m")
print("========")
def main():
tests = sys.stdin.read()
if not tests.strip():
tests = "home.path = '/'"
tests += "\nhome.auto_test_assets = true"
tests = toml.loads(tests)
results = run(tests)
# If there was at least one error 50x
if any(str(r['code']).startswith("5") for r in results.values()):
sys.exit(5)
elif any(r["errors"] for r in results.values()):
sys.exit(1)
else:
sys.exit(0)
main()
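For reference (not part of the diff): the script's whole interface is a TOML document on stdin plus a handful of environment variables, which is exactly how the bash side below invokes it, and its exit code encodes the outcome (5 if any request answered with a 50x, 1 for any other failed expectation, 0 otherwise). A hedged sketch of an equivalent standalone invocation, with placeholder values for the distribution, user, IP and URL:

```python
# Sketch only (all concrete values below are placeholders): drive lib/curl_tests.py
# by hand, feeding it a curl_tests TOML file on stdin and the target via env vars.
import os
import subprocess

env = {
    **os.environ,
    "DIST": "bookworm",
    "DOMAIN": "domain.tld",
    "SUBDOMAIN": "sub.domain.tld",
    "USER": "package_checker",
    "PASSWORD": "SomeSuperStrongPassword",
    "LXC_IP": "10.1.4.2",
    "BASE_URL": "https://sub.domain.tld/app",
}

with open("curl_tests.toml") as tests:
    result = subprocess.run(["python3", "lib/curl_tests.py"], stdin=tests, env=env)

print("exit code:", result.returncode)  # 5 => some 50x answer, 1 => other errors, 0 => OK
```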


@@ -212,19 +212,15 @@ _REMOVE_APP () {
_VALIDATE_THAT_APP_CAN_BE_ACCESSED () {
local check_domain="$1"
local check_path="$2"
local install_type="$3" # Can be anything or 'private', later used to check if it's okay to end up on the portal
local app_id_to_check="${4:-$app_id}"
local curl_error=0
local fell_on_sso_portal=0
local curl_output=$TEST_CONTEXT/curl_output
# Not checking this if this ain't relevant for the current app
this_is_a_web_app || return 0
log_small_title "Validating that the app $app_id_to_check can/can't be accessed with its URL..."
# We don't check the private case anymore because meh
[[ "$3" != "private" ]] || return 0
local domain_to_check="$1"
local path_to_check="$2"
local app_id_to_check="${4:-$app_id}"
# Force the app to public only if we're checking the public-like installs AND visitors are allowed to access the app
# For example, that's the case for agendav which is always installed as
@@ -233,148 +229,47 @@ _VALIDATE_THAT_APP_CAN_BE_ACCESSED () {
# accessible *without* tweaking main permission...
local has_public_arg=$(LXC_EXEC "cat /etc/ssowat/conf.json" | jq .permissions.\""$app_id_to_check.main"\".public)
if [ "$install_type" != 'private' ] && [[ $has_public_arg == "false" ]]
if [[ $has_public_arg == "false" ]]
then
log_debug "Forcing public access using tools shell"
# Force the public access by setting force=True, which is not possible with "yunohost user permission update"
_RUN_YUNOHOST_CMD "tools shell -c 'from yunohost.permission import user_permission_update; user_permission_update(\"$app_id_to_check.main\", add=\"visitors\", force=True)'"
fi
# Try to access the URL twice: first without a trailing /, then with one
for i in $(seq 1 2)
do
log_small_title "Validating that the app $app_id_to_check can/can't be accessed with its URL..."
# First time we'll try without the trailing slash,
# Second time *with* the trailing slash
local curl_check_path="$(echo $check_path | sed 's@/$@@g')"
[ $i -eq 1 ] || curl_check_path="$curl_check_path/"
# Remove the previous curl output
rm -f "$curl_output"
local http_code="noneyet"
local retry=0
function should_retry() {
[ "${http_code}" = "noneyet" ] || [ "${http_code}" = "502" ] || [ "${http_code}" = "503" ] || [ "${http_code}" = "504" ]
}
while [ $retry -lt 3 ] && should_retry;
do
sleep $(($retry*$retry*$retry + 3))
log_debug "Running curl $check_domain$curl_check_path"
# Call cURL to try to access the URL of the app
LXC_EXEC "curl --location --insecure --silent --show-error --cookie /dev/null \
--header 'Host: $check_domain' \
--resolve $DOMAIN:80:$LXC_IP \
--resolve $DOMAIN:443:$LXC_IP \
--resolve $SUBDOMAIN:80:$LXC_IP \
--resolve $SUBDOMAIN:443:$LXC_IP \
--write-out '%{http_code};%{url_effective}\n' \
--output './curl_output' \
$check_domain$curl_check_path" \
> "$TEST_CONTEXT/curl_print"
LXC_EXEC "cat ./curl_output" > $curl_output
# Analyze the result of curl command
if [ $? -ne 0 ]
then
log_error "Connection error..."
curl_error=1
fi
http_code=$(cat "$TEST_CONTEXT/curl_print" | cut -d ';' -f1)
log_debug "HTTP code: $http_code"
retry=$((retry+1))
done
# Analyze the http code (we're looking for 0xx 4xx 5xx 6xx codes)
if [ -n "$http_code" ] && echo "0 4 5 6" | grep -q "${http_code:0:1}"
then
# If the http code is a 0xx 4xx or 5xx, it's an error code.
curl_error=1
# 401 is "Unauthorized", so it is an answer from the server. So, it works!
[ "${http_code}" == "401" ] && curl_error=0
[ $curl_error -eq 1 ] && log_error "The HTTP code shows an error."
fi
# Analyze the output of cURL
if [ -e "$curl_output" ]
then
# Print the title of the page
local page_title=$(grep "<title>" "$curl_output" | cut --delimiter='>' --fields=2 | cut --delimiter='<' --fields=1)
local page_extract=$(lynx -dump -force_html "$curl_output" | head --lines 20 | tee -a "$full_log")
# Check that the page title is neither the YunoHost portal nor the default NGINX page
# And check if the "Real URL" is the ynh sso
if [ "$page_title" = "YunoHost Portal" ] || (cat $TEST_CONTEXT/curl_print | cut --delimiter=';' --fields=2 | grep -q "/yunohost/sso")
then
log_debug "The connection attempt fell on the YunoHost portal."
fell_on_sso_portal=1
# Falling on NGINX default page is an error.
elif echo "$page_title" | grep -q "Welcome to nginx"
then
log_error "The connection attempt fell on the NGINX default page."
curl_error=1
fi
fi
echo -e "Test URL: $check_domain$curl_check_path
Real URL: $(cat "$TEST_CONTEXT/curl_print" | cut --delimiter=';' --fields=2)
HTTP code: $http_code
Page title: $page_title
Page extract:\n$page_extract" > $TEST_CONTEXT/curl_result
[[ $curl_error -eq 0 ]] \
&& log_debug "$(cat $TEST_CONTEXT/curl_result)" \
|| log_warning "$(cat $TEST_CONTEXT/curl_result)"
# If we had a 50x error, try to display service info and logs to help debugging
if [[ $curl_error -ne 0 ]] && echo "5" | grep -q "${http_code:0:1}"
then
LXC_EXEC "systemctl --no-pager --all" | grep "$app_id_to_check.*service"
for SERVICE in $(LXC_EXEC "systemctl --no-pager -all" | grep -o "$app_id_to_check.*service")
do
LXC_EXEC "journalctl --no-pager --no-hostname -n 30 -u $SERVICE";
done
LXC_EXEC "tail -v -n 15 \$(find /var/log/{nginx/,php*,$app_id_to_check} -mmin -3)"
fi
done
# Detect the issue alias_traversal, https://github.com/yandex/gixy/blob/master/docs/en/plugins/aliastraversal.md
# Create a file to fetch to check for the alias_traversal issue
echo "<!DOCTYPE html><html><head>
<title>alias_traversal test</title>
</head><body><h1>alias_traversal test</h1>
If you see this page, you have failed the test for alias_traversal issue.</body></html>" \
> $TEST_CONTEXT/alias_traversal.html
$lxc file push $TEST_CONTEXT/alias_traversal.html $LXC_NAME/var/www/html/alias_traversal.html
curl --location --insecure --silent $check_domain$check_path../html/alias_traversal.html \
| grep "title" | grep --quiet "alias_traversal test" \
&& log_error "Issue alias_traversal detected! Please see here https://github.com/YunoHost/example_ynh/pull/45 to fix that." \
&& SET_RESULT "failure" alias_traversal
[ "$curl_error" -eq 0 ] || return 1
local expected_to_fell_on_portal=""
[ "$install_type" == "private" ] && expected_to_fell_on_portal=1 || expected_to_fell_on_portal=0
if [ "$install_type" == "root" ] || [ "$install_type" == "subdir" ] || [ "$install_type" == "upgrade" ];
if [ -e "$package_path/tests.toml" ]
then
log_info "$(cat $TEST_CONTEXT/curl_result)"
local current_test_serie=$(jq -r '.test_serie' $testfile)
python3 -c "import toml, sys; t = toml.loads(sys.stdin.read()); print(toml.dumps(t['$current_test_serie'].get('curl_tests', {})))" < "$package_path/tests.toml" > $TEST_CONTEXT/curl_tests.toml
# Upgrades from older versions may still be packaging v1 and have no tests.toml
else
echo "" > $TEST_CONTEXT/curl_tests.toml
fi
[ $fell_on_sso_portal -eq $expected_to_fell_on_portal ] || return 1
DIST="$DIST" \
DOMAIN="$DOMAIN" \
SUBDOMAIN="$SUBDOMAIN" \
USER="$TEST_USER" \
PASSWORD="SomeSuperStrongPassword" \
LXC_IP="$LXC_IP" \
BASE_URL="https://$domain_to_check$path_to_check" \
python3 lib/curl_tests.py < $TEST_CONTEXT/curl_tests.toml | tee -a "$full_log"
return 0
curl_result=${PIPESTATUS[0]}
# If we had a 50x error, try to display service info and logs to help debugging
if [[ $curl_result == 5 ]]
then
LXC_EXEC "systemctl --no-pager --all" | grep "$app_id_to_check.*service"
for SERVICE in $(LXC_EXEC "systemctl --no-pager -all" | grep -o "$app_id_to_check.*service")
do
LXC_EXEC "journalctl --no-pager --no-hostname -n 30 -u $SERVICE";
done
LXC_EXEC "tail -v -n 15 \$(find /var/log/{nginx/,php*,$app_id_to_check} -mmin -3)"
fi
return $curl_result
}