mirror of
https://github.com/YunoHost/moulinette.git
synced 2024-09-03 20:06:31 +02:00
Clean old ldap stuff, naive attempt to fix tests
This commit is contained in:
parent
e2cb8cdfab
commit
9b3bb1362c
8 changed files with 24 additions and 604 deletions
|
@ -1,11 +1,5 @@
|
||||||
language: python
|
language: python
|
||||||
|
|
||||||
addons:
|
|
||||||
apt:
|
|
||||||
packages:
|
|
||||||
- ldap-utils
|
|
||||||
- slapd
|
|
||||||
|
|
||||||
matrix:
|
matrix:
|
||||||
include:
|
include:
|
||||||
- python: 3.7
|
- python: 3.7
|
||||||
|
|
1
debian/control
vendored
1
debian/control
vendored
|
@ -9,7 +9,6 @@ Homepage: https://github.com/YunoHost/moulinette
|
||||||
Package: moulinette
|
Package: moulinette
|
||||||
Architecture: all
|
Architecture: all
|
||||||
Depends: ${misc:Depends}, ${python3:Depends},
|
Depends: ${misc:Depends}, ${python3:Depends},
|
||||||
python3-ldap,
|
|
||||||
python3-yaml,
|
python3-yaml,
|
||||||
python3-bottle (>= 0.12),
|
python3-bottle (>= 0.12),
|
||||||
python3-gevent-websocket,
|
python3-gevent-websocket,
|
||||||
|
|
|
@ -3,6 +3,9 @@
|
||||||
# Global parameters #
|
# Global parameters #
|
||||||
#############################
|
#############################
|
||||||
_global:
|
_global:
|
||||||
|
authentication:
|
||||||
|
api: dummy
|
||||||
|
cli: null
|
||||||
configuration:
|
configuration:
|
||||||
authenticate:
|
authenticate:
|
||||||
- all
|
- all
|
||||||
|
@ -13,13 +16,6 @@ _global:
|
||||||
yoloswag:
|
yoloswag:
|
||||||
vendor: dummy
|
vendor: dummy
|
||||||
help: Dummy Yoloswag Password
|
help: Dummy Yoloswag Password
|
||||||
ldap:
|
|
||||||
vendor: ldap
|
|
||||||
help: admin_password
|
|
||||||
parameters:
|
|
||||||
uri: ldap://localhost:8080
|
|
||||||
base_dn: dc=yunohost,dc=org
|
|
||||||
user_rdn: cn=admin,dc=yunohost,dc=org
|
|
||||||
arguments:
|
arguments:
|
||||||
-v:
|
-v:
|
||||||
full: --version
|
full: --version
|
||||||
|
@ -43,37 +39,30 @@ testauth:
|
||||||
actions:
|
actions:
|
||||||
none:
|
none:
|
||||||
api: GET /test-auth/none
|
api: GET /test-auth/none
|
||||||
configuration:
|
authentication:
|
||||||
authenticate: false
|
api: null
|
||||||
|
cli: null
|
||||||
|
|
||||||
default:
|
default:
|
||||||
api: GET /test-auth/default
|
api: GET /test-auth/default
|
||||||
|
|
||||||
only-api:
|
only-api:
|
||||||
api: GET /test-auth/only-api
|
api: GET /test-auth/only-api
|
||||||
configuration:
|
authentication:
|
||||||
authenticate:
|
api: dummy
|
||||||
- api
|
cli: null
|
||||||
|
|
||||||
only-cli:
|
only-cli:
|
||||||
api: GET /test-auth/only-cli
|
api: GET /test-auth/only-cli
|
||||||
configuration:
|
authentication:
|
||||||
authenticate:
|
api: null
|
||||||
- cli
|
cli: dummy
|
||||||
|
|
||||||
other-profile:
|
other-profile:
|
||||||
api: GET /test-auth/other-profile
|
api: GET /test-auth/other-profile
|
||||||
configuration:
|
authentication:
|
||||||
authenticate:
|
api: yoloswag
|
||||||
- all
|
cli: yoloswag
|
||||||
authenticator: yoloswag
|
|
||||||
|
|
||||||
ldap:
|
|
||||||
api: GET /test-auth/ldap
|
|
||||||
configuration:
|
|
||||||
authenticate:
|
|
||||||
- all
|
|
||||||
authenticator: ldap
|
|
||||||
|
|
||||||
with_arg:
|
with_arg:
|
||||||
api: GET /test-auth/with_arg/<super_arg>
|
api: GET /test-auth/with_arg/<super_arg>
|
||||||
|
@ -103,21 +92,21 @@ testauth:
|
||||||
actions:
|
actions:
|
||||||
none:
|
none:
|
||||||
api: GET /test-auth/subcat/none
|
api: GET /test-auth/subcat/none
|
||||||
configuration:
|
authentication:
|
||||||
authenticate: false
|
api: null
|
||||||
|
cli: null
|
||||||
|
|
||||||
default:
|
default:
|
||||||
api: GET /test-auth/subcat/default
|
api: GET /test-auth/subcat/default
|
||||||
|
|
||||||
post:
|
post:
|
||||||
api: POST /test-auth/subcat/post
|
api: POST /test-auth/subcat/post
|
||||||
configuration:
|
authentication:
|
||||||
authenticate:
|
api: dummy
|
||||||
- all
|
cli: dummy
|
||||||
authenticator: default
|
|
||||||
|
|
||||||
|
|
||||||
other-profile:
|
other-profile:
|
||||||
api: GET /test-auth/subcat/other-profile
|
api: GET /test-auth/subcat/other-profile
|
||||||
configuration:
|
authentication:
|
||||||
authenticator: yoloswag
|
api: yoloswag
|
||||||
|
cli: yoloswag
|
||||||
|
|
|
@ -7,8 +7,6 @@ import os
|
||||||
import shutil
|
import shutil
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from .src.ldap_server import LDAPServer
|
|
||||||
|
|
||||||
|
|
||||||
def patch_init(moulinette):
|
def patch_init(moulinette):
|
||||||
"""Configure moulinette to use the YunoHost namespace."""
|
"""Configure moulinette to use the YunoHost namespace."""
|
||||||
|
@ -209,11 +207,3 @@ def user():
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def test_url():
|
def test_url():
|
||||||
return "https://some.test.url/yolo.txt"
|
return "https://some.test.url/yolo.txt"
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def ldap_server():
|
|
||||||
server = LDAPServer()
|
|
||||||
server.start()
|
|
||||||
yield server
|
|
||||||
server.stop()
|
|
||||||
|
|
|
@ -1,122 +0,0 @@
|
||||||
import slapdtest
|
|
||||||
import os
|
|
||||||
from moulinette.authenticators import ldap as m_ldap
|
|
||||||
|
|
||||||
HERE = os.path.abspath(os.path.dirname(__file__))
|
|
||||||
|
|
||||||
|
|
||||||
class LDAPServer:
|
|
||||||
def __init__(self):
|
|
||||||
self.server_default = slapdtest.SlapdObject()
|
|
||||||
with open(
|
|
||||||
os.path.join(HERE, "..", "ldap_files", "slapd.conf.template"),
|
|
||||||
encoding="utf-8",
|
|
||||||
) as f:
|
|
||||||
SLAPD_CONF_TEMPLATE = f.read()
|
|
||||||
self.server_default.slapd_conf_template = SLAPD_CONF_TEMPLATE
|
|
||||||
self.server_default.suffix = "dc=yunohost,dc=org"
|
|
||||||
self.server_default.root_cn = "admin"
|
|
||||||
self.server_default.SCHEMADIR = os.path.join(HERE, "..", "ldap_files", "schema")
|
|
||||||
self.server_default.openldap_schema_files = [
|
|
||||||
"core.schema",
|
|
||||||
"cosine.schema",
|
|
||||||
"nis.schema",
|
|
||||||
"inetorgperson.schema",
|
|
||||||
"sudo.schema",
|
|
||||||
"yunohost.schema",
|
|
||||||
"mailserver.schema",
|
|
||||||
]
|
|
||||||
self.server = None
|
|
||||||
self.uri = ""
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self.server = self.server_default
|
|
||||||
self.server.start()
|
|
||||||
self.uri = self.server.ldapi_uri
|
|
||||||
with open(
|
|
||||||
os.path.join(HERE, "..", "ldap_files", "tests.ldif"), encoding="utf-8"
|
|
||||||
) as fp:
|
|
||||||
ldif = fp.read()
|
|
||||||
self.server.ldapadd(ldif)
|
|
||||||
self.tools_ldapinit()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
if self.server:
|
|
||||||
self.server.stop()
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
if self.server:
|
|
||||||
self.server.stop()
|
|
||||||
|
|
||||||
def tools_ldapinit(self):
|
|
||||||
"""
|
|
||||||
YunoHost LDAP initialization
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
with open(os.path.join(HERE, "..", "ldap_files", "ldap_scheme.yml"), "rb") as f:
|
|
||||||
ldap_map = yaml.safe_load(f)
|
|
||||||
|
|
||||||
def _get_ldap_interface():
|
|
||||||
conf = {
|
|
||||||
"vendor": "ldap",
|
|
||||||
"name": "as-root",
|
|
||||||
"parameters": {
|
|
||||||
"uri": self.server.ldapi_uri,
|
|
||||||
"base_dn": "dc=yunohost,dc=org",
|
|
||||||
"user_rdn": "gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
|
||||||
% (os.getgid(), os.getuid()),
|
|
||||||
},
|
|
||||||
"extra": {},
|
|
||||||
}
|
|
||||||
|
|
||||||
_ldap_interface = m_ldap.Authenticator(**conf)
|
|
||||||
|
|
||||||
return _ldap_interface
|
|
||||||
|
|
||||||
ldap_interface = _get_ldap_interface()
|
|
||||||
|
|
||||||
for rdn, attr_dict in ldap_map["parents"].items():
|
|
||||||
ldap_interface.add(rdn, attr_dict)
|
|
||||||
|
|
||||||
for rdn, attr_dict in ldap_map["children"].items():
|
|
||||||
ldap_interface.add(rdn, attr_dict)
|
|
||||||
|
|
||||||
for rdn, attr_dict in ldap_map["depends_children"].items():
|
|
||||||
ldap_interface.add(rdn, attr_dict)
|
|
||||||
|
|
||||||
admin_dict = {
|
|
||||||
"cn": ["admin"],
|
|
||||||
"uid": ["admin"],
|
|
||||||
"description": ["LDAP Administrator"],
|
|
||||||
"gidNumber": ["1007"],
|
|
||||||
"uidNumber": ["1007"],
|
|
||||||
"homeDirectory": ["/home/admin"],
|
|
||||||
"loginShell": ["/bin/bash"],
|
|
||||||
"objectClass": [
|
|
||||||
"organizationalRole",
|
|
||||||
"posixAccount",
|
|
||||||
"simpleSecurityObject",
|
|
||||||
],
|
|
||||||
"userPassword": [self._hash_user_password("yunohost")],
|
|
||||||
}
|
|
||||||
|
|
||||||
ldap_interface.update("cn=admin", admin_dict)
|
|
||||||
|
|
||||||
def _hash_user_password(self, password):
|
|
||||||
"""
|
|
||||||
Copy pasta of what's in yunohost/user.py
|
|
||||||
"""
|
|
||||||
import string
|
|
||||||
import random
|
|
||||||
import crypt
|
|
||||||
|
|
||||||
char_set = (
|
|
||||||
string.ascii_uppercase + string.ascii_lowercase + string.digits + "./"
|
|
||||||
)
|
|
||||||
salt = "".join([random.SystemRandom().choice(char_set) for x in range(16)])
|
|
||||||
|
|
||||||
salt = "$6$" + salt + "$"
|
|
||||||
return "{CRYPT}" + crypt.crypt(str(password), salt)
|
|
|
@ -34,10 +34,6 @@ def testauth_only_cli():
|
||||||
return "some_data_from_only_cli"
|
return "some_data_from_only_cli"
|
||||||
|
|
||||||
|
|
||||||
def testauth_ldap():
|
|
||||||
return "some_data_from_ldap"
|
|
||||||
|
|
||||||
|
|
||||||
def testauth_with_arg(super_arg):
|
def testauth_with_arg(super_arg):
|
||||||
return super_arg
|
return super_arg
|
||||||
|
|
||||||
|
|
|
@ -158,18 +158,6 @@ class TestAuthAPI:
|
||||||
== "Authentication required"
|
== "Authentication required"
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_login_ldap(self, moulinette_webapi, ldap_server, mocker):
|
|
||||||
mocker.patch(
|
|
||||||
"moulinette.authenticators.ldap.Authenticator._get_uri",
|
|
||||||
return_value=ldap_server.uri,
|
|
||||||
)
|
|
||||||
self.login(moulinette_webapi, profile="ldap", password="yunohost")
|
|
||||||
|
|
||||||
assert (
|
|
||||||
moulinette_webapi.get("/test-auth/ldap", status=200).text
|
|
||||||
== '"some_data_from_ldap"'
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_request_with_arg(self, moulinette_webapi, capsys):
|
def test_request_with_arg(self, moulinette_webapi, capsys):
|
||||||
self.login(moulinette_webapi)
|
self.login(moulinette_webapi)
|
||||||
|
|
||||||
|
|
|
@ -1,414 +0,0 @@
|
||||||
import pytest
|
|
||||||
import os
|
|
||||||
|
|
||||||
from moulinette.authenticators import ldap as m_ldap
|
|
||||||
from moulinette import m18n
|
|
||||||
from moulinette.core import MoulinetteError
|
|
||||||
|
|
||||||
|
|
||||||
class TestLDAP:
|
|
||||||
def setup_method(self):
|
|
||||||
self.ldap_conf = {
|
|
||||||
"vendor": "ldap",
|
|
||||||
"name": "as-root",
|
|
||||||
"parameters": {"base_dn": "dc=yunohost,dc=org"},
|
|
||||||
"extra": {},
|
|
||||||
}
|
|
||||||
|
|
||||||
def test_authenticate_simple_bind_with_admin(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
self.ldap_conf["parameters"]["user_rdn"] = "cn=admin,dc=yunohost,dc=org"
|
|
||||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
|
||||||
ldap_interface.authenticate(password="yunohost")
|
|
||||||
|
|
||||||
assert ldap_interface.con
|
|
||||||
|
|
||||||
def test_authenticate_simple_bind_with_wrong_user(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
self.ldap_conf["parameters"]["user_rdn"] = "cn=yoloswag,dc=yunohost,dc=org"
|
|
||||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
ldap_interface.authenticate(password="yunohost")
|
|
||||||
|
|
||||||
translation = m18n.g("invalid_password")
|
|
||||||
expected_msg = translation.format()
|
|
||||||
assert expected_msg in str(exception)
|
|
||||||
assert ldap_interface.con is None
|
|
||||||
|
|
||||||
def test_authenticate_simple_bind_with_rdn_wrong_password(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
self.ldap_conf["parameters"]["user_rdn"] = "cn=admin,dc=yunohost,dc=org"
|
|
||||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
ldap_interface.authenticate(password="bad_password_lul")
|
|
||||||
|
|
||||||
translation = m18n.g("invalid_password")
|
|
||||||
expected_msg = translation.format()
|
|
||||||
assert expected_msg in str(exception)
|
|
||||||
|
|
||||||
assert ldap_interface.con is None
|
|
||||||
|
|
||||||
def test_authenticate_simple_bind_anonymous(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
self.ldap_conf["parameters"]["user_rdn"] = ""
|
|
||||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
|
||||||
ldap_interface.authenticate()
|
|
||||||
|
|
||||||
assert ldap_interface.con
|
|
||||||
|
|
||||||
def test_authenticate_sasl_non_interactive_bind(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
self.ldap_conf["parameters"][
|
|
||||||
"user_rdn"
|
|
||||||
] = "gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth" % (
|
|
||||||
os.getgid(),
|
|
||||||
os.getuid(),
|
|
||||||
)
|
|
||||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
|
||||||
|
|
||||||
assert ldap_interface.con
|
|
||||||
|
|
||||||
def test_authenticate_server_down(self, ldap_server, mocker):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
self.ldap_conf["parameters"]["user_rdn"] = "cn=admin,dc=yunohost,dc=org"
|
|
||||||
ldap_server.stop()
|
|
||||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
|
||||||
|
|
||||||
# Now if slapd is down, moulinette tries to restart it
|
|
||||||
mocker.patch("os.system")
|
|
||||||
mocker.patch("time.sleep")
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
ldap_interface.authenticate(password="yunohost")
|
|
||||||
|
|
||||||
translation = m18n.g("ldap_server_down")
|
|
||||||
expected_msg = translation.format()
|
|
||||||
assert expected_msg in str(exception)
|
|
||||||
|
|
||||||
assert ldap_interface.con is None
|
|
||||||
|
|
||||||
def create_ldap_interface(self, user_rdn, password=None):
|
|
||||||
self.ldap_conf["parameters"]["user_rdn"] = user_rdn
|
|
||||||
ldap_interface = m_ldap.Authenticator(**self.ldap_conf)
|
|
||||||
if not ldap_interface.con:
|
|
||||||
ldap_interface.authenticate(password=password)
|
|
||||||
return ldap_interface
|
|
||||||
|
|
||||||
def test_admin_read(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
|
|
||||||
admin_info = ldap_interface.search("cn=admin,dc=yunohost,dc=org", attrs=None)[0]
|
|
||||||
assert "cn" in admin_info
|
|
||||||
assert admin_info["cn"] == ["admin"]
|
|
||||||
assert "description" in admin_info
|
|
||||||
assert admin_info["description"] == ["LDAP Administrator"]
|
|
||||||
assert "userPassword" in admin_info
|
|
||||||
assert admin_info["userPassword"][0].startswith("{CRYPT}$6$")
|
|
||||||
|
|
||||||
admin_info = ldap_interface.search(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", attrs=["userPassword"]
|
|
||||||
)[0]
|
|
||||||
assert list(admin_info.keys()) == ["userPassword"]
|
|
||||||
assert admin_info["userPassword"][0].startswith("{CRYPT}$6$")
|
|
||||||
|
|
||||||
def test_sasl_read(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
|
||||||
% (os.getgid(), os.getuid())
|
|
||||||
)
|
|
||||||
|
|
||||||
admin_info = ldap_interface.search("cn=admin,dc=yunohost,dc=org", attrs=None)[0]
|
|
||||||
assert "cn" in admin_info
|
|
||||||
assert admin_info["cn"] == ["admin"]
|
|
||||||
assert "description" in admin_info
|
|
||||||
assert admin_info["description"] == ["LDAP Administrator"]
|
|
||||||
assert "userPassword" in admin_info
|
|
||||||
assert admin_info["userPassword"][0].startswith("{CRYPT}$6$")
|
|
||||||
|
|
||||||
admin_info = ldap_interface.search(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", attrs=["userPassword"]
|
|
||||||
)[0]
|
|
||||||
assert list(admin_info.keys()) == ["userPassword"]
|
|
||||||
assert admin_info["userPassword"][0].startswith("{CRYPT}$6$")
|
|
||||||
|
|
||||||
def test_anonymous_read(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface("")
|
|
||||||
|
|
||||||
admin_info = ldap_interface.search("cn=admin,dc=yunohost,dc=org", attrs=None)[0]
|
|
||||||
assert "cn" in admin_info
|
|
||||||
assert admin_info["cn"] == ["admin"]
|
|
||||||
assert "description" in admin_info
|
|
||||||
assert admin_info["description"] == ["LDAP Administrator"]
|
|
||||||
assert "userPassword" not in admin_info
|
|
||||||
|
|
||||||
admin_info = ldap_interface.search(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", attrs=["userPassword"]
|
|
||||||
)[0]
|
|
||||||
assert not admin_info
|
|
||||||
|
|
||||||
def add_new_user(self, ldap_interface):
|
|
||||||
new_user = "new_user"
|
|
||||||
attr_dict = {
|
|
||||||
"objectClass": ["inetOrgPerson", "posixAccount"],
|
|
||||||
"sn": new_user,
|
|
||||||
"cn": new_user,
|
|
||||||
"userPassword": new_user,
|
|
||||||
"gidNumber": "666",
|
|
||||||
"uidNumber": "666",
|
|
||||||
"homeDirectory": "/home/" + new_user,
|
|
||||||
}
|
|
||||||
ldap_interface.add("uid=%s,ou=users" % new_user, attr_dict)
|
|
||||||
|
|
||||||
# Check if we can login as the new user
|
|
||||||
assert self.create_ldap_interface(
|
|
||||||
"uid=%s,ou=users,dc=yunohost,dc=org" % new_user, new_user
|
|
||||||
).con
|
|
||||||
|
|
||||||
return ldap_interface.search(
|
|
||||||
"uid=%s,ou=users,dc=yunohost,dc=org" % new_user, attrs=None
|
|
||||||
)[0]
|
|
||||||
|
|
||||||
def test_admin_add(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
|
|
||||||
new_user_info = self.add_new_user(ldap_interface)
|
|
||||||
assert "cn" in new_user_info
|
|
||||||
assert new_user_info["cn"] == ["new_user"]
|
|
||||||
assert "sn" in new_user_info
|
|
||||||
assert new_user_info["sn"] == ["new_user"]
|
|
||||||
assert "uid" in new_user_info
|
|
||||||
assert new_user_info["uid"] == ["new_user"]
|
|
||||||
assert "objectClass" in new_user_info
|
|
||||||
assert "inetOrgPerson" in new_user_info["objectClass"]
|
|
||||||
assert "posixAccount" in new_user_info["objectClass"]
|
|
||||||
|
|
||||||
def test_sasl_add(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
|
||||||
% (os.getgid(), os.getuid())
|
|
||||||
)
|
|
||||||
|
|
||||||
new_user_info = self.add_new_user(ldap_interface)
|
|
||||||
assert "cn" in new_user_info
|
|
||||||
assert new_user_info["cn"] == ["new_user"]
|
|
||||||
assert "sn" in new_user_info
|
|
||||||
assert new_user_info["sn"] == ["new_user"]
|
|
||||||
assert "uid" in new_user_info
|
|
||||||
assert new_user_info["uid"] == ["new_user"]
|
|
||||||
assert "objectClass" in new_user_info
|
|
||||||
assert "inetOrgPerson" in new_user_info["objectClass"]
|
|
||||||
assert "posixAccount" in new_user_info["objectClass"]
|
|
||||||
|
|
||||||
def test_anonymous_add(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface("")
|
|
||||||
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
self.add_new_user(ldap_interface)
|
|
||||||
|
|
||||||
expected_message = "error during LDAP add operation with: rdn="
|
|
||||||
expected_error = "modifications require authentication"
|
|
||||||
assert expected_error in str(exception)
|
|
||||||
assert expected_message in str(exception)
|
|
||||||
|
|
||||||
def remove_new_user(self, ldap_interface):
|
|
||||||
new_user_info = self.add_new_user(
|
|
||||||
self.create_ldap_interface(
|
|
||||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
|
||||||
% (os.getgid(), os.getuid()),
|
|
||||||
"yunohost",
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
uid = new_user_info["uid"][0]
|
|
||||||
ldap_interface.remove("uid=%s,ou=users" % uid)
|
|
||||||
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
ldap_interface.search(
|
|
||||||
"uid=%s,ou=users,dc=yunohost,dc=org" % uid, attrs=None
|
|
||||||
)
|
|
||||||
|
|
||||||
expected_message = "error during LDAP search operation with: base="
|
|
||||||
expected_error = "No such object"
|
|
||||||
assert expected_error in str(exception)
|
|
||||||
assert expected_message in str(exception)
|
|
||||||
|
|
||||||
def test_admin_remove(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.remove_new_user(ldap_interface)
|
|
||||||
|
|
||||||
def test_sasl_remove(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
|
||||||
% (os.getgid(), os.getuid())
|
|
||||||
)
|
|
||||||
|
|
||||||
self.remove_new_user(ldap_interface)
|
|
||||||
|
|
||||||
def test_anonymous_remove(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface("")
|
|
||||||
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
self.remove_new_user(ldap_interface)
|
|
||||||
|
|
||||||
expected_message = "error during LDAP delete operation with: rdn="
|
|
||||||
expected_error = "modifications require authentication"
|
|
||||||
assert expected_error in str(exception)
|
|
||||||
assert expected_message in str(exception)
|
|
||||||
|
|
||||||
def update_new_user(self, ldap_interface, new_rdn=False):
|
|
||||||
new_user_info = self.add_new_user(
|
|
||||||
self.create_ldap_interface(
|
|
||||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
|
||||||
% (os.getgid(), os.getuid()),
|
|
||||||
"yunohost",
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
uid = new_user_info["uid"][0]
|
|
||||||
new_user_info["uidNumber"] = ["555"]
|
|
||||||
new_user_info["gidNumber"] = ["555"]
|
|
||||||
new_another_user_uid = "new_another_user"
|
|
||||||
if new_rdn:
|
|
||||||
new_rdn = "uid=%s" % new_another_user_uid
|
|
||||||
ldap_interface.update("uid=%s,ou=users" % uid, new_user_info, new_rdn)
|
|
||||||
|
|
||||||
if new_rdn:
|
|
||||||
uid = new_another_user_uid
|
|
||||||
return ldap_interface.search(
|
|
||||||
"uid=%s,ou=users,dc=yunohost,dc=org" % uid, attrs=None
|
|
||||||
)[0]
|
|
||||||
|
|
||||||
def test_admin_update(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
|
|
||||||
new_user_info = self.update_new_user(ldap_interface)
|
|
||||||
assert new_user_info["uid"] == ["new_user"]
|
|
||||||
assert new_user_info["uidNumber"] == ["555"]
|
|
||||||
assert new_user_info["gidNumber"] == ["555"]
|
|
||||||
|
|
||||||
def test_admin_update_new_rdn(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
|
|
||||||
new_user_info = self.update_new_user(ldap_interface, True)
|
|
||||||
assert new_user_info["uid"] == ["new_another_user"]
|
|
||||||
assert new_user_info["uidNumber"] == ["555"]
|
|
||||||
assert new_user_info["gidNumber"] == ["555"]
|
|
||||||
|
|
||||||
def test_sasl_update(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"gidNumber=%s+uidNumber=%s,cn=peercred,cn=external,cn=auth"
|
|
||||||
% (os.getgid(), os.getuid())
|
|
||||||
)
|
|
||||||
|
|
||||||
new_user_info = self.update_new_user(ldap_interface)
|
|
||||||
assert new_user_info["uid"] == ["new_user"]
|
|
||||||
assert new_user_info["uidNumber"] == ["555"]
|
|
||||||
assert new_user_info["gidNumber"] == ["555"]
|
|
||||||
|
|
||||||
def test_sasl_update_new_rdn(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
|
|
||||||
new_user_info = self.update_new_user(ldap_interface, True)
|
|
||||||
assert new_user_info["uid"] == ["new_another_user"]
|
|
||||||
assert new_user_info["uidNumber"] == ["555"]
|
|
||||||
assert new_user_info["gidNumber"] == ["555"]
|
|
||||||
|
|
||||||
def test_anonymous_update(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface("")
|
|
||||||
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
self.update_new_user(ldap_interface)
|
|
||||||
|
|
||||||
expected_message = "error during LDAP update operation with: rdn="
|
|
||||||
expected_error = "modifications require authentication"
|
|
||||||
assert expected_error in str(exception)
|
|
||||||
assert expected_message in str(exception)
|
|
||||||
|
|
||||||
def test_anonymous_update_new_rdn(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface("")
|
|
||||||
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
self.update_new_user(ldap_interface, True)
|
|
||||||
|
|
||||||
expected_message = "error during LDAP update operation with: rdn="
|
|
||||||
expected_error = "modifications require authentication"
|
|
||||||
assert expected_error in str(exception)
|
|
||||||
assert expected_message in str(exception)
|
|
||||||
|
|
||||||
def test_empty_update(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
|
|
||||||
new_user_info = self.update_new_user(ldap_interface)
|
|
||||||
assert new_user_info["uid"] == ["new_user"]
|
|
||||||
assert new_user_info["uidNumber"] == ["555"]
|
|
||||||
assert new_user_info["gidNumber"] == ["555"]
|
|
||||||
|
|
||||||
uid = new_user_info["uid"][0]
|
|
||||||
|
|
||||||
assert ldap_interface.update("uid=%s,ou=users" % uid, new_user_info)
|
|
||||||
|
|
||||||
def test_get_conflict(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
self.add_new_user(ldap_interface)
|
|
||||||
|
|
||||||
conflict = ldap_interface.get_conflict({"uid": "new_user"})
|
|
||||||
assert conflict == ("uid", "new_user")
|
|
||||||
|
|
||||||
conflict = ldap_interface.get_conflict(
|
|
||||||
{"uid": "new_user"}, base_dn="ou=users,dc=yunohost,dc=org"
|
|
||||||
)
|
|
||||||
assert conflict == ("uid", "new_user")
|
|
||||||
|
|
||||||
conflict = ldap_interface.get_conflict({"uid": "not_a_user"})
|
|
||||||
assert not conflict
|
|
||||||
|
|
||||||
def test_validate_uniqueness(self, ldap_server):
|
|
||||||
self.ldap_conf["parameters"]["uri"] = ldap_server.uri
|
|
||||||
ldap_interface = self.create_ldap_interface(
|
|
||||||
"cn=admin,dc=yunohost,dc=org", "yunohost"
|
|
||||||
)
|
|
||||||
self.add_new_user(ldap_interface)
|
|
||||||
|
|
||||||
with pytest.raises(MoulinetteError) as exception:
|
|
||||||
ldap_interface.validate_uniqueness({"uid": "new_user"})
|
|
||||||
|
|
||||||
translation = m18n.g(
|
|
||||||
"ldap_attribute_already_exists", attribute="uid", value="new_user"
|
|
||||||
)
|
|
||||||
expected_msg = translation.format(attribute="uid", value="new_user")
|
|
||||||
assert expected_msg in str(exception)
|
|
||||||
|
|
||||||
assert ldap_interface.validate_uniqueness({"uid": "not_a_user"})
|
|
Loading…
Add table
Reference in a new issue