mirror of
https://github.com/YunoHost/moulinette.git
synced 2024-09-03 20:06:31 +02:00
refactor, and use fixture for ldap_server
This commit is contained in:
parent
6ee7caebd1
commit
32711aa034
17 changed files with 162 additions and 128 deletions
|
@ -7,6 +7,8 @@ import os
|
|||
import shutil
|
||||
import pytest
|
||||
|
||||
from src.ldap_server import LDAPServer
|
||||
|
||||
|
||||
def patch_init(moulinette):
|
||||
"""Configure moulinette to use the YunoHost namespace."""
|
||||
|
@ -218,3 +220,11 @@ def user():
|
|||
@pytest.fixture
|
||||
def test_url():
|
||||
return "https://some.test.url/yolo.txt"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ldap_server():
|
||||
server = LDAPServer()
|
||||
server.start()
|
||||
yield server
|
||||
server.stop()
|
||||
|
|
104
test/src/ldap_server.py
Normal file
104
test/src/ldap_server.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
try:
|
||||
import slapdtest
|
||||
except ImportError:
|
||||
import old_slapdtest as slapdtest
|
||||
import os
|
||||
from moulinette.authenticators import ldap as m_ldap
|
||||
|
||||
HERE = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
class LDAPServer:
|
||||
def __init__(self):
|
||||
self.server_default = slapdtest.SlapdObject()
|
||||
with open(os.path.join(HERE, "..", "ldap_files", "slapd.conf.template")) as f:
|
||||
SLAPD_CONF_TEMPLATE = f.read()
|
||||
self.server_default.slapd_conf_template = SLAPD_CONF_TEMPLATE
|
||||
self.server_default.suffix = "dc=yunohost,dc=org"
|
||||
self.server_default.root_cn = "admin"
|
||||
self.server_default.SCHEMADIR = os.path.join(HERE, "..", "ldap_files", "schema")
|
||||
self.server_default.openldap_schema_files = [
|
||||
"core.schema",
|
||||
"cosine.schema",
|
||||
"nis.schema",
|
||||
"inetorgperson.schema",
|
||||
"sudo.schema",
|
||||
"yunohost.schema",
|
||||
"mailserver.schema",
|
||||
]
|
||||
self.server = None
|
||||
self.uri = ""
|
||||
|
||||
def start(self):
|
||||
self.server = self.server_default
|
||||
self.server.start()
|
||||
self.uri = self.server.ldapi_uri
|
||||
with open(os.path.join(HERE, "..", "ldap_files", "tests.ldif")) as fp:
|
||||
ldif = fp.read().decode("utf-8")
|
||||
self.server.ldapadd(ldif)
|
||||
self.tools_ldapinit()
|
||||
|
||||
def stop(self):
|
||||
if self.server:
|
||||
self.server.stop()
|
||||
|
||||
def __del__(self):
|
||||
if self.server:
|
||||
self.server.stop()
|
||||
|
||||
def tools_ldapinit(self):
|
||||
"""
|
||||
YunoHost LDAP initialization
|
||||
|
||||
|
||||
"""
|
||||
import yaml
|
||||
|
||||
with open(os.path.join(HERE, "..", "ldap_files", "ldap_scheme.yml")) as f:
|
||||
ldap_map = yaml.load(f)
|
||||
|
||||
def _get_ldap_interface():
|
||||
conf = {
|
||||
"vendor": "ldap",
|
||||
"name": "as-root",
|
||||
"parameters": {
|
||||
"uri": self.server.ldapi_uri,
|
||||
"base_dn": "dc=yunohost,dc=org",
|
||||
"user_rdn": "gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
||||
% (os.getgid(), os.getuid()),
|
||||
},
|
||||
"extra": {},
|
||||
}
|
||||
|
||||
_ldap_interface = m_ldap.Authenticator(**conf)
|
||||
|
||||
return _ldap_interface
|
||||
|
||||
ldap_interface = _get_ldap_interface()
|
||||
|
||||
for rdn, attr_dict in ldap_map["parents"].items():
|
||||
ldap_interface.add(rdn, attr_dict)
|
||||
|
||||
for rdn, attr_dict in ldap_map["children"].items():
|
||||
ldap_interface.add(rdn, attr_dict)
|
||||
|
||||
for rdn, attr_dict in ldap_map["depends_children"].items():
|
||||
ldap_interface.add(rdn, attr_dict)
|
||||
|
||||
admin_dict = {
|
||||
"cn": "admin",
|
||||
"uid": "admin",
|
||||
"description": "LDAP Administrator",
|
||||
"gidNumber": "1007",
|
||||
"uidNumber": "1007",
|
||||
"homeDirectory": "/home/admin",
|
||||
"loginShell": "/bin/bash",
|
||||
"objectClass": [
|
||||
"organizationalRole",
|
||||
"posixAccount",
|
||||
"simpleSecurityObject",
|
||||
],
|
||||
"userPassword": "yunohost",
|
||||
}
|
||||
|
||||
ldap_interface.update("cn=admin", admin_dict)
|
|
@ -1,131 +1,30 @@
|
|||
import pytest
|
||||
|
||||
try:
|
||||
import slapdtest
|
||||
except ImportError:
|
||||
import old_slapdtest as slapdtest
|
||||
import os
|
||||
|
||||
from moulinette.authenticators import ldap as m_ldap
|
||||
from moulinette import m18n
|
||||
from moulinette.core import MoulinetteError
|
||||
|
||||
HERE = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
class TestLDAP:
|
||||
|
||||
server = None
|
||||
server_default = None
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
cls.server_default = slapdtest.SlapdObject()
|
||||
with open(os.path.join(HERE, "ldap_files", "slapd.conf.template")) as f:
|
||||
SLAPD_CONF_TEMPLATE = f.read()
|
||||
cls.server_default.slapd_conf_template = SLAPD_CONF_TEMPLATE
|
||||
cls.server_default.suffix = "dc=yunohost,dc=org"
|
||||
cls.server_default.root_cn = "admin"
|
||||
cls.server_default.SCHEMADIR = os.path.join(HERE, "ldap_files", "schema")
|
||||
cls.server_default.openldap_schema_files = [
|
||||
"core.schema",
|
||||
"cosine.schema",
|
||||
"nis.schema",
|
||||
"inetorgperson.schema",
|
||||
"sudo.schema",
|
||||
"yunohost.schema",
|
||||
"mailserver.schema",
|
||||
]
|
||||
|
||||
def tools_ldapinit(self):
|
||||
"""
|
||||
YunoHost LDAP initialization
|
||||
|
||||
|
||||
"""
|
||||
import yaml
|
||||
|
||||
with open(os.path.join(HERE, "ldap_files", "ldap_scheme.yml")) as f:
|
||||
ldap_map = yaml.load(f)
|
||||
|
||||
def _get_ldap_interface():
|
||||
conf = {
|
||||
"vendor": "ldap",
|
||||
"name": "as-root",
|
||||
"parameters": {
|
||||
"uri": self.server.ldapi_uri,
|
||||
"base_dn": "dc=yunohost,dc=org",
|
||||
"user_rdn": "gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
||||
% (os.getgid(), os.getuid()),
|
||||
},
|
||||
"extra": {},
|
||||
}
|
||||
|
||||
_ldap_interface = m_ldap.Authenticator(**conf)
|
||||
|
||||
return _ldap_interface
|
||||
|
||||
ldap_interface = _get_ldap_interface()
|
||||
|
||||
for rdn, attr_dict in ldap_map["parents"].items():
|
||||
ldap_interface.add(rdn, attr_dict)
|
||||
|
||||
for rdn, attr_dict in ldap_map["children"].items():
|
||||
ldap_interface.add(rdn, attr_dict)
|
||||
|
||||
for rdn, attr_dict in ldap_map["depends_children"].items():
|
||||
ldap_interface.add(rdn, attr_dict)
|
||||
|
||||
admin_dict = {
|
||||
"cn": "admin",
|
||||
"uid": "admin",
|
||||
"description": "LDAP Administrator",
|
||||
"gidNumber": "1007",
|
||||
"uidNumber": "1007",
|
||||
"homeDirectory": "/home/admin",
|
||||
"loginShell": "/bin/bash",
|
||||
"objectClass": [
|
||||
"organizationalRole",
|
||||
"posixAccount",
|
||||
"simpleSecurityObject",
|
||||
],
|
||||
"userPassword": "yunohost",
|
||||
}
|
||||
|
||||
ldap_interface.update("cn=admin", admin_dict)
|
||||
|
||||
@classmethod
|
||||
def teardown_class(cls):
|
||||
pass
|
||||
|
||||
def setup_method(self):
|
||||
self.server = self.server_default
|
||||
self.server.start()
|
||||
with open(os.path.join(HERE, "ldap_files", "tests.ldif")) as fp:
|
||||
ldif = fp.read().decode("utf-8")
|
||||
self.server.ldapadd(ldif)
|
||||
self.tools_ldapinit()
|
||||
self.ldap_conf = {
|
||||
"vendor": "ldap",
|
||||
"name": "as-root",
|
||||
"parameters": {
|
||||
"uri": self.server.ldapi_uri,
|
||||
"base_dn": "dc=yunohost,dc=org",
|
||||
},
|
||||
"parameters": {"base_dn": "dc=yunohost,dc=org",},
|
||||
"extra": {},
|
||||
}
|
||||
|
||||
def teardown_method(self):
|
||||
self.server.stop()
|
||||
|
||||
def test_authenticate_simple_bind_with_admin(self):
|
||||
def test_authenticate_simple_bind_with_admin(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
self.ldap_conf["parameters"]["user_rdn"] = "cn=admin,dc=yunohost,dc=org"
|
||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
||||
ldap_interface.authenticate(password="yunohost")
|
||||
|
||||
assert ldap_interface.con
|
||||
|
||||
def test_authenticate_simple_bind_with_wrong_user(self):
|
||||
def test_authenticate_simple_bind_with_wrong_user(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
self.ldap_conf["parameters"]["user_rdn"] = "cn=yoloswag,dc=yunohost,dc=org"
|
||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
|
@ -136,7 +35,8 @@ class TestLDAP:
|
|||
assert expected_msg in str(exception)
|
||||
assert ldap_interface.con is None
|
||||
|
||||
def test_authenticate_simple_bind_with_rdn_wrong_password(self):
|
||||
def test_authenticate_simple_bind_with_rdn_wrong_password(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
self.ldap_conf["parameters"]["user_rdn"] = "cn=admin,dc=yunohost,dc=org"
|
||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
|
@ -148,14 +48,16 @@ class TestLDAP:
|
|||
|
||||
assert ldap_interface.con is None
|
||||
|
||||
def test_authenticate_simple_bind_anonymous(self):
|
||||
def test_authenticate_simple_bind_anonymous(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
self.ldap_conf["parameters"]["user_rdn"] = ""
|
||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
||||
ldap_interface.authenticate()
|
||||
|
||||
assert ldap_interface.con
|
||||
|
||||
def test_authenticate_sasl_non_interactive_bind(self):
|
||||
def test_authenticate_sasl_non_interactive_bind(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
self.ldap_conf["parameters"]["user_rdn"] = (
|
||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
||||
% (os.getgid(), os.getuid())
|
||||
|
@ -164,9 +66,10 @@ class TestLDAP:
|
|||
|
||||
assert ldap_interface.con
|
||||
|
||||
def test_authenticate_server_down(self):
|
||||
def test_authenticate_server_down(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
self.ldap_conf["parameters"]["user_rdn"] = "cn=admin,dc=yunohost,dc=org"
|
||||
self.server.stop()
|
||||
ldap_server.stop()
|
||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
ldap_interface.authenticate(password="yunohost")
|
||||
|
@ -184,7 +87,8 @@ class TestLDAP:
|
|||
ldap_interface.authenticate(password=password)
|
||||
return ldap_interface
|
||||
|
||||
def test_admin_read(self):
|
||||
def test_admin_read(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
||||
)
|
||||
|
@ -203,7 +107,8 @@ class TestLDAP:
|
|||
assert admin_info.keys() == ["userPassword"]
|
||||
assert admin_info["userPassword"][0].startswith("{CRYPT}$6$")
|
||||
|
||||
def test_sasl_read(self):
|
||||
def test_sasl_read(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
||||
% (os.getgid(), os.getuid())
|
||||
|
@ -223,7 +128,8 @@ class TestLDAP:
|
|||
assert admin_info.keys() == ["userPassword"]
|
||||
assert admin_info["userPassword"][0].startswith("{CRYPT}$6$")
|
||||
|
||||
def test_anonymous_read(self):
|
||||
def test_anonymous_read(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface("")
|
||||
|
||||
admin_info = ldap_interface.search("cn=admin,dc=yunohost,dc=org", attrs=None)[0]
|
||||
|
@ -260,7 +166,8 @@ class TestLDAP:
|
|||
"uid=%s,ou=users,dc=yunohost,dc=org" % new_user, attrs=None
|
||||
)[0]
|
||||
|
||||
def test_admin_add(self):
|
||||
def test_admin_add(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
||||
)
|
||||
|
@ -276,7 +183,8 @@ class TestLDAP:
|
|||
assert "inetOrgPerson" in new_user_info["objectClass"]
|
||||
assert "posixAccount" in new_user_info["objectClass"]
|
||||
|
||||
def test_sasl_add(self):
|
||||
def test_sasl_add(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
||||
% (os.getgid(), os.getuid())
|
||||
|
@ -293,7 +201,8 @@ class TestLDAP:
|
|||
assert "inetOrgPerson" in new_user_info["objectClass"]
|
||||
assert "posixAccount" in new_user_info["objectClass"]
|
||||
|
||||
def test_anonymous_add(self):
|
||||
def test_anonymous_add(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface("")
|
||||
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
|
@ -324,14 +233,16 @@ class TestLDAP:
|
|||
expected_msg = translation.format(action="search")
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
def test_admin_remove(self):
|
||||
def test_admin_remove(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
||||
)
|
||||
|
||||
self.remove_new_user(ldap_interface)
|
||||
|
||||
def test_sasl_remove(self):
|
||||
def test_sasl_remove(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
||||
% (os.getgid(), os.getuid())
|
||||
|
@ -339,7 +250,8 @@ class TestLDAP:
|
|||
|
||||
self.remove_new_user(ldap_interface)
|
||||
|
||||
def test_anonymous_remove(self):
|
||||
def test_anonymous_remove(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface("")
|
||||
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
|
@ -372,7 +284,8 @@ class TestLDAP:
|
|||
"uid=%s,ou=users,dc=yunohost,dc=org" % uid, attrs=None
|
||||
)[0]
|
||||
|
||||
def test_admin_update(self):
|
||||
def test_admin_update(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
||||
)
|
||||
|
@ -382,7 +295,8 @@ class TestLDAP:
|
|||
assert new_user_info["uidNumber"] == ["555"]
|
||||
assert new_user_info["gidNumber"] == ["555"]
|
||||
|
||||
def test_admin_update_new_rdn(self):
|
||||
def test_admin_update_new_rdn(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
||||
)
|
||||
|
@ -392,7 +306,8 @@ class TestLDAP:
|
|||
assert new_user_info["uidNumber"] == ["555"]
|
||||
assert new_user_info["gidNumber"] == ["555"]
|
||||
|
||||
def test_sasl_update(self):
|
||||
def test_sasl_update(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
||||
% (os.getgid(), os.getuid())
|
||||
|
@ -403,7 +318,8 @@ class TestLDAP:
|
|||
assert new_user_info["uidNumber"] == ["555"]
|
||||
assert new_user_info["gidNumber"] == ["555"]
|
||||
|
||||
def test_sasl_update_new_rdn(self):
|
||||
def test_sasl_update_new_rdn(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
||||
)
|
||||
|
@ -413,7 +329,8 @@ class TestLDAP:
|
|||
assert new_user_info["uidNumber"] == ["555"]
|
||||
assert new_user_info["gidNumber"] == ["555"]
|
||||
|
||||
def test_anonymous_update(self):
|
||||
def test_anonymous_update(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface("")
|
||||
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
|
@ -423,7 +340,8 @@ class TestLDAP:
|
|||
expected_msg = translation.format(action="update")
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
def test_anonymous_update_new_rdn(self):
|
||||
def test_anonymous_update_new_rdn(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface("")
|
||||
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
|
@ -433,7 +351,8 @@ class TestLDAP:
|
|||
expected_msg = translation.format(action="update")
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
def test_get_conflict(self):
|
||||
def test_get_conflict(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
||||
)
|
||||
|
@ -450,7 +369,8 @@ class TestLDAP:
|
|||
conflict = ldap_interface.get_conflict({"uid": "not_a_user"})
|
||||
assert not conflict
|
||||
|
||||
def test_validate_uniqueness(self):
|
||||
def test_validate_uniqueness(self, ldap_server):
|
||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
||||
ldap_interface = self.create_ldap_interface(
|
||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
||||
)
|
||||
|
|
Loading…
Add table
Reference in a new issue