mirror of
https://github.com/YunoHost/yunohost.git
synced 2024-09-03 20:06:10 +02:00
Add check LDAP integrity
This commit is contained in:
parent
efffff750e
commit
5a6a85ab07
3 changed files with 106 additions and 3 deletions
|
@ -14,6 +14,7 @@ from yunohost.backup import backup_create, backup_restore, backup_list, backup_i
|
||||||
from yunohost.domain import _get_maindomain
|
from yunohost.domain import _get_maindomain
|
||||||
from yunohost.utils.error import YunohostError
|
from yunohost.utils.error import YunohostError
|
||||||
from yunohost.user import user_permission_list
|
from yunohost.user import user_permission_list
|
||||||
|
from yunohost.tests.test_permission import check_LDAP_db_integrity
|
||||||
|
|
||||||
# Get main domain
|
# Get main domain
|
||||||
maindomain = ""
|
maindomain = ""
|
||||||
|
@ -91,6 +92,12 @@ def teardown_function(function):
|
||||||
shutil.rmtree("/opt/test_backup_output_directory")
|
shutil.rmtree("/opt/test_backup_output_directory")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def check_LDAP_db_integrity_call():
|
||||||
|
check_LDAP_db_integrity()
|
||||||
|
yield
|
||||||
|
check_LDAP_db_integrity()
|
||||||
|
|
||||||
#
|
#
|
||||||
# Helpers #
|
# Helpers #
|
||||||
#
|
#
|
||||||
|
|
|
@ -48,6 +48,94 @@ def teardown_function(function):
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def check_LDAP_db_integrity_call():
|
||||||
|
check_LDAP_db_integrity()
|
||||||
|
yield
|
||||||
|
check_LDAP_db_integrity()
|
||||||
|
|
||||||
|
def check_LDAP_db_integrity():
|
||||||
|
# Here we check that all attributes in all object are sychronized.
|
||||||
|
# Here is the list of attributes per object:
|
||||||
|
# user : memberOf, permission
|
||||||
|
# group : member, permission
|
||||||
|
# permission : groupPermission, inheritPermission
|
||||||
|
#
|
||||||
|
# The idea is to check that all attributes on all sides of object are sychronized.
|
||||||
|
# One part should be done automatically by the "memberOf" overlay of LDAP.
|
||||||
|
# The other part is done by the the "permission_sync_to_user" function of the permission module
|
||||||
|
|
||||||
|
user_search = auth.search('ou=users,dc=yunohost,dc=org',
|
||||||
|
'(&(objectclass=person)(!(uid=root))(!(uid=nobody)))',
|
||||||
|
['uid', 'memberOf', 'permission'])
|
||||||
|
group_search = auth.search('ou=groups,dc=yunohost,dc=org',
|
||||||
|
'(objectclass=groupOfNamesYnh)',
|
||||||
|
['cn', 'member', 'memberUid', 'permission'])
|
||||||
|
permission_search = auth.search('ou=permission,dc=yunohost,dc=org',
|
||||||
|
'(objectclass=permissionYnh)',
|
||||||
|
['cn', 'groupPermission', 'inheritPermission', 'memberUid'])
|
||||||
|
|
||||||
|
user_map = {u['uid'][0]: u for u in user_search}
|
||||||
|
group_map = {g['cn'][0]: g for g in group_search}
|
||||||
|
permission_map = {p['cn'][0]: p for p in permission_search}
|
||||||
|
|
||||||
|
for user in user_search:
|
||||||
|
user_dn = 'uid=' + user['uid'][0] + ',ou=users,dc=yunohost,dc=org'
|
||||||
|
group_list = [m.split("=")[1].split(",")[0] for m in user['memberOf']]
|
||||||
|
permission_list = []
|
||||||
|
if 'permission' in user:
|
||||||
|
permission_list = [m.split("=")[1].split(",")[0] for m in user['permission']]
|
||||||
|
|
||||||
|
for group in group_list:
|
||||||
|
assert user_dn in group_map[group]['member']
|
||||||
|
for permission in permission_list:
|
||||||
|
assert user_dn in permission_map[permission]['inheritPermission']
|
||||||
|
|
||||||
|
for permission in permission_search:
|
||||||
|
permission_dn = 'cn=' + permission['cn'][0] + ',ou=permission,dc=yunohost,dc=org'
|
||||||
|
user_list = []
|
||||||
|
group_list = []
|
||||||
|
if 'inheritPermission' in permission:
|
||||||
|
user_list = [m.split("=")[1].split(",")[0] for m in permission['inheritPermission']]
|
||||||
|
assert set(user_list) == set(permission['memberUid'])
|
||||||
|
if 'groupPermission' in permission:
|
||||||
|
group_list = [m.split("=")[1].split(",")[0] for m in permission['groupPermission']]
|
||||||
|
|
||||||
|
for user in user_list:
|
||||||
|
assert permission_dn in user_map[user]['permission']
|
||||||
|
for group in group_list:
|
||||||
|
assert permission_dn in group_map[group]['permission']
|
||||||
|
if 'member' in group_map[group]:
|
||||||
|
user_list_in_group = [m.split("=")[1].split(",")[0] for m in group_map[group]['member']]
|
||||||
|
assert set(user_list_in_group) <= set(user_list)
|
||||||
|
|
||||||
|
for group in group_search:
|
||||||
|
group_dn = 'cn=' + group['cn'][0] + ',ou=groups,dc=yunohost,dc=org'
|
||||||
|
user_list = []
|
||||||
|
permission_list = []
|
||||||
|
if 'member' in group:
|
||||||
|
user_list = [m.split("=")[1].split(",")[0] for m in group['member']]
|
||||||
|
if group['cn'][0] in user_list:
|
||||||
|
# If it's the main group of the user it's normal that it is not in the memberUid
|
||||||
|
g_list = list(user_list)
|
||||||
|
g_list.remove(group['cn'][0])
|
||||||
|
if 'memberUid' in group:
|
||||||
|
assert set(g_list) == set(group['memberUid'])
|
||||||
|
else:
|
||||||
|
assert g_list == []
|
||||||
|
else:
|
||||||
|
assert set(user_list) == set(group['memberUid'])
|
||||||
|
if 'permission' in group:
|
||||||
|
permission_list = [m.split("=")[1].split(",")[0] for m in group['permission']]
|
||||||
|
|
||||||
|
for user in user_list:
|
||||||
|
assert group_dn in user_map[user]['memberOf']
|
||||||
|
for permission in permission_list:
|
||||||
|
assert group_dn in permission_map[permission]['groupPermission']
|
||||||
|
if 'inheritPermission' in permission_map:
|
||||||
|
allowed_user_list = [m.split("=")[1].split(",")[0] for m in permission_map[permission]['inheritPermission']]
|
||||||
|
assert set(user_list) <= set(allowed_user_list)
|
||||||
|
|
||||||
#
|
#
|
||||||
# List functions
|
# List functions
|
||||||
#
|
#
|
||||||
|
@ -177,11 +265,11 @@ def test_disallow_group_1():
|
||||||
|
|
||||||
def test_reset_permission():
|
def test_reset_permission():
|
||||||
# Reset permission
|
# Reset permission
|
||||||
user_permission_remove(auth, ["blog"], "main", group="bob")
|
user_permission_clear(auth, ["blog"], "main")
|
||||||
|
|
||||||
res = user_permission_list(auth)['permissions']
|
res = user_permission_list(auth)['permissions']
|
||||||
assert ["alice"] == res['blog']['main']['allowed_users']
|
assert set(["alice", "bob"]) == set(res['blog']['main']['allowed_users'])
|
||||||
assert ["alice"] == res['blog']['main']['allowed_groups']
|
assert ["all_users"] == res['blog']['main']['allowed_groups']
|
||||||
|
|
||||||
# internal functions
|
# internal functions
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ from moulinette.core import init_authenticator, MoulinetteError
|
||||||
from yunohost.user import user_list, user_info, user_group_list, user_create, user_delete, user_update, user_group_add, user_group_delete, user_group_update, user_group_info
|
from yunohost.user import user_list, user_info, user_group_list, user_create, user_delete, user_update, user_group_add, user_group_delete, user_group_update, user_group_info
|
||||||
from yunohost.domain import _get_maindomain
|
from yunohost.domain import _get_maindomain
|
||||||
from yunohost.utils.error import YunohostError
|
from yunohost.utils.error import YunohostError
|
||||||
|
from yunohost.tests.test_permission import check_LDAP_db_integrity
|
||||||
|
|
||||||
# Get main domain
|
# Get main domain
|
||||||
maindomain = _get_maindomain()
|
maindomain = _get_maindomain()
|
||||||
|
@ -39,6 +40,12 @@ def setup_function(function):
|
||||||
def teardown_function(function):
|
def teardown_function(function):
|
||||||
clean_user_groups()
|
clean_user_groups()
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def check_LDAP_db_integrity_call():
|
||||||
|
check_LDAP_db_integrity()
|
||||||
|
yield
|
||||||
|
check_LDAP_db_integrity()
|
||||||
|
|
||||||
#
|
#
|
||||||
# List functions
|
# List functions
|
||||||
#
|
#
|
||||||
|
@ -187,6 +194,7 @@ def test_bad_update_user_1():
|
||||||
with pytest.raises(YunohostError):
|
with pytest.raises(YunohostError):
|
||||||
user_update(auth, "not_exit", firstname="NewName", lastname="NewLast")
|
user_update(auth, "not_exit", firstname="NewName", lastname="NewLast")
|
||||||
|
|
||||||
|
|
||||||
def bad_update_group_1():
|
def bad_update_group_1():
|
||||||
# Check groups not found
|
# Check groups not found
|
||||||
with pytest.raises(YunohostError):
|
with pytest.raises(YunohostError):
|
||||||
|
|
Loading…
Add table
Reference in a new issue