From 14f0fe0197db63d78a2affcd1d5f88d12092b0b5 Mon Sep 17 00:00:00 2001 From: Dan Lavu Date: Sun, 1 Mar 2026 20:19:02 -0500 Subject: [PATCH 1/2] Reworked memcache tests * parametrized test cases * added colliding hash test case * remove poor test scenarios --- src/tests/system/tests/test_memcache.py | 1983 ++++++----------------- 1 file changed, 454 insertions(+), 1529 deletions(-) diff --git a/src/tests/system/tests/test_memcache.py b/src/tests/system/tests/test_memcache.py index 73c4f59ed78..baad46616b7 100644 --- a/src/tests/system/tests/test_memcache.py +++ b/src/tests/system/tests/test_memcache.py @@ -8,1681 +8,606 @@ import pytest from sssd_test_framework.roles.client import Client -from sssd_test_framework.roles.generic import GenericProvider -from sssd_test_framework.roles.ldap import LDAP +from sssd_test_framework.roles.generic import GenericGroup, GenericProvider, GenericUser from sssd_test_framework.topology import KnownTopology, KnownTopologyGroup -@pytest.mark.importance("critical") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_users(client: Client, provider: GenericProvider): - """ - :title: Lookup user by name uses memory cache when SSSD is stopped - :setup: - 1. Add 'user1', 'user2' and 'user3' to SSSD - 2. Start SSSD - :steps: - 1. Find 'user1', 'user2' and 'user3' with id(name) - 2. Check that results have correct names - 3. Stop SSSD - 4. Find 'user1', 'user2' and 'user3' with id(name) - 5. Check that results have correct names - :expectedresults: - 1. Users are found - 2. Users have correct names - 3. SSSD is stopped - 4. Users are found - 5. Users have correct names - :customerscenario: False - """ - - def check(users): - for user in users: - result = client.tools.id(user) - assert result is not None, f"User {user} was not found using id" - assert result.user.name == user, f"Username {result.user.name} is incorrect, {user} expected" - - users = ["user1", "user2", "user3"] - for user in users: - provider.user(user).add() - - client.sssd.start() - - check(users) - client.sssd.stop() - check(users) - - -@pytest.mark.importance("critical") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_groups(client: Client, provider: GenericProvider): - """ - :title: Lookup group by groupname uses memory cache when SSSD is stopped - :setup: - 1. Add 'group1', 'group2' and 'group3' to SSSD - 2. Start SSSD - :steps: - 1. Find 'group1', 'group2' and 'group3' with getent.group(name) - 2. Check that groups have correct names - 3. Stop SSSD - 4. Find 'group1', 'group2' and 'group3' with getent.group(name) - 5. Check that groups have correct names - :expectedresults: - 1. Groups are found - 2. Groups have correct names - 3. SSSD is stopped - 4. Groups are found - 5. Groups have correct names - :customerscenario: False - """ - - def check(groups): - for group in groups: - result = client.tools.getent.group(group) - assert result is not None, f"Group {group} was not found using getent" - assert result.name == group, f"Groupname {result.name} is incorrect, {group} expected" - - groups = ["group1", "group2", "group3"] - for group in groups: - provider.group(group).add() - - client.sssd.start() - - check(groups) - client.sssd.stop() - check(groups) - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__user_cache_is_disabled_and_lookup_groups(client: Client, provider: GenericProvider): - """ - :title: Lookup group by groupname uses memory cache when SSSD is stopped and 'memcache_size_passwd' = 0 - :setup: - 1. Add 'group1', 'group2' and 'group3' to SSSD - 2. In SSSD nss change 'memcache_size_passwd' to '0' - 3. Start SSSD - :steps: - 1. Find 'group1', 'group2' and 'group3' with getent.group(name) - 2. Check that groups have correct names - 3. Stop SSSD - 4. Find 'group1', 'group2' and 'group3' with getent.group(name) - 5. Check that groups have correct names - :expectedresults: - 1. Groups are found - 2. Groups have correct names - 3. SSSD is stopped - 4. Groups are found - 5. Groups have correct names - :customerscenario: False - """ - - def check(groups): - for group in groups: - result = client.tools.getent.group(group) - assert result is not None, f"Group {group} was not found using getent" - assert result.name == group, f"Groupname {result.name} is incorrect, {group} expected" - - groups = ["group1", "group2", "group3"] - for group in groups: - provider.group(group).add() - - client.sssd.nss["memcache_size_passwd"] = "0" - client.sssd.start() - - check(groups) - client.sssd.stop() - check(groups) - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__user_cache_is_disabled_and_lookup_users(client: Client, provider: GenericProvider): - """ - :title: Lookup user by name when SSSD is stopped and 'memcache_size_passwd' = 0 - uses memory cache therefore user is not found - :setup: - 1. Add users to SSSD - 2. Set users uids and gids - 3. In SSSD nss change 'memcache_size_passwd' to '0' - 4. Start SSSD - :steps: - 1. Find 'user1', 'user2' and 'user3' with id(name) - 2. Check that users have correct names - 3. Stop SSSD - 4. Find users with id(name) - 5. Find users with id(uid) - :expectedresults: - 1. Users are found - 2. Users have correct names - 3. SSSD is stopped - 4. Users are not found - 5. Users are not found - :customerscenario: False - """ - ids = [("user1", 10001), ("user2", 10002), ("user3", 10003)] - for user, id in ids: - provider.user(user).add(uid=id, gid=id + 500) - - client.sssd.nss["memcache_size_passwd"] = "0" - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - for user, id in ids: - result = client.tools.id(user) - assert result is not None, f"User {user} was not found using id" - assert result.user.name == user, f"Username {result.user.name} is incorrect, {user} expected" - assert result.user.id == id, f"User id {result.user.id} is incorrect, {id} expected" - - client.sssd.stop() - - for user, id in ids: - assert client.tools.id(user) is None, f"User {user} was found which is not expected" - assert client.tools.id(id) is None, f"User with id {id} was found which is not expected" - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__initgroup_cache_is_disabled_and_lookup_groups(client: Client, provider: GenericProvider): - """ - :title: Lookup group by groupname when SSSD is stopped and 'memcache_size_initgroups' = 0 uses memory cache - :setup: - 1. Add 'group1', 'group2' and 'group3' to SSSD - 2. In SSSD nss change 'memcache_size_initgroups' to '0' - 3. Start SSSD - :steps: - 1. Find 'group1', 'group2' and 'group3' with getent.group(name) - 2. Check that groups have correct names - 3. Stop SSSD - 4. Find 'group1', 'group2' and 'group3' with getent.group(name) - 5. Check that groups have correct names - :expectedresults: - 1. Groups are found - 2. Groups have correct names - 3. SSSD is stopped - 4. Groups are found - 5. Groups have correct names - :customerscenario: False - """ - - def check(groups): - for group in groups: - result = client.tools.getent.group(group) - assert result is not None, f"Group {group} was not found using getent" - assert result.name == group, f"Groupname {result.name} is incorrect, {group} expected" - - groups = ["group1", "group2", "group3"] - for group in groups: - provider.group(group).add() - - client.sssd.nss["memcache_size_initgroups"] = "0" - client.sssd.start() - - check(groups) - client.sssd.stop() - check(groups) - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__initgroup_cache_is_disabled_and_lookup_users(client: Client, provider: GenericProvider): - """ - :title: Lookup user by name and id when SSSD is stopped and 'memcache_size_initgroups' = 0 uses memory cache - :setup: - 1. Add users to SSSD - 2. Set users uids and ids - 3. In SSSD nss change 'memcache_size_initgroups' to '0' - 4. Start SSSD - :steps: - 1. Find 'user1', 'user2' and 'user3' with id(name) - 2. Check that users have correct names and uids - 3. Stop SSSD - 4. Find 'user1', 'user2' and 'user3' with id(name) - 5. Check that users have correct names and uids - 6. Find 'user1', 'user2' and 'user3' with id(uid) - 7. Check that users have correct names and uids - :expectedresults: - 1. Users are found - 2. Users have correct names and uids - 3. SSSD is stopped - 4. Users are found - 5. Users have correct names and uids - 6. Users are found - 7. Users have correct names and uids - :customerscenario: False - """ - - def check(ids): - for name, id in ids: - result = client.tools.id(name) - assert result is not None, f"User {name} was not found using id" - assert result.user.name == name, f"Username {result.user.name} is incorrect, {user} expected" - assert result.user.id == id, f"User id {result.user.id} is incorrect, {id} expected" - - result = client.tools.id(id) - assert result is not None, f"User with id {id} was not found using id" - assert result.user.name == name, f"Username {result.user.name} is incorrect, {user} expected" - assert result.user.id == id, f"User id {result.user.id} is incorrect, {id} expected" - - ids = [("user1", 10001), ("user2", 10002), ("user3", 10003)] - for user, id in ids: - provider.user(user).add(uid=id, gid=id + 500) - - client.sssd.nss["memcache_size_initgroups"] = "0" - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - check(ids) - client.sssd.stop() - check(ids) - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__group_cache_disabled_and_lookup_groups(client: Client, provider: GenericProvider): - """ - :title: Lookup user by name and id when SSSD is stopped and 'memcache_size_group' = 0 uses memory cache, - but lookup groups is not possible - :setup: - 1. Add users to SSSD - 2. Set users uids and gids - 3. Add groups to SSSD - 4. Set groups gids - 5. Add users to groups - 6. In SSSD nss change 'memcache_size_group' to '0' - 7. Start SSSD - :steps: - 1. Find 'user1', 'user2' and 'user3' with id(name) - 2. Check that users have correct names - 3. Find 'group1' and 'group2' by getent.group(gid) - 4. Check that groups have correct gids and members - 5. Stop SSSD - 6. Find 'user1', 'user2' and 'user3' with id(name) - 7. Check that users have correct names - 8. Find 'group1' and 'group2' by getent.group(name) - 9. Find 'group1' and 'group2' by getent.group(gid) - :expectedresults: - 1. Users are found - 2. Users have correct names - 3. Groups are found - 4. Groups have correct gids and members - 5. SSSD is stopped - 6. Users are found - 7. Users have correct names - 8. Groups are not found - 9. Groups are not found - :customerscenario: False - """ - - def check(users): - for user in users: - rUser = client.tools.id(user) - assert rUser is not None, f"User {rUser} was not found using id" - assert rUser.user.name == user, f"Username {rUser.user.name} is incorrect, {user} expected" - - u1 = provider.user("user1").add(uid=10001, gid=19001) - u2 = provider.user("user2").add(uid=10002, gid=19002) - u3 = provider.user("user3").add(uid=10003, gid=19003) - - provider.group("group1").add(gid=1111).add_member(u1) - provider.group("group2").add(gid=2222).add_members([u1, u2, u3]) - - client.sssd.nss["memcache_size_group"] = "0" - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - users = ["user1", "user2", "user3"] - check(users) - - for group, members in [(1111, ["user1"]), (2222, ["user1", "user2", "user3"])]: - result = client.tools.getent.group(group) - assert result is not None, f"Group {group} was not found using getent" - assert result.gid == group, f"Group gid {result.gid} is incorrect, {group} expected" - assert result.members == members, f"Group {group} members did not match the expected ones" - - client.sssd.stop() - - check(users) - - assert client.tools.id("group1") is None, "Group group1 was found which is not expected" - assert client.tools.id("group2") is None, "Group group2 was found which is not expected" - assert client.tools.id(1111) is None, "Group with gid 1111 was found which is not expected" - assert client.tools.id(2222) is None, "Group with gid 2222 was found which is not expected" - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__all_caches_disabled_and_all_lookups_fails(client: Client, provider: GenericProvider): - """ - :title: Lookup user and group when SSSD is stopped and whole cache disabled - uses memory cache and therefore it is not possible - :setup: - 1. Add users to SSSD - 2. Set users uids - 3. Add groups to SSSD - 4. Set groups gids - 5. Add users to groups - 6. In SSSD nss change 'memcache_size_passwd' to '0' - 7. In SSSD nss change 'memcache_size_group' to '0' - 8. In SSSD nss change 'memcache_size_initgroups' to '0' - 9. Start SSSD - :steps: - 1. Find 'user1', 'user2' and 'user3' with id(name) - 2. Check that users have correct names - 3. Find 'group1' and 'group2' by getent.group(name) - 4. Check that groups have correct names and members - 5. Stop SSSD - 6. Find 'user1', 'user2' and 'user3' with id(name) - 7. Find 'user1', 'user2' and 'user3' with id(uid) - 8. Find 'group1' and 'group2' by getent.group(name) - 9. Find 'group1' and 'group2' by getent.group(gid) - :expectedresults: - 1. Users are found - 2. Users have correct names - 3. Groups are found - 4. Groups have correct names and members - 5. SSSD is stopped - 6. Users are not found - 7. Users are not found - 8. Groups are not found - 9. Groups are not found - :customerscenario: False - """ - u1 = provider.user("user1").add(uid=10001, gid=19001) - u2 = provider.user("user2").add(uid=10002, gid=19002) - u3 = provider.user("user3").add(uid=10003, gid=19003) - - provider.group("group1").add(gid=1111).add_member(u1) - provider.group("group2").add(gid=2222).add_members([u1, u2, u3]) - - client.sssd.nss["memcache_size_passwd"] = "0" - client.sssd.nss["memcache_size_group"] = "0" - client.sssd.nss["memcache_size_initgroups"] = "0" - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - for user in ["user1", "user2", "user3"]: - result = client.tools.id(user) - assert result is not None, f"User {user} was not found using id" - assert result.user.name == user, f"Username {result.user.name} is incorrect, {user} expected" - - for group, members in [("group1", ["user1"]), ("group2", ["user1", "user2", "user3"])]: - gresult = client.tools.getent.group(group) - assert gresult is not None, f"Group {group} was not found using id" - assert gresult.name == group, f"Groupname {gresult.name} is incorrect, {group} expected" - assert gresult.members == members, f"Group {group} members did not match the expected ones" - - client.sssd.stop() - - for user in ["user1", "user2", "user3"]: - assert client.tools.id(user) is None, f"User {user} was found which is not expected" - - for id in [10001, 10002, 10003]: - assert client.tools.id(id) is None, f"User with id {id} was found which is not expected" - - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(1111) is None, "Group with gid 1111 was found which is not expected" - assert client.tools.getent.group(2222) is None, "Group with gid 2222 was found which is not expected" - - -@pytest.mark.importance("critical") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_users_check_group_memberships(client: Client, provider: GenericProvider): - """ - :title: Lookup user by name and test membership by name use memory cache when SSSD is stopped - :setup: - 1. Add 'user1', 'user2' and 'user3' to SSSD - 2. Add 'group1' and 'group2' to SSSD - 3. Add users to groups - 4. Start SSSD - :steps: - 1. Find 'user1', 'user2' and 'user3' with id(name) - 2. Check that users are members of correct groups - 3. Stop SSSD - 4. Find 'user1', 'user2' and 'user3' with id(name) - 5. Check that users are members of correct groups - :expectedresults: - 1. Users are found - 2. Users are members of correct groups - 3. SSSD is stopped - 4. Users are found - 5. Users are members of correct groups - :customerscenario: False - """ - - def check(): - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.memberof(["group1", "group2"]), "User user1 is member of incorrect groups" - - result = client.tools.id("user2") - assert result is not None, "User user2 was not found using id" - assert result.memberof(["group2"]), "User user2 is member of incorrect groups" - - result = client.tools.id("user3") - assert result is not None, "User user3 was not found using id" - assert result.memberof(["group2"]), "User user3 is member of incorrect groups" - - u1 = provider.user("user1").add() - u2 = provider.user("user2").add() - u3 = provider.user("user3").add() - - provider.group("group1").add().add_member(u1) - provider.group("group2").add().add_members([u1, u2, u3]) - - client.sssd.start() - - check() - client.sssd.stop() - check() - - -@pytest.mark.importance("critical") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_users_and_check_membership_by_gid(client: Client, provider: GenericProvider): - """ - :title: Lookup user by name and test membership by gid use memory cache when SSSD is stopped - :setup: - 1. Add 'user1', 'user2' and 'user3' to SSSD - 2. Add 'group1', 'group2' and 'group3' to SSSD - 3. Set group gids - 4. Add users to groups - 5. Start SSSD - :steps: - 1. Find 'user1', 'user2' and 'user3' with id(name) - 2. Check that users are members of correct groups - 3. Stop SSSD - 4. Find 'user1', 'user2' and 'user3' with id(name) - 5. Check that users are members of correct groups - :expectedresults: - 1. Users are found - 2. Users are members of correct groups - 3. SSSD is stopped - 4. Users are found - 5. Users are members of correct groups - :customerscenario: False - """ - - def check(): - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.memberof([1001, 1002]), "User user1 is member of incorrect groups" - - result = client.tools.id("user2") - assert result is not None, "User user2 was not found using id" - assert result.memberof([1002]), "User user2 is member of incorrect groups" - - result = client.tools.id("user3") - assert result is not None, "User user3 was not found using id" - assert result.memberof([1002]), "User user3 is member of incorrect groups" - - u1 = provider.user("user1").add(uid=11001, gid=19001) - u2 = provider.user("user2").add(uid=11002, gid=19002) - u3 = provider.user("user3").add(uid=11003, gid=19003) - - provider.group("group1").add(gid=1001).add_member(u1) - provider.group("group2").add(gid=1002).add_members([u1, u2, u3]) - provider.group("group3").add(gid=1003) - - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - check() - client.sssd.stop() - check() - - -@pytest.mark.importance("critical") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_uids_and_check_membership_by_gid(client: Client, provider: GenericProvider): - """ - :title: Lookup user by id and test membership by gid use memory cache when SSSD is stopped - :setup: - 1. Add 'user1', 'user2' and 'user3' to SSSD - 2. Set users uids and gids - 3. Add 'group1' and 'group2' to SSSD - 4. Set groups gids - 5. Add users to groups - 6. Start SSSD - :steps: - 1. Find users by id(uid) - 2. Check that users are members of correct groups - 3. Stop SSSD - 4. Find users by id(uid) - 5. Check that users are members of correct groups - :expectedresults: - 1. Users are found - 2. Users are members of correct groups - 3. SSSD is stopped - 4. Users are found - 5. Users are members of correct groups - :customerscenario: False - """ - - def check(): - result = client.tools.id(2001) - assert result is not None, "User with id 2001 was not found using id" - assert result.memberof([101, 1001, 1002]), "User with id 2001 is member of incorrect groups" - - result = client.tools.id(2002) - assert result is not None, "User with id 2002 was not found using id" - assert result.memberof([102, 1002]), "User with id 2002 is member of incorrect groups" - - result = client.tools.id(2003) - assert result is not None, "User with id 2003 was not found using id" - assert result.memberof([103, 1002]), "User with id 2003 is member of incorrect groups" - - u1 = provider.user("user1").add(uid=2001, gid=101) - u2 = provider.user("user2").add(uid=2002, gid=102) - u3 = provider.user("user3").add(uid=2003, gid=103) - - provider.group("group1").add(gid=1001).add_member(u1) - provider.group("group2").add(gid=1002).add_members([u1, u2, u3]) - - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - check() - client.sssd.stop() - check() - - -@pytest.mark.importance("critical") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_users_by_fully_qualified_names(client: Client, provider: GenericProvider): - """ - :title: Lookup user by full name when 'use_fully_qualified_names' is 'true' - uses memory cache when sssd is stopped - :setup: - 1. Add 'user1' and 'user2' to SSSD - 2. In SSSD domain change 'use_fully_qualified_names' to 'true' - 3. Start SSSD - :steps: - 1. Find 'user1' and 'user2' with id(name) - 2. Find 'user1' and 'user2' with id(name@domain) - 3. Check that users have correct full names - 4. Stop SSSD - 5. Find 'user1' and 'user2' with id(name) - 6. Find 'user1' and 'user2' with id(name@domain) - 7. Check that users have correct full names - :expectedresults: - 1. Users are not found - 2. Users are found - 3. Users have correct full names - 4. SSSD is stopped - 5. Users are not found - 6. Users are found - 7. Users have correct full names - :customerscenario: False - """ - - def check(): - assert client.tools.id("user1") is None, "User user1 should not be found without fully qualified name" - assert client.tools.id("user2") is None, "User user2 should not be found without fully qualified name" +# The following functions are created to help parametrize the memcache tests. +def add_objects(provider: GenericProvider) -> dict[str, list[GenericUser | GenericGroup]]: + """ + Create objects. - result = client.tools.id("user1@test") - assert result is not None, "User user1@test was not found using id" - assert result.user.name == "user1@test", f"User {result.user.name} has incorrect name, user1@test expected" - - result = client.tools.id("user2@test") - assert result is not None, "User user2@test was not found using id" - assert result.user.name == "user2@test", f"User {result.user.name} has incorrect name, user2@test expected" - - provider.user("user1").add() - provider.user("user2").add() - - client.sssd.domain["use_fully_qualified_names"] = "true" - client.sssd.start() - - check() - client.sssd.stop() - check() + The word objects is used because it may add 'users' or 'groups'. It returns a dict of lists of objects, + so they can be used to retrieve uidNUmber and gidNumbers. - -@pytest.mark.importance("critical") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_users_when_case_insensitive_is_false(client: Client, provider: GenericProvider): + :param provider: GenericProvider object. + :type provider: GenericProvider + :return: Dict of objects. + :rtype: dict[str, list[GenericUser | GenericGroup]] """ - :title: Lookup user by case insensitive name when 'case_sensitive' is 'false' - uses memory cache when SSSD is stopped - :setup: - 1. Add 'user1' to SSSD - 2. Set user gid and uid - 3. Add groups to SSSD - 4. Set group gid - 5. Add member to the groups - 6. In SSSD domain change 'case_sensitive' to 'false' - 7. Start SSSD - :steps: - 1. Find users with getent.initgroups(name), where name is in random lower and upper case format - 2. Check that usernames are correct - 3. Check that user is member of correct groups - 4. Stop SSSD - 5. Find user with getent.initgroups(name), where name is last name used when resolving user - 6. Check that username is correct - 7. Check that user is member of correct groups - 8. Find users with getent.initgroups(name), where names are previously used names - :expectedresults: - 1. Users are found - 2. Users have correct names - 3. User is member of correct groups - 4. SSSD is stopped - 5. User is found - 6. User has correct name - 7. User is member of correct groups - 8. Users are not found - :customerscenario: False - """ - u1 = provider.user("user1").add(uid=10001, gid=2001) - provider.group("group1").add(gid=10010).add_member(u1) - provider.group("group2").add(gid=10011).add_member(u1) - provider.group("group3").add(gid=10012).add_member(u1) + user1 = provider.user("user1").add() + user2 = provider.user("user2").add() + user3 = provider.user("user3").add() - client.sssd.domain["case_sensitive"] = "false" - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - for name in ["uSer1", "useR1", "USER1", "user1", "uSER1"]: - result = client.tools.getent.initgroups(name) - assert result.name == name, f"Username {result.name} is not correct, {name} expected" - assert result.memberof([10010, 10011, 10012]), f"User {result.name} is member of wrong groups" + group1 = provider.group("group1").add().add_members([user1, user2, user3]) + group2 = provider.group("group2").add().add_members([user2, user3]) + group3 = provider.group("group3").add().add_members([user3]) - client.sssd.stop() + return {"users": [user1, user2, user3], "groups": [group1, group2, group3]} - result = client.tools.getent.initgroups("uSER1") - assert result.name == "uSER1", f"Username {result.name} is not correct, uSER1 expected" - assert result.memberof([10010, 10011, 10012]), f"User {result.name} is member of wrong groups" - # Only last version of name is stored in cache - # That is why initgroups call returns no secondary groups - assert client.tools.getent.initgroups("uSer1").groups == [], "User uSer1 should not be found in cache" - assert client.tools.getent.initgroups("useR1").groups == [], "User useR1 should not be found in cache" - assert client.tools.getent.initgroups("USER1").groups == [], "User USER1 should not be found in cache" - assert client.tools.getent.initgroups("user1").groups == [], "User user1 should not be found in cache" +def assert_objects( + client: Client, objects: dict[str, list[GenericUser | GenericGroup]], cache: str, by_id: bool = False +) -> None: + """ + Check the existence of objects, either users, groups, or initgroups. + + This is a helper function to parameterize the memcache test. The assertions for each + cache type are different. Looking up 'users, will use 'id', groups will use 'getent group', + and initgroups will use 'getent initgroups'. + + If 'id' bool is True, the lookup perform will by uid or gid. + + :param client: Client object. + :type client: Client + :param objects: Dict of object lists. + :type objects: dict[str, list[GenericUser | GenericGroup]] + :param cache: Cache type, 'user', 'group' or 'initgroups' + :type cache: str + :param by_id: Lookup object by id, default is False + :type by_id: bool + """ + if cache == "users": + for user in objects.get("users", []): + if by_id: + _result_user = user.get(["uidNumber"]).get("uidNumber") + assert ( + isinstance(_result_user, list) and len(_result_user) >= 1 + ), "uidNumber is not a list or is empty!" + result_user = client.tools.id(_result_user[-1]) + else: + result_user = client.tools.id(user.name) + assert result_user is not None, f"User '{user.name}' was not found!" + + if cache == "groups": + for group in objects.get("groups", []): + if by_id: + _result_group = group.get(["gidNumber"]).get("gidNumber") + assert ( + isinstance(_result_group, list) and len(_result_group) >= 1 + ), "gidNumber is not a list or is empty!" + result_group = client.tools.getent.group(_result_group[-1]) + else: + result_group = client.tools.getent.group(group.name) + assert result_group is not None, f"Group {group.name} was not found!" + + if cache == "initgroups": + for initgroup in objects.get("users", []): + result_initgroup = client.tools.getent.initgroups(str(initgroup.name)) + assert result_initgroup is not None, f"User '{initgroup.name}' was not found in initgroups!" + + +def assert_objects_not_found(client: Client, objects: dict[str, list[GenericUser | GenericGroup]], cache: str) -> None: + """ + Check for non-existence of objects. + + This helper function is used to parameterize the memcache test. + The assertion for each cache type is different. + + :param client: Client object. + :type client: Client + :param objects: Dict of objects. + :type objects: dict[str, list[GenericUser | GenericGroup]] + :param cache: Cache type, 'user', 'group' or 'initgroups' + :type cache: str + """ + if cache == "users": + for user in objects.get("users", []): + result_user = client.tools.id(user.name) + assert result_user is None, f"User '{user.name}' was found!" + + if cache == "groups": + for group in objects.get("groups", []): + result_group = client.tools.getent.group(group.name) + assert result_group is None, f"Group {group.name} was found!" + + if cache == "initgroups": + for initgroup in objects.get("users", []): + _group = objects.get("groups", [])[-1].name + result_initgroup = client.tools.getent.initgroups(initgroup.name) + assert not result_initgroup.memberof(_group), f"User '{initgroup.name}' was found in initgroups!" + + +def assert_group_membership( + client: Client, objects: dict[str, list[GenericUser | GenericGroup]], cache: str, by_id: bool = False +) -> None: + """ + Checks group membership. + + Helper function to help parameterize the memcache test. The assertion for each cache type is different. + All the users and groups are created during setup. 'user_map' is the expected group membership. + + Each cache type the user membership is checked differently. For 'users' cache, it will use 'id' and check the + memberof attribute. For 'groups' cache, it will use 'getent group' and check the members attribute. For + 'initgroups' cache, it will use 'getent initgroups' and check the memberof attribute with group ids. Importantly, + for 'initgroups' the results are ids; using the user_map, a lookup is performed to get the correct GIDs. + + :param client: Client object. + :type client: Client + :param objects: Dict of objects. + :type objects: dict[str, list[GenericUser | GenericGroup]] + :param cache: Cache type, 'user', 'group' or 'initgroups' + :type cache: str + :param by_id: Lookup object by id, default is False + :type by_id: bool + """ + user_map = {"user1": ["group1"], "user2": ["group1", "group2"], "user3": ["group1", "group2", "group3"]} + + if cache == "users": + for user in objects.get("users", []): + expected_groups = user_map.get(user.name, []) + if by_id: + _result_user = user.get(["uidNumber"]).get("uidNumber") + assert ( + isinstance(_result_user, list) and len(_result_user) >= 1 + ), "uidNumber is not a list or is empty!" + result_user = client.tools.id(str(_result_user[-1])) + else: + result_user = client.tools.id(user.name) + + assert result_user is not None, f"User '{user.name}' was not found!" + assert ( + result_user.groups[-1].name == expected_groups[-1] + ), f"User '{user.name}' is member of incorrect groups!" + + if cache == "groups": + for group in objects.get("groups", []): + expected_members = [user for user, groups in user_map.items() if group.name in groups] + if by_id: + _result_group = group.get(["gidNumber"]).get("gidNumber") + assert ( + isinstance(_result_group, list) and len(_result_group) >= 1 + ), "gidNumber is not a list or is empty!" + result_group = client.tools.getent.group(str(_result_group[-1])) + else: + result_group = client.tools.getent.group(group.name) + + assert result_group is not None, f"Group {group.name} was not found!" + assert result_group.members == expected_members, f"Group '{group.name}' has incorrect members!" + + if cache == "initgroups": + for user in objects.get("users", []): + expected_groups = user_map.get(user.name, []) + expected_ids = [] + + for ids in expected_groups: + for group in objects.get("groups", []): + if group.name == ids: + _result_gid = group.get(["gidNumber"]).get("gidNumber") + assert ( + isinstance(_result_gid, list) and len(_result_gid) > 0 + ), "gidNumber list should not be empty!" + expected_ids.append(int(_result_gid[-1])) + result_initgroup = client.tools.getent.initgroups(user.name) + + assert result_initgroup is not None, f"User '{user.name}' was not found in initgroups!" + assert set(result_initgroup.groups) == set(expected_ids), f"User '{user.name}' groups are wrong!" + + +def invalidate_cache_stop_sssd(client: Client, order: str, cache: str) -> None: + """ + Helpful function to parameterize when the cache is invalidated, + which is either before or after stopping SSSD. + + :param client: Client object. + :type client: Client + :param order: Before or after SSSD stopping. + :type order: str + :param cache: Cache type, 'user', 'group', leave blank for all. + :type cache: str + """ + if order == "before": + if cache == "users": + client.sssctl.cache_expire(users=True) + elif cache == "groups" or cache == "initgroups": + client.sssctl.cache_expire(users=False, groups=True) + else: + client.sssctl.cache_expire(everything=True) + client.sssd.stop() + + if order == "after": + client.sssd.stop() + if cache == "users": + client.sssctl.cache_expire(users=True, groups=False) + elif cache == "groups" or cache == "initgroups": + client.sssctl.cache_expire(users=False, groups=True) + else: + client.sssctl.cache_expire(everything=True) @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_users_when_fully_qualified_name_is_true_and_case_ins_is_false( - client: Client, provider: GenericProvider -): - """ - :title: Lookup user by case insensitive fully qualified name when 'case_sensitive' is 'false' - and 'use_fully_qualified_names' is 'true' uses memory cache when SSSD is stopped +@pytest.mark.parametrize( + "cache", + [ + "users", + "groups", + "initgroups", + ], +) +def test_memcache__lookup_objects_by_name(client: Client, provider: GenericProvider, cache: str): + """ + :title: Lookup objects, by name and remains in memcache after SSSD is stopped. + Objects will either be users, groups or initgroups. :setup: - 1. Add user to SSSD - 2. Add groups to SSSD - 3. Set groups gids - 4. Add members to the groups - 5. In SSSD domain change 'use_fully_qualified_names' to 'true' - 6. In SSSD domain change 'case_sensitive' to 'false' - 7. Start SSSD + 1. Create objects to be cached + 2. Start SSSD + 3. Cache objects by looking them up + 4. Stop SSSD :steps: - 1. Find user with names without domain - 2. Find user with getent.initgroups(name@domain), where name is in random lower and upper case format - 3. Check that user is members of correct groups - 4. Stop SSSD - 5. Find user with getent.initgroups(name@domain), where same name as in 2. - 6. Check that user is member of correct groups - 7. Find users with names, that should not be found + 1. Look up objects + 2. Check group membership :expectedresults: - 1. User is not found - 2. User is found - 3. User is member of correct groups - 4. SSSD is stopped - 5. User is found - 6. User is member of correct groups - 7. Users are not found + 1. Objects are found + 2. Group membership is correct :customerscenario: False """ - u1 = provider.user("user1").add(gid=19001, uid=11001) - - provider.group("group1").add(gid=20001).add_member(u1) - provider.group("group2").add(gid=20002).add_member(u1) - provider.group("group3").add(gid=20003).add_member(u1) - - client.sssd.domain["use_fully_qualified_names"] = "true" - client.sssd.domain["case_sensitive"] = "false" - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - assert client.tools.getent.initgroups("uSer1").groups == [], "User uSer1 should be found only with fq name" - assert client.tools.getent.initgroups("user1").groups == [], "User user1 should be found only with fq name" - - result = client.tools.getent.initgroups("uSer1@test") - assert result.memberof([20001, 20002, 20003]), "User uSer1@test is member of incorrect groups" - - client.sssd.stop() - - result = client.tools.getent.initgroups("uSer1@test") - assert result.memberof([20001, 20002, 20003]), "User uSer1@test is member of incorrect groups" - - assert client.tools.getent.initgroups("user1@test").groups == [], "User user1@test should not be found in cache" - assert client.tools.getent.initgroups("user1").groups == [], "User user1 should be found only with fq name" - assert client.tools.getent.initgroups("uSer1").groups == [], "User uSer1 should be found only with fq name" - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidation_of_gids_after_initgroups(client: Client, provider: GenericProvider): - """ - :title: Invalidate groups after initgroups call when SSSD is stopped - :setup: - 1. Add 'user1' to SSSD - 2. Set user uid and gid - 3. Add groups to SSSD - 4. Set groups gids - 5. Add members to the groups - 6. Start SSSD - :steps: - 1. Check that "user1" has correct attributes - 2. Check that groups have correct attributes - 3. Check that "user1" has correct initgroups - 4. Stop SSSD - 5. Check that "group2" has correct attributes - 6. Check that "user1" has correct initgroups - 7. All "user1" initgroups should be invalidated and not found - :expectedresults: - 1. User has correct attributes - 2. Groups have correct attributes - 3. User has correct initgroups - 4. SSSD is stopped - 5. Group has correct attributes - 6. User has correct attributes - 7. Groups are not found - :customerscenario: False - """ - - def check_user_passwd(): - for user in ("user1", 10001): - result = client.tools.getent.passwd(user) - assert result is not None, f"User {user} was not found using getent" - assert result.uid == 10001, f"User id {result.uid} is incorrect, expected 10001" - assert result.name == "user1", f"Username {result.name} is incorrect, expected user1" - - def check_initgroups(): - result = client.tools.getent.initgroups("user1") - assert result.name == "user1", f"Username {result.name} is incorrect, user1 expected" - assert result.memberof([12345]), "User user1 is member of incorrect groups" - - def check_group(name, gid): - for group in (name, gid): - gresult = client.tools.getent.group(group) - assert gresult is not None, f"Group {group} was not found using getent" - assert gresult.name == name, f"Groupname {gresult.name} is incorrect, {name} expected" - assert gresult.gid == gid, f"Group gid {gresult.gid} is incorrect, {gid} expected" - - u1 = provider.user("user1").add(uid=10001, gid=19001) - - provider.group("group1").add(gid=12345).add_member(u1) - provider.group("group1_").add(gid=123450).add_member(u1) - provider.group("group2").add(gid=22222) - - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) client.sssd.start() - - check_user_passwd() - - check_group("group1", 12345) - check_group("group2", 22222) - - check_initgroups() - + assert_objects(client, objects, cache) client.sssd.stop() - check_user_passwd() - check_group("group2", 22222) - - check_initgroups() - - assert client.tools.getent.group(12345) is None, "Group with gid 12345 was found which is not expected" - assert client.tools.getent.group(123450) is None, "Group with gid 123450 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group("group1_") is None, "Group group1_ was found which is not expected" - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__lookup_initgroups_without_change_in_membership(client: Client, provider: GenericProvider): - """ - :title: Invalidated cache, after refresh and stopped SSSD, has everything loaded in memory - :setup: - 1. Add 'user1' to SSSD - 2. Set user gid and uid - 3. Add 'group1' to SSSD - 4. Set group gid - 5. Add members to the group - 6. Start SSSD - :steps: - 1. Find user with id(name) and id(uid) - 2. Check that user is member of correct groups - 3. Find group with getent.group(name) and getent.group(gid) - 4. Check that the group have correct name and gid - 5. Invalidate whole cache - 6. Find user with id(name) and id(uid) - 7. Check that user is member of correct groups - 8. Find group with getent.group(name) and getent.group(gid) - 9. Check that the group have correct name and gid - 10. Stop SSSD - 11. Find user with id(name) and id(uid) - 12. Check that user is member of correct groups - 13. Find group with getent.group(name) and getent.group(gid) - 14. Check that the group have correct name and gid - :expectedresults: - 1. User is found - 2. User is member of correct groups - 3. Group is found - 4. Group has correct name and gid - 5. Cache is invalidated - 6. User is found - 7. User is member of correct groups - 8. Group is found - 9. Group has correct name and gid - 10. SSSD is stopped - 11. User is found - 12. User is member of correct groups - 13. Group is found - 14. Group has correct name and gid - :customerscenario: False - """ - - def check(): - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.memberof([111, 12345]), "User user1 is member of incorrect groups" - - result = client.tools.id(10001) - assert result is not None, "User with id 10001 was not found using id" - assert result.memberof([111, 12345]), "User with id 10001 is member of incorrect groups" - - gresult = client.tools.getent.group("group1") - assert gresult is not None, "Group group1 was not found using getent" - assert gresult.gid == 12345, f"Group gid {gresult.gid} is incorrect, 12345 expected" - - gresult = client.tools.getent.group(12345) - assert gresult is not None, "Group with gid 12345 was not found using getent" - assert gresult.name == "group1", f"Groupname {gresult.name} is incorrect, group1 expected" - - gresult = client.tools.getent.group("group2") - assert gresult is not None, "Group group2 was not found using getent" - assert gresult.gid == 222222, f"Group gid {gresult.gid} is incorrect, 222222 expected" - - gresult = client.tools.getent.group(222222) - assert gresult is not None, "Group with gid 222222 was not found using getent" - assert gresult.name == "group2", f"Groupname {gresult.name} is incorrect, group2 expected" - - result = client.tools.getent.initgroups("user1") - assert result.memberof([12345, 123450]), "User user1 is member of incorrect groups" - - u1 = provider.user("user1").add(uid=10001, gid=111) - provider.group("group1").add(gid=12345).add_member(u1) - provider.group("group1_").add(gid=123450).add_member(u1) - provider.group("group2").add(gid=222222) - - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - check() - client.sssctl.cache_expire(everything=True) - check() - client.sssd.stop() - check() + assert_objects(client, objects, cache) + assert_group_membership(client, objects, cache) @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_user_cache_before_stop(client: Client, provider: GenericProvider): - """ - :title: Invalidate user cache before SSSD is stopped +@pytest.mark.parametrize( + "cache", + [ + "users", + "groups", + "initgroups", + ], +) +def test_memcache__lookup_objects_by_id(client: Client, provider: GenericProvider, cache: str): + """ + :title: Lookup objects, by id and remains in memcache after SSSD is stopped. :setup: - 1. Add 'user1' and 'user2' to SSSD - 2. Set users gids and uids - 3. Add 'group1' and 'group2' to SSSD - 4. Set groups gids - 5. Add members to the groups - 6. Start SSSD + 1. Create objects to be cached + 2. Start SSSD + 3. Cache objects by looking them up + 4. Stop SSSD :steps: - 1. Find user with id(name) - 2. Check that user has correct id - 3. Check that user is member of correct groups - 4. Invalidate cache for 'user1' - 5. Stop SSSD - 6. Find user by id(name) and id(uid) - 7. Find the user's groups by getent.group(name) and getent.group(uid) + 1. Look up objects by uid or gid + 2. Check group membership :expectedresults: - 1. User is found - 2. User has correct id - 3. User is member of correct groups - 4. Cache is invalidated - 5. SSSD is stopped - 6. User is not found - 7. Group is not found + 1. Objects are found + 2. Group membership is correct :customerscenario: False """ - u1 = provider.user("user1").add(uid=123456, gid=110011) - u2 = provider.user("user2").add(uid=220022, gid=222222) - - provider.group("group1").add(gid=101010).add_member(u1) - provider.group("group2").add(gid=202020).add_members([u1, u2]) - - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) client.sssd.start() - - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.user.id == 123456, f"User id {result.user.id} is incorrect, 123456 expected" - assert result.memberof([110011, 101010, 202020]), "User user1 is member of incorrect groups" - - client.sssctl.cache_expire(user="user1") + assert_objects(client, objects, cache, by_id=True) client.sssd.stop() - assert client.tools.id("user1") is None, "User user1 was found which is not expected" - assert client.tools.id(123456) is None, "User with id 123456 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(202020) is None, "Group with gid 202020 was found which is not expected" + assert_objects(client, objects, cache, by_id=True) + assert_group_membership(client, objects, cache, by_id=True) @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_user_cache_after_stop(client: Client, provider: GenericProvider): +def test_memcache__lookup_users_by_fully_qualified_name(client: Client, provider: GenericProvider): """ - :title: Invalidate user cache after SSSD is stopped + :title: Lookup users by fully qualified names after SSSD is stopped. :setup: - 1. Add 'user1' and 'user2' to SSSD - 2. Set users gids and uids - 3. Add 'group1' and 'group2' to SSSD - 4. Set groups gids - 5. Add members to the groups - 6. Start SSSD + 1. Create users to be cached + 2. Configure and start SSSD + 3. Cache users by looking them up + 4. Stop SSSD :steps: - 1. Find user with id(name) - 2. Check that user has correct id - 3. Check that user is member of correct groups - 4. Stop SSSD - 5. Invalidate cache for 'user1' - 6. Find user by id(name) and id(uid) - 7. Find the user's groups by getent.group(name) and getent.group(uid) + 1. Look up users by fully qualified names :expectedresults: - 1. User is found - 2. User has correct id - 3. User is member of correct groups - 4. SSSD is stopped - 5. Cache is invalidated - 6. User is not found - 7. Group is not found + 1. Users are found :customerscenario: False """ - u1 = provider.user("user1").add(uid=123456, gid=110011) - u2 = provider.user("user2").add(uid=220022, gid=222222) - - provider.group("group1").add(gid=101010).add_member(u1) - provider.group("group2").add(gid=202020).add_members([u1, u2]) - - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) + users = objects.get("users", []) + client.sssd.domain["use_fully_qualified_names"] = "True" client.sssd.start() - - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.user.id == 123456, f"User id {result.user.id} is incorrect, 123456 expected" - assert result.memberof([110011, 101010, 202020]), "User user1 is member of incorrect groups" - + for user in users: + result_id = client.tools.id(f"{user.name}@{client.sssd.default_domain}") + assert result_id is not None, f"User '{user.name}@{client.sssd.default_domain}' was not found!" client.sssd.stop() - client.sssctl.cache_expire(user="user1") - assert client.tools.id("user1") is None, "User user1 was found which is not expected" - assert client.tools.id(123456) is None, "User with id 123456 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(202020) is None, "Group with gid 202020 was found which is not expected" + for user in users: + result_id = client.tools.id(f"{user.name}@{client.sssd.default_domain}") + assert result_id is not None, f"User '{user.name}@{client.sssd.default_domain}' was not found!" @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_users_cache_before_stop(client: Client, provider: GenericProvider): +def test_memcache__lookup_users_by_name_with_case_sensitive_true(client: Client, provider: GenericProvider): """ - :title: Invalidate users cache before SSSD is stopped + :title: Lookup users by name with case sensitivity set to true after SSSD is stopped. :setup: - 1. Add 'user1' and 'user2' to SSSD - 2. Set users gids and uids - 3. Add 'group1' and 'group2' to SSSD - 4. Set groups gids - 5. Add members to the groups - 6. Start SSSD + 1. Create users to be cached + 2. Configure and start SSSD + 3. Cache users by looking them up + 4. Stop SSSD :steps: - 1. Find users with id(name) - 2. Check that users have correct ids - 3. Check that users are members of correct groups - 4. Invalidate cache for all users - 5. Stop SSSD - 6. Find users by id(name) and id(uid) - 7. Find the groups of users by getent.group(name) and getent.group(uid) + 1. Look up users in uppercase :expectedresults: - 1. Users are found - 2. Users have correct ids - 3. Users are members of correct groups - 4. Cache is invalidated - 5. SSSD is stopped - 6. Users are not found - 7. Groups are not found + 1. Users are found :customerscenario: False """ - u1 = provider.user("user1").add(uid=123456, gid=110011) - u2 = provider.user("user2").add(uid=220022, gid=222222) - - provider.group("group1").add(gid=101010).add_member(u1) - provider.group("group2").add(gid=202020).add_members([u1, u2]) - - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) + users = objects.get("users", []) + client.sssd.domain["case_sensitive"] = "True" client.sssd.start() - - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.user.id == 123456, f"User id {result.user.id} is incorrect, 123456 expected" - assert result.memberof([110011, 101010, 202020]), "User user1 is member of incorrect groups" - - result = client.tools.id("user2") - assert result is not None, "User user2 was not found using id" - assert result.user.id == 220022, f"User id {result.user.id} is incorrect, 220022 expected" - assert result.memberof([222222, 202020]), "User user2 is member of incorrect groups" - - client.sssctl.cache_expire(users=True) + for user in users: + result_id = client.tools.id(user.name) + assert result_id is not None, f"User '{user.name}' was not found!" client.sssd.stop() - assert client.tools.id("user1") is None, "User user1 was found which is not expected" - assert client.tools.id(123456) is None, "User with id 123456 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(202020) is None, "Group with gid 202020 was found which is not expected" - assert client.tools.id("user2") is None, "User user2 was found which is not expected" - assert client.tools.id(220022) is None, "User with id 220022 was found which is not expected" - assert client.tools.getent.group(222222) is None, "Group with gid 222222 was found which is not expected" + for user in users: + result_id = client.tools.id(user.name.upper()) + assert result_id is None, f"User '{user.name.upper()}' not found!" @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_users_cache_after_stop(client: Client, provider: GenericProvider): +@pytest.mark.parametrize( + "cache", + ["users", "groups", "initgroups", "all"], +) +def test_memcache__lookup_objects_with_the_same_cache_type_disabled( + client: Client, provider: GenericProvider, cache: str +): """ - :title: Invalidate users cache after SSSD is stopped + :title: Lookup objects, by name with the same cache type disabled. :setup: - 1. Add 'user1' and 'user2' to SSSD - 2. Set users gids and uids - 3. Add 'group1' and 'group2' to SSSD - 4. Set groups gids - 5. Add members to the groups - 6. Start SSSD + 1. Create objects to be cached + 2. Start SSSD with the memcache disabled for the specific type + 3. Cache objects by looking them up + 4. Stop SSSD :steps: - 1. Find users with id(name) - 2. Check that users have correct ids - 3. Check that users are members of correct groups - 4. Stop SSSD - 5. Invalidate cache for all users - 6. Find users by id(name) and id(uid) - 7. Find the groups of users by getent.group(name) and getent.group(uid) + 1. Look up objects :expectedresults: - 1. Users are found - 2. Users have correct ids - 3. Users are members of correct groups - 4. SSSD is stopped - 5. Cache is invalidated - 6. Users are not found - 7. Groups are not found + 1. Objects are not found :customerscenario: False """ - u1 = provider.user("user1").add(uid=123456, gid=110011) - u2 = provider.user("user2").add(uid=220022, gid=222222) - - provider.group("group1").add(gid=101010).add_member(u1) - provider.group("group2").add(gid=202020).add_members([u1, u2]) - - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) + if cache == "users": + client.sssd.nss["memcache_size_passwd"] = "0" + elif cache == "groups": + client.sssd.nss["memcache_size_group"] = "0" + elif cache == "initgroups": + client.sssd.nss["memcache_size_initgroups"] = "0" + else: + client.sssd.nss["memcache_size_passwd"] = "0" + client.sssd.nss["memcache_size_group"] = "0" + client.sssd.nss["memcache_size_initgroups"] = "0" client.sssd.start() - - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.user.id == 123456, f"User id {result.user.id} is incorrect, 123456 expected" - assert result.memberof([110011, 101010, 202020]), "User user1 is member of incorrect groups" - - result = client.tools.id("user2") - assert result is not None, "User user2 was not found using id" - assert result.user.id == 220022, f"User id {result.user.id} is incorrect, 220022 expected" - assert result.memberof([222222, 202020]), "User user2 is member of incorrect groups" - + assert_objects(client, objects, cache) client.sssd.stop() - client.sssctl.cache_expire(users=True) - assert client.tools.id("user1") is None, "User user1 was found which is not expected" - assert client.tools.id(123456) is None, "User with id 123456 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(202020) is None, "Group with gid 202020 was found which is not expected" - assert client.tools.id("user2") is None, "User user2 was found which is not expected" - assert client.tools.id(220022) is None, "User with id 220022 was found which is not expected" - assert client.tools.getent.group(222222) is None, "Group with gid 222222 was found which is not expected" + assert_objects_not_found(client, objects, cache) @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_group_cache_before_stop(client: Client, provider: GenericProvider): +@pytest.mark.parametrize( + "cache", + ["users", "groups", "initgroups"], +) +def test_memcache__lookup_objects_with_all_other_cache_types_disabled( + client: Client, provider: GenericProvider, cache: str +): """ - :title: Invalidate group cache before SSSD is stopped + :title: Lookup objects, by name with other cache types disabled. :setup: - 1. Add 'group1' to SSSD - 2. Set group gid - 3. Start SSSD + 1. Create objects to be cached + 2. Start SSSD with the memcache disabled for other object types + 3. Cache objects by looking them up + 4. Stop SSSD :steps: - 1. Find the 'group1' getent.group(name) - 2. Check that group has correct id - 3. Check that group has correct name - 4. Invalidate cache for 'group1' - 5. Stop SSSD - 6. Find the 'group1' getent.group(name) and getent.group(uid) + 1. Look up objects :expectedresults: - 1. Group is found - 2. Group has correct id - 3. Group has correct name - 4. Cache is invalidated - 5. SSSD is stopped - 6. Group is not found + 1. Objects are found :customerscenario: False """ - provider.group("group1").add(gid=101010) - - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) + if cache == "users": + client.sssd.nss["memcache_size_group"] = "0" + client.sssd.nss["memcache_size_initgroups"] = "0" + elif cache == "groups": + client.sssd.nss["memcache_size_passwd"] = "0" + client.sssd.nss["memcache_size_initgroups"] = "0" + else: + client.sssd.nss["memcache_size_passwd"] = "0" + client.sssd.nss["memcache_size_group"] = "0" client.sssd.start() - - result = client.tools.getent.group("group1") - assert result is not None, "Group group1 was not found using getent" - assert result.name == "group1", f"Groupname {result.name} is incorrect, group1 expected" - assert result.gid == 101010, f"Group gid {result.gid} is incorrect, 101010 expected" - - client.sssctl.cache_expire(group="group1") + assert_objects(client, objects, cache) client.sssd.stop() - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" + assert_objects(client, objects, cache) @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_group_cache_after_stop(client: Client, provider: GenericProvider): +def test_memcache__lookup_objects_with_memcache_disabled(client: Client, provider: GenericProvider): """ - :title: Invalidate group cache after SSSD is stopped + :title: Lookup objects, with memcache disabled entirely, memcache_timeout=0 :setup: - 1. Add 'group1' to SSSD - 2. Set group gid - 3. Start SSSD + 1. Create objects to be cached + 2. Start SSSD with the memcache disabled + 3. Cache objects by looking them up + 4. Stop SSSD :steps: - 1. Find the 'group1' getent.group(name) - 2. Check that group has correct id - 3. Check that group has correct name - 4. Stop SSSD - 5. Invalidate cache for 'group1' - 6. Find the 'group1' getent.group(name) and getent.group(uid) + 1. Check memcache cache folder + 2. Look up objects :expectedresults: - 1. Group is found - 2. Group has correct id - 3. Group has correct name - 4. SSSD is stopped - 5. Cache is invalidated - 6. Group is not found + 1. No files are created + 2. Objects are not found :customerscenario: False """ - provider.group("group1").add(gid=101010) - - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - result = client.tools.getent.group("group1") - assert result is not None, "Group group1 was not found using getent" - assert result.name == "group1", f"Groupname {result.name} is incorrect, group1 expected" - assert result.gid == 101010, f"Group gid {result.gid} is incorrect, 101010 expected" - - client.sssd.stop() - client.sssctl.cache_expire(group="group1") - - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_groups_cache_before_stop(client: Client, provider: GenericProvider): - """ - :title: Invalidate groups cache before SSSD is stopped - :setup: - 1. Add 'group1' and 'group2' to SSSD - 2. Set groups gids - 3. Start SSSD - :steps: - 1. Find groups with getent.group(name) - 2. Check that groups have correct gids - 3. Invalidate cache for all groups - 4. Stop SSSD - 5. Find 'group1' and 'group2' with getent.group(name) and getent.group(gid) - :expectedresults: - 1. Groups are found - 2. Groups have correct gids - 3. Cache is invalidated - 4. SSSD is stopped - 5. Groups are not found - :customerscenario: False - """ - provider.group("group1").add(gid=101010) - provider.group("group2").add(gid=202020) - - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) + client.sssd.nss["memcache_timeout"] = "0" client.sssd.start() - - result = client.tools.getent.group("group1") - assert result is not None, "Group group1 was not found using getent" - assert result.gid == 101010, f"Group gid {result.gid} is incorrect, 101010 expected" - - result = client.tools.getent.group("group2") - assert result is not None, "Group group2 was not found using getent" - assert result.gid == 202020, f"Group gid {result.gid} is incorrect, 202020 expected" - - client.sssctl.cache_expire(groups=True) + assert_objects(client, objects, "users") + assert_objects(client, objects, "groups") + assert_objects(client, objects, "initgroups") client.sssd.stop() - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(202020) is None, "Group with gid 202020 was found which is not expected" - - -@pytest.mark.importance("high") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_groups_cache_after_stop(client: Client, provider: GenericProvider): - """ - :title: Invalidate groups cache after SSSD is stopped - :setup: - 1. Add 'group1' and 'group2' to SSSD - 2. Set groups gids - 3. Start SSSD - :steps: - 1. Find groups with getent.group(name) - 2. Check that groups have correct gids - 3. Stop SSSD - 4. Invalidate cache for all groups - 5. Find 'group1' and 'group2' with getent.group(name) and getent.group(gid) - :expectedresults: - 1. Groups are found - 2. Groups have correct gids - 3. SSSD is stopped - 4. Cache is invalidated - 5. Groups are not found - :customerscenario: False - """ - provider.group("group1").add(gid=101010) - provider.group("group2").add(gid=202020) - - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - result = client.tools.getent.group("group1") - assert result is not None, "Group group1 was not found using getent" - assert result.gid == 101010, f"Group gid {result.gid} is incorrect, 101010 expected" - - result = client.tools.getent.group("group2") - assert result is not None, "Group group2 was not found using getent" - assert result.gid == 202020, f"Group gid {result.gid} is incorrect, 202020 expected" - - client.sssd.stop() - client.sssctl.cache_expire(groups=True) + file_list = client.host.conn.run("ls /var/lib/sss/mc/").stdout_lines + assert len(file_list) == 0, "Cache files should not be present when memcache is disabled!" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(202020) is None, "Group with gid 202020 was found which is not expected" + assert_objects_not_found(client, objects, "users") + assert_objects_not_found(client, objects, "groups") + assert_objects_not_found(client, objects, "initgroups") @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_everything_before_stop(client: Client, provider: GenericProvider): +@pytest.mark.parametrize( + "cache", + ["users", "groups", "initgroups", "all"], +) +@pytest.mark.parametrize( + "order", + ["before", "after"], + ids=["before stopping sssd", "after stopping sssd"], +) +def test_memcache__invalidating_caches_before_or_after_stopping_sssd( + client: Client, provider: GenericProvider, cache: str, order: str +): """ - :title: Invalidate all parts of cache before SSSD is stopped + :title: Lookup objects and changing when the cache is invalidated, before or after stopping sssd :setup: - 1. Add 'user1' and 'user2' to SSSD - 2. Set users uids and gids - 3. Add 'group1' and 'group2' to SSSD - 4. Set groups gids - 5. Add members to the groups - 6. Start SSSD + 1. Create objects to be cached + 2. Start SSSD + 3. Cache objects by looking them up + 4. Invalidate the cache objects :steps: - 1. Find users with id(name) - 2. Check that users have correct uids - 3. Find groups with getent.group(name) - 4. Check that groups have correct gids - 5. Invalidate all parts of cache - 6. Stop SSSD - 7. Find 'user1' and 'user2' with id(name) and id(uid) - 8. Find 'group1' and 'group2' with getent.group(name) and getent.group(gid) + 1. Look up objects :expectedresults: - 1. Users are found - 2. Users have correct uids - 3. Groups are found - 4. Groups have correct gids - 5. Cache is invalidated - 6. SSSD is stopped - 7. Users are not found - 8. Groups are not found + 1. Objects are not found :customerscenario: False """ - u1 = provider.user("user1").add(uid=123456, gid=110011) - u2 = provider.user("user2").add(uid=220022, gid=222222) - - provider.group("group1").add(gid=101010).add_member(u1) - provider.group("group2").add(gid=202020).add_members([u1, u2]) - - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) client.sssd.start() + assert_objects(client, objects, cache) + invalidate_cache_stop_sssd(client, order, cache) - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.user.id == 123456, f"User id {result.user.id} is incorrect, 123456 expected" - - result = client.tools.id("user2") - assert result is not None, "User user2 was not found using id" - assert result.user.id == 220022, f"User id {result.user.id} is incorrect, 220022 expected" - - gresult = client.tools.getent.group("group1") - assert gresult is not None, "Group group1 was not found using getent" - assert gresult.gid == 101010, f"Group gid {gresult.gid} is incorrect, 101010 expected" - - gresult = client.tools.getent.group("group2") - assert gresult is not None, "Group group2 was not found using getent" - assert gresult.gid == 202020, f"Group gid {gresult.gid} is incorrect, 202020 expected" - - client.sssctl.cache_expire(everything=True) - client.sssd.stop() - - assert client.tools.id("user1") is None, "User user1 was found which is not expected" - assert client.tools.id(123456) is None, "User with id 123456 was found which is not expected" - assert client.tools.id("user2") is None, "User user2 was found which is not expected" - assert client.tools.id(220022) is None, "User with id 220022 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(202020) is None, "Group with gid 202020 was found which is not expected" + assert_objects_not_found(client, objects, cache) @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__invalidate_everything_after_stop(client: Client, provider: GenericProvider): +@pytest.mark.ticket(bz=2226021) +def test_memcache__truncate_in_memory_cache_no_sigbus(client: Client, provider: GenericProvider): """ - :title: Invalidate all parts of cache after SSSD is stopped + :title: Accessing truncated in-memory cache file does not cause SIGBUS :setup: - 1. Add 'user1' and 'user2' to SSSD - 2. Set users uids and gids - 3. Add 'group1' and 'group2' to SSSD - 4. Set groups gids - 5. Add members to the groups - 6. Start SSSD + 1. Add objects to be cached + 2. Start SSSD + 3. Cache users by looking them up + 4. Truncate /var/lib/sss/mc/passwd :steps: - 1. Find users with id(name) - 2. Check that users have correct uids - 3. Find groups with getent.group(name) - 4. Check that groups have correct gids - 5. Stop SSSD - 6. Invalidate all parts of cache - 7. Find 'user1' and 'user2' with id(name) and id(uid) - 8. Find 'group1' and 'group2' with getent.group(name) and getent.group(gid) + 1. Lookup users :expectedresults: 1. Users are found - 2. Users have correct uids - 3. Groups are found - 4. Groups have correct gids - 5. SSSD is stopped - 6. Cache is invalidated - 7. Users are not found - 8. Groups are not found - :customerscenario: False - """ - u1 = provider.user("user1").add(uid=123456, gid=110011) - u2 = provider.user("user2").add(uid=220022, gid=222222) - - provider.group("group1").add(gid=101010).add_member(u1) - provider.group("group2").add(gid=202020).add_members([u1, u2]) - - client.sssd.domain["ldap_id_mapping"] = "false" - client.sssd.start() - - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using id" - assert result.user.id == 123456, f"User id {result.user.id} is incorrect, 123456 expected" - - result = client.tools.id("user2") - assert result is not None, "User user2 was not found using id" - assert result.user.id == 220022, f"User id {result.user.id} is incorrect, 220022 expected" - - gresult = client.tools.getent.group("group1") - assert gresult is not None, "Group group1 was not found using getent" - assert gresult.gid == 101010, f"Group gid {gresult.gid} is incorrect, 101010 expected" - - gresult = client.tools.getent.group("group2") - assert gresult is not None, "Group group2 was not found using getent" - assert gresult.gid == 202020, f"Group gid {gresult.gid} is incorrect, 202020 expected" - - client.sssd.stop() - client.sssctl.cache_expire(everything=True) - - assert client.tools.id("user1") is None, "User user1 was found which is not expected" - assert client.tools.id(123456) is None, "User with id 123456 was found which is not expected" - assert client.tools.id("user2") is None, "User user2 was found which is not expected" - assert client.tools.id(220022) is None, "User with id 220022 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(110011) is None, "Group with gid 110011 was found which is not expected" - assert client.tools.getent.group("group2") is None, "Group group2 was found which is not expected" - assert client.tools.getent.group(202020) is None, "Group with gid 202020 was found which is not expected" - - -@pytest.mark.importance("critical") -@pytest.mark.cache -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) -@pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__memcache_timeout_zero(client: Client, provider: GenericProvider): - """ - :title: Cache is not created at all when 'memcache_timeout' set to '0' - :setup: - 1. Add 'user1' to SSSD - 2. Set user uid - 3. Add 'group1' to SSSD - 4. Set group gid - 5. In SSSD nss change 'memcache_timeout' set to '0' - 6. Start SSSD - :steps: - 1. Check that cache is not created - 2. Find user with id(name) - 3. Check that user has correct uid - 4. Find group with getent.group(name) - 5. Check that group has correct gid - 6. Stop SSSD - 7. Find user with id(name) and id(uid) - 8. Find group with getent.group(name) and getent.group(gid) - :expectedresults: - 1. Cache is not created - 2. User is found - 3. User has correct uid - 4. Group is found - 5. Group has correct gid - 6. Stop SSSD - 7. User is not found - 8. Group is not found - :customerscenario: False + :customerscenario: True """ - provider.user("user1").add(uid=123456, gid=19001) - provider.group("group1").add(gid=10001) - - client.sssd.nss["memcache_timeout"] = "0" - client.sssd.domain["ldap_id_mapping"] = "false" + objects = add_objects(provider) client.sssd.start() + assert_objects(client, objects, "users") + client.fs.truncate("/var/lib/sss/mc/passwd") - r = client.host.conn.exec(["ls", "/var/lib/sss/mc"]) - assert r.stdout == "", "Cache directory is not empty" - assert r.stderr == "", "Ls command failed" - - result = client.tools.id("user1") - assert result is not None, "User user1 was not found using user1" - assert result.user.id == 123456, f"User id {result.user.id} is incorrect, 123456 expected" - - gresult = client.tools.getent.group("group1") - assert gresult is not None, "Group group1 is not found using getent" - assert gresult.gid == 10001, f"Group gid {gresult.gid} is incorrect, 10001 expected" - - client.sssd.stop() - - assert client.tools.id("user1") is None, "User user1 was found which is not expected" - assert client.tools.id(123456) is None, "User with id 123456 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(10001) is None, "Group with gid 10001 was found which is not expected" + assert_objects(client, objects, "users") @pytest.mark.importance("critical") -@pytest.mark.cache @pytest.mark.topology(KnownTopologyGroup.AnyProvider) @pytest.mark.preferred_topology(KnownTopology.LDAP) -def test_memcache__removed_cache_without_invalidation(client: Client, provider: GenericProvider): +@pytest.mark.ticket(gh=4595) +def test_memcache__handles_colliding_hashes(client: Client, provider: GenericProvider): """ - :title: SSSD is stopped, cache removed then users and groups cannot be lookedup + :title: Lookup objects with colliding hashes are handled correctly + The string for the colliding hash needs to be longer than the other + object data stored in mcache. Like 'getent passwd user1 | wc -c' => 45 + Note, pysss_murmur is used to generate the hash. :setup: - 1. Add 'user1' to SSSD - 2. Set user uid and gid - 3. Add 'group1' to SSSD - 4. Set group gid - 5. Start SSSD + 1. Upload script to generate colliding hash + 2. Create the first user, cache the user by looking up the user + 3. Run the script to get the hash + 4. Create the second user using the hash as the username + 5. Cache users by looking them up and after stop sssd :steps: - 1. Find user with id(name) - 2. Check that user has correct uid - 3. Find group with getent.group(name) - 4. Check that group has correct gid - 5. Stop SSSD - 6. Remove cache files - 7. Find user with id(name) and id(uid) - 8. Find group with getent.group(name) and getent.group(gid) + 1. Lookup users :expectedresults: - 1. User is found - 2. User has correct uid - 3. Group is found - 4. Group has correct gid - 5. SSSD is stopped - 6. Cache files are removed - 7. User is not found - 8. Group is not found + 2. Users are found :customerscenario: True """ - provider.user("user1").add(uid=123456, gid=19001) - provider.group("group1").add(gid=10001) - - client.sssd.domain["ldap_id_mapping"] = "false" + script = client.fs.mktmp(""" +#!/usr/bin/env python3 +import argparse +import random +import string +import struct +import pysss_murmur + +class MemoryCache: + SIZE_OF_UINT32_T = 4 + + def __init__(self) -> None: + self.user = "user1" + self.cache_path = "/var/lib/sss/mc/passwd" + with open(self.cache_path, "rb") as f: + f.seek(16) + self.seed = struct.unpack("i", f.read(4))[0] + f.read(8) + hash_length = struct.unpack("i", f.read(4))[0] + self.hash_size = hash_length // 4 + + def generate_colliding_username(self, min_length=80): + target = pysss_murmur.murmurhash3(self.user + "\\0", len(self.user) + 1, self.seed) % self.hash_size + while True: + candidate = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(min_length)) + h = pysss_murmur.murmurhash3(candidate + "\\0", len(candidate) + 1, self.seed) % self.hash_size + if h == target: + return candidate + +if __name__ == "__main__": + mc = MemoryCache() + colliding = mc.generate_colliding_username(80) + print(colliding) +""") + + normal_user = "user1" + provider.user("user1").add() client.sssd.start() + client.tools.id(normal_user) - result = client.tools.id("user1") - assert result is not None, "User user1 is not found using id" - assert result.user.id == 123456, f"User id {result.user.id} is incorrect, 123456 expected" - - gresult = client.tools.getent.group("group1") - assert gresult is not None, "Group group1 is not found using getent" - assert gresult.gid == 10001, f"Group gid {gresult.gid} is incorrect, 10001 expected" + colliding_user = client.host.conn.run(f"python {script}").stdout + provider.user(colliding_user).add() + for user in [normal_user, colliding_user]: + result_user = client.tools.id(user) + assert result_user is not None, f"User '{user}' was not found!" client.sssd.stop() - r = client.host.conn.exec(["ls", "/var/lib/sss/mc"]) - for file in r.stdout.split(): - check = client.host.conn.exec(["rm", f"/var/lib/sss/mc/{file}"]) - assert check.rc == 0, "Cache file was not removed successfully" - - assert client.tools.id("user1") is None, "User user1 was found which is not expected" - assert client.tools.id(123456) is None, "User with id 123456 was found which is not expected" - assert client.tools.getent.group("group1") is None, "Group group1 was found which is not expected" - assert client.tools.getent.group(10001) is None, "Group with gid 10001 was found which is not expected" - - -@pytest.mark.topology(KnownTopology.LDAP) -@pytest.mark.ticket(bz=2226021) -def test_memcache__truncate_in_memory_cache_no_sigbus(client: Client, ldap: LDAP): - """ - :title: Accessing truncated in-memory cache file does not cause SIGBUS - :setup: - 1. Add 'user-1' to SSSD - 2. Start SSSD - :steps: - 1. Find 'user-1' so it is stored in in-memory cache - 2. Truncate /var/lib/sss/mc/passwd - 3. Check that 'user-1' can be correctly resolved - :expectedresults: - 1. User is found - 2. Size of /var/lib/sss/mc/passwd is 0 - 3. User can be found again and there is no crash - :customerscenario: True - """ - ldap.user("user-1").add() - - client.sssd.start() - - result = client.tools.id("user-1") - assert result is not None, "User was not found" - assert result.user.name == "user-1" - - client.fs.truncate("/var/lib/sss/mc/passwd") - - result = client.tools.id("user-1") - assert result is not None, "User was not found" - assert result.user.name == "user-1" + for user in [normal_user, colliding_user]: + result_user = client.tools.id(user) + assert result_user is not None, f"User '{user}' was not found!" From 3ab0a1e9a28f3d99977c6fd93ff976f09a576cd0 Mon Sep 17 00:00:00 2001 From: Dan Lavu Date: Tue, 17 Mar 2026 01:23:15 -0400 Subject: [PATCH 2/2] removing intg memcache tests --- src/tests/intg/Makefile.am | 1 - src/tests/intg/test_memory_cache.py | 1223 --------------------------- 2 files changed, 1224 deletions(-) delete mode 100644 src/tests/intg/test_memory_cache.py diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am index 021e15bd9a8..4d3b6e67bcc 100644 --- a/src/tests/intg/Makefile.am +++ b/src/tests/intg/Makefile.am @@ -14,7 +14,6 @@ dist_noinst_DATA = \ ent.py \ ldap_ent.py \ util.py \ - test_memory_cache.py \ files_ops.py \ kdc.py \ krb5utils.py \ diff --git a/src/tests/intg/test_memory_cache.py b/src/tests/intg/test_memory_cache.py deleted file mode 100644 index dd8e565b8c9..00000000000 --- a/src/tests/intg/test_memory_cache.py +++ /dev/null @@ -1,1223 +0,0 @@ -# -# LDAP integration test -# -# Copyright (c) 2015 Red Hat, Inc. -# Author: Lukas Slebodnik -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -import os -import stat -import ent -import grp -import pwd -import config -import random -import signal -import string -import struct -import subprocess -import time -import pytest -import pysss_murmur - -import ds_openldap -import ldap_ent -import sssd_id -from util import unindent - -LDAP_BASE_DN = "dc=example,dc=com" - - -@pytest.fixture(scope="module") -def ds_inst(request): - """LDAP server instance fixture""" - ds_inst = ds_openldap.DSOpenLDAP( - config.PREFIX, 10389, LDAP_BASE_DN, - "cn=admin", "Secret123") - try: - ds_inst.setup() - except Exception: - ds_inst.teardown() - raise - request.addfinalizer(lambda: ds_inst.teardown()) - return ds_inst - - -@pytest.fixture(scope="module") -def ldap_conn(request, ds_inst): - """LDAP server connection fixture""" - ldap_conn = ds_inst.bind() - ldap_conn.ds_inst = ds_inst - request.addfinalizer(lambda: ldap_conn.unbind_s()) - return ldap_conn - - -def create_ldap_fixture(request, ldap_conn, ent_list): - """Add LDAP entries and add teardown for removing them""" - for entry in ent_list: - ldap_conn.add_s(entry[0], entry[1]) - - def teardown(): - for entry in ent_list: - ldap_conn.delete_s(entry[0]) - request.addfinalizer(teardown) - - -def create_conf_fixture(request, contents): - """Generate sssd.conf and add teardown for removing it""" - conf = open(config.CONF_PATH, "w") - conf.write(contents) - conf.close() - os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) - request.addfinalizer(lambda: os.unlink(config.CONF_PATH)) - - -def stop_sssd(): - pid_file = open(config.PIDFILE_PATH, "r") - pid = int(pid_file.read()) - os.kill(pid, signal.SIGTERM) - while True: - try: - os.kill(pid, signal.SIGCONT) - except OSError: - break - time.sleep(1) - - -def create_sssd_fixture(request): - """Start sssd and add teardown for stopping it and removing state""" - if subprocess.call(["sssd", "-D", "--logger=files"]) != 0: - raise Exception("sssd start failed") - - def teardown(): - try: - stop_sssd() - except Exception: - pass - for path in os.listdir(config.DB_PATH): - os.unlink(config.DB_PATH + "/" + path) - for path in os.listdir(config.MCACHE_PATH): - os.unlink(config.MCACHE_PATH + "/" + path) - # force sss_client libs to realize mem-cache files were deleted - try: - sssd_id.call_sssd_initgroups("user1", 2001) - except Exception: - pass - try: - grp.getgrnam("group1") - except Exception: - pass - try: - pwd.getpwnam("user1") - except Exception: - pass - request.addfinalizer(teardown) - - -def load_data_to_ldap(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - ent_list.add_user("user1", 1001, 2001) - ent_list.add_user("user2", 1002, 2002) - ent_list.add_user("user3", 1003, 2003) - ent_list.add_user("user11", 1011, 2001) - ent_list.add_user("user12", 1012, 2002) - ent_list.add_user("user13", 1013, 2003) - ent_list.add_user("user21", 1021, 2001) - ent_list.add_user("user22", 1022, 2002) - ent_list.add_user("user23", 1023, 2003) - - ent_list.add_group("group1", 2001, ["user1", "user11", "user21"]) - ent_list.add_group("group2", 2002, ["user2", "user12", "user22"]) - ent_list.add_group("group3", 2003, ["user3", "user13", "user23"]) - - ent_list.add_group("group0x", 2000, ["user1", "user2", "user3"]) - ent_list.add_group("group1x", 2010, ["user11", "user12", "user13"]) - ent_list.add_group("group2x", 2020, ["user21", "user22", "user23"]) - create_ldap_fixture(request, ldap_conn, ent_list) - - -@pytest.fixture -def disable_memcache_rfc2307(request, ldap_conn): - load_data_to_ldap(request, ldap_conn) - - conf = unindent("""\ - [sssd] - domains = LDAP - services = nss - - [nss] - memcache_size_group = 0 - memcache_size_passwd = 0 - memcache_size_initgroups = 0 - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - ldap_schema = rfc2307 - id_provider = ldap - auth_provider = ldap - sudo_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.fixture -def disable_pwd_mc_rfc2307(request, ldap_conn): - load_data_to_ldap(request, ldap_conn) - - conf = unindent("""\ - [sssd] - domains = LDAP - services = nss - - [nss] - memcache_size_passwd = 0 - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - ldap_schema = rfc2307 - id_provider = ldap - auth_provider = ldap - sudo_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.fixture -def disable_grp_mc_rfc2307(request, ldap_conn): - load_data_to_ldap(request, ldap_conn) - - conf = unindent("""\ - [sssd] - domains = LDAP - services = nss - - [nss] - memcache_size_group = 0 - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - ldap_schema = rfc2307 - id_provider = ldap - auth_provider = ldap - sudo_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.fixture -def disable_initgr_mc_rfc2307(request, ldap_conn): - load_data_to_ldap(request, ldap_conn) - - conf = unindent("""\ - [sssd] - domains = LDAP - services = nss - - [nss] - memcache_size_initgroups = 0 - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - ldap_schema = rfc2307 - id_provider = ldap - auth_provider = ldap - sudo_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.fixture -def sanity_rfc2307(request, ldap_conn): - load_data_to_ldap(request, ldap_conn) - - conf = unindent("""\ - [sssd] - domains = LDAP - services = nss - - [nss] - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - ldap_schema = rfc2307 - id_provider = ldap - auth_provider = ldap - sudo_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.fixture -def fqname_rfc2307(request, ldap_conn): - load_data_to_ldap(request, ldap_conn) - - conf = unindent("""\ - [sssd] - domains = LDAP - services = nss - - [nss] - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - ldap_schema = rfc2307 - id_provider = ldap - auth_provider = ldap - sudo_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - use_fully_qualified_names = true - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.fixture -def fqname_case_insensitive_rfc2307(request, ldap_conn): - load_data_to_ldap(request, ldap_conn) - - conf = unindent("""\ - [sssd] - domains = LDAP - services = nss - - [nss] - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - ldap_schema = rfc2307 - id_provider = ldap - auth_provider = ldap - sudo_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - use_fully_qualified_names = true - case_sensitive = false - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.fixture -def zero_timeout_rfc2307(request, ldap_conn): - load_data_to_ldap(request, ldap_conn) - - conf = unindent("""\ - [sssd] - domains = LDAP - services = nss - - [nss] - memcache_timeout = 0 - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - ldap_schema = rfc2307 - id_provider = ldap - auth_provider = ldap - sudo_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.mark.converted('test_id.py', 'test_id__getpwuid') -@pytest.mark.converted('test_id.py', 'test_id__getpwnam') -def test_getpwnam(ldap_conn, sanity_rfc2307): - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - ent.assert_passwd_by_name( - 'user2', - dict(name='user2', passwd='*', uid=1002, gid=2002, - gecos='1002', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1002, - dict(name='user2', passwd='*', uid=1002, gid=2002, - gecos='1002', shell='/bin/bash')) - - ent.assert_passwd_by_name( - 'user3', - dict(name='user3', passwd='*', uid=1003, gid=2003, - gecos='1003', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1003, - dict(name='user3', passwd='*', uid=1003, gid=2003, - gecos='1003', shell='/bin/bash')) - - ent.assert_passwd_by_name( - 'user11', - dict(name='user11', passwd='*', uid=1011, gid=2001, - gecos='1011', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1011, - dict(name='user11', passwd='*', uid=1011, gid=2001, - gecos='1011', shell='/bin/bash')) - - ent.assert_passwd_by_name( - 'user12', - dict(name='user12', passwd='*', uid=1012, gid=2002, - gecos='1012', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1012, - dict(name='user12', passwd='*', uid=1012, gid=2002, - gecos='1012', shell='/bin/bash')) - - ent.assert_passwd_by_name( - 'user13', - dict(name='user13', passwd='*', uid=1013, gid=2003, - gecos='1013', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1013, - dict(name='user13', passwd='*', uid=1013, gid=2003, - gecos='1013', shell='/bin/bash')) - - ent.assert_passwd_by_name( - 'user21', - dict(name='user21', passwd='*', uid=1021, gid=2001, - gecos='1021', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1021, - dict(name='user21', passwd='*', uid=1021, gid=2001, - gecos='1021', shell='/bin/bash')) - - ent.assert_passwd_by_name( - 'user22', - dict(name='user22', passwd='*', uid=1022, gid=2002, - gecos='1022', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1022, - dict(name='user22', passwd='*', uid=1022, gid=2002, - gecos='1022', shell='/bin/bash')) - - ent.assert_passwd_by_name( - 'user23', - dict(name='user23', passwd='*', uid=1023, gid=2003, - gecos='1023', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1023, - dict(name='user23', passwd='*', uid=1023, gid=2003, - gecos='1023', shell='/bin/bash')) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getpwnam') -def test_getpwnam_with_mc(ldap_conn, sanity_rfc2307): - test_getpwnam(ldap_conn, sanity_rfc2307) - stop_sssd() - test_getpwnam(ldap_conn, sanity_rfc2307) - - -@pytest.mark.converted('test_id.py', 'test_id__getgrgid') -@pytest.mark.converted('test_id.py', 'test_id__getgrnam') -def test_getgrnam_simple(ldap_conn, sanity_rfc2307): - ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) - ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) - - ent.assert_group_by_name("group2", dict(name="group2", gid=2002)) - ent.assert_group_by_gid(2002, dict(name="group2", gid=2002)) - - ent.assert_group_by_name("group3", dict(name="group3", gid=2003)) - ent.assert_group_by_gid(2003, dict(name="group3", gid=2003)) - - ent.assert_group_by_name("group0x", dict(name="group0x", gid=2000)) - ent.assert_group_by_gid(2000, dict(name="group0x", gid=2000)) - - ent.assert_group_by_name("group1x", dict(name="group1x", gid=2010)) - ent.assert_group_by_gid(2010, dict(name="group1x", gid=2010)) - - ent.assert_group_by_name("group2x", dict(name="group2x", gid=2020)) - ent.assert_group_by_gid(2020, dict(name="group2x", gid=2020)) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getgrnam') -def test_getgrnam_simple_with_mc(ldap_conn, sanity_rfc2307): - test_getgrnam_simple(ldap_conn, sanity_rfc2307) - stop_sssd() - test_getgrnam_simple(ldap_conn, sanity_rfc2307) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_passwd_getgrnam') -def test_getgrnam_simple_disabled_pwd_mc(ldap_conn, disable_pwd_mc_rfc2307): - test_getgrnam_simple(ldap_conn, disable_pwd_mc_rfc2307) - stop_sssd() - test_getgrnam_simple(ldap_conn, disable_pwd_mc_rfc2307) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_intitgroups_getgrnam') -def test_getgrnam_simple_disabled_intitgr_mc(ldap_conn, - disable_initgr_mc_rfc2307): - test_getgrnam_simple(ldap_conn, disable_initgr_mc_rfc2307) - stop_sssd() - test_getgrnam_simple(ldap_conn, disable_initgr_mc_rfc2307) - - -@pytest.mark.converted('test_id.py', 'test_id__membership_by_group_id') -@pytest.mark.converted('test_id.py', 'test_id__membership_by_group_name') -def test_getgrnam_membership(ldap_conn, sanity_rfc2307): - ent.assert_group_by_name( - "group1", - dict(mem=ent.contains_only("user1", "user11", "user21"))) - ent.assert_group_by_gid( - 2001, - dict(mem=ent.contains_only("user1", "user11", "user21"))) - - ent.assert_group_by_name( - "group2", - dict(mem=ent.contains_only("user2", "user12", "user22"))) - ent.assert_group_by_gid( - 2002, - dict(mem=ent.contains_only("user2", "user12", "user22"))) - - ent.assert_group_by_name( - "group3", - dict(mem=ent.contains_only("user3", "user13", "user23"))) - ent.assert_group_by_gid( - 2003, - dict(mem=ent.contains_only("user3", "user13", "user23"))) - - ent.assert_group_by_name( - "group0x", - dict(mem=ent.contains_only("user1", "user2", "user3"))) - ent.assert_group_by_gid( - 2000, - dict(mem=ent.contains_only("user1", "user2", "user3"))) - - ent.assert_group_by_name( - "group1x", - dict(mem=ent.contains_only("user11", "user12", "user13"))) - ent.assert_group_by_gid( - 2010, - dict(mem=ent.contains_only("user11", "user12", "user13"))) - - ent.assert_group_by_name( - "group2x", - dict(mem=ent.contains_only("user21", "user22", "user23"))) - ent.assert_group_by_gid( - 2020, - dict(mem=ent.contains_only("user21", "user22", "user23"))) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__membership_by_group_id') -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__membership_by_group_name') -def test_getgrnam_membership_with_mc(ldap_conn, sanity_rfc2307): - test_getgrnam_membership(ldap_conn, sanity_rfc2307) - stop_sssd() - test_getgrnam_membership(ldap_conn, sanity_rfc2307) - - -def assert_user_gids_equal(user, expected_gids): - (res, errno, gids) = sssd_id.get_user_gids(user) - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user %s, %d" % (user, errno) - - assert sorted(gids) == sorted(expected_gids), \ - "result: %s\n expected %s" % ( - ", ".join(["%s" % s for s in sorted(gids)]), - ", ".join(["%s" % s for s in sorted(expected_gids)]) - ) - - -@pytest.mark.converted('test_id.py', 'test_id__initgroups') -def test_initgroups(ldap_conn, sanity_rfc2307): - assert_user_gids_equal('user1', [2000, 2001]) - assert_user_gids_equal('user2', [2000, 2002]) - assert_user_gids_equal('user3', [2000, 2003]) - - assert_user_gids_equal('user11', [2010, 2001]) - assert_user_gids_equal('user12', [2010, 2002]) - assert_user_gids_equal('user13', [2010, 2003]) - - assert_user_gids_equal('user21', [2020, 2001]) - assert_user_gids_equal('user22', [2020, 2002]) - assert_user_gids_equal('user23', [2020, 2003]) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__user_gids') -def test_initgroups_with_mc(ldap_conn, sanity_rfc2307): - test_initgroups(ldap_conn, sanity_rfc2307) - stop_sssd() - test_initgroups(ldap_conn, sanity_rfc2307) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getpwnam_fully_qualified_names') -@pytest.mark.converted('test_id.py', 'test_id__getpwnam_fully_qualified_names') -def test_initgroups_fqname_with_mc(ldap_conn, fqname_rfc2307): - assert_user_gids_equal('user1@LDAP', [2000, 2001]) - stop_sssd() - assert_user_gids_equal('user1@LDAP', [2000, 2001]) - - -def assert_initgroups_equal(user, primary_gid, expected_gids): - (res, errno, gids) = sssd_id.call_sssd_initgroups(user, primary_gid) - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user %s, %d" % (user, errno) - - assert sorted(gids) == sorted(expected_gids), \ - "result: %s\n expected %s" % ( - ", ".join(["%s" % s for s in sorted(gids)]), - ", ".join(["%s" % s for s in sorted(expected_gids)]) - ) - - -def assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last, - primary_gid, expected_gids): - - assert_initgroups_equal(user1_case1, primary_gid, expected_gids) - assert_initgroups_equal(user1_case2, primary_gid, expected_gids) - assert_initgroups_equal(user1_case_last, primary_gid, expected_gids) - stop_sssd() - - user = user1_case1 - (res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid) - assert res == sssd_id.NssReturnCode.UNAVAIL, \ - "Initgroups for user should fail user %s, %d, %d" % (user, res, errno) - - user = user1_case2 - (res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid) - assert res == sssd_id.NssReturnCode.UNAVAIL, \ - "Initgroups for user should fail user %s, %d, %d" % (user, res, errno) - - # Just last invocation of initgroups should PASS - # Otherwise, we would not be able to invalidate it - assert_initgroups_equal(user1_case_last, primary_gid, expected_gids) - - -@pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive') -@pytest.mark.converted('test_id.py', 'test_id__case_insensitive') -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive') -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive') -def test_initgroups_case_insensitive_with_mc1(ldap_conn, - fqname_case_insensitive_rfc2307): - user1_case1 = 'User1@LDAP' - user1_case2 = 'uSer1@LDAP' - user1_case_last = 'usEr1@LDAP' - primary_gid = 2001 - expected_gids = [2000, 2001] - - assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last, - primary_gid, expected_gids) - - -@pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive') -@pytest.mark.converted('test_id.py', 'test_id__case_insensitive') -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive') -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive') -def test_initgroups_case_insensitive_with_mc2(ldap_conn, - fqname_case_insensitive_rfc2307): - user1_case1 = 'usEr1@LDAP' - user1_case2 = 'User1@LDAP' - user1_case_last = 'uSer1@LDAP' - primary_gid = 2001 - expected_gids = [2000, 2001] - - assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last, - primary_gid, expected_gids) - - -@pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive') -@pytest.mark.converted('test_id.py', 'test_id__case_insensitive') -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive') -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive') -def test_initgroups_case_insensitive_with_mc3(ldap_conn, - fqname_case_insensitive_rfc2307): - user1_case1 = 'uSer1@LDAP' - user1_case2 = 'usEr1@LDAP' - user1_case_last = 'User1@LDAP' - primary_gid = 2001 - expected_gids = [2000, 2001] - - assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last, - primary_gid, expected_gids) - - -def run_simple_test_with_initgroups(): - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - ent.assert_group_by_name( - "group1", - dict(mem=ent.contains_only("user1", "user11", "user21"))) - ent.assert_group_by_gid( - 2001, - dict(mem=ent.contains_only("user1", "user11", "user21"))) - - # unrelated group to user1 - ent.assert_group_by_name( - "group2", - dict(mem=ent.contains_only("user2", "user12", "user22"))) - ent.assert_group_by_gid( - 2002, - dict(mem=ent.contains_only("user2", "user12", "user22"))) - - assert_initgroups_equal("user1", 2001, [2000, 2001]) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidatation_of_gids_after_initgroups') -def test_invalidation_of_gids_after_initgroups(ldap_conn, sanity_rfc2307): - - # the sssd cache was empty and not all user's group were - # resolved with getgr{nm,gid}. Therefore there is a change in - # group membership => user groups should be invalidated - run_simple_test_with_initgroups() - assert_initgroups_equal("user1", 2001, [2000, 2001]) - - stop_sssd() - - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - # unrelated group to user1 must be returned - ent.assert_group_by_name( - "group2", - dict(mem=ent.contains_only("user2", "user12", "user22"))) - ent.assert_group_by_gid( - 2002, - dict(mem=ent.contains_only("user2", "user12", "user22"))) - - assert_initgroups_equal("user1", 2001, [2000, 2001]) - - # user groups must be invalidated - for group in ["group1", "group0x"]: - with pytest.raises(KeyError): - grp.getgrnam(group) - - for gid in [2000, 2001]: - with pytest.raises(KeyError): - grp.getgrgid(gid) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__initgroups_without_change_in_membership') -def test_initgroups_without_change_in_membership(ldap_conn, sanity_rfc2307): - - # the sssd cache was empty and not all user's group were - # resolved with getgr{nm,gid}. Therefore there is a change in - # group membership => user groups should be invalidated - run_simple_test_with_initgroups() - - # invalidate cache - subprocess.call(["sss_cache", "-E"]) - - # all users and groups will be just refreshed from LDAP - # but there will not be a change in group membership - # user groups should not be invlaidated - run_simple_test_with_initgroups() - - stop_sssd() - - # everything should be in memory cache - run_simple_test_with_initgroups() - - -def assert_mc_records_for_user1(): - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - ent.assert_group_by_name( - "group1", - dict(mem=ent.contains_only("user1", "user11", "user21"))) - ent.assert_group_by_gid( - 2001, - dict(mem=ent.contains_only("user1", "user11", "user21"))) - ent.assert_group_by_name( - "group0x", - dict(mem=ent.contains_only("user1", "user2", "user3"))) - ent.assert_group_by_gid( - 2000, - dict(mem=ent.contains_only("user1", "user2", "user3"))) - - assert_initgroups_equal("user1", 2001, [2000, 2001]) - - -def assert_missing_mc_records_for_user1(): - with pytest.raises(KeyError): - pwd.getpwnam("user1") - with pytest.raises(KeyError): - pwd.getpwuid(1001) - - for gid in [2000, 2001]: - with pytest.raises(KeyError): - grp.getgrgid(gid) - for group in ["group0x", "group1"]: - with pytest.raises(KeyError): - grp.getgrnam(group) - - (res, err, _) = sssd_id.call_sssd_initgroups("user1", 2001) - assert res == sssd_id.NssReturnCode.UNAVAIL, \ - "Initgroups should not find anything after invalidation of mc.\n" \ - "User user1, errno:%d" % err - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_user_before_stop') -def test_invalidate_user_before_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - subprocess.call(["sss_cache", "-u", "user1"]) - stop_sssd() - - assert_missing_mc_records_for_user1() - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_user_after_stop') -def test_invalidate_user_after_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - stop_sssd() - subprocess.call(["sss_cache", "-u", "user1"]) - - assert_missing_mc_records_for_user1() - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_users_before_stop') -def test_invalidate_users_before_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - subprocess.call(["sss_cache", "-U"]) - stop_sssd() - - assert_missing_mc_records_for_user1() - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_users_after_stop') -def test_invalidate_users_after_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - stop_sssd() - subprocess.call(["sss_cache", "-U"]) - - assert_missing_mc_records_for_user1() - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_group_before_stop') -def test_invalidate_group_before_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - subprocess.call(["sss_cache", "-g", "group1"]) - stop_sssd() - - assert_missing_mc_records_for_user1() - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_group_after_stop') -def test_invalidate_group_after_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - stop_sssd() - subprocess.call(["sss_cache", "-g", "group1"]) - - assert_missing_mc_records_for_user1() - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_groups_before_stop') -def test_invalidate_groups_before_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - subprocess.call(["sss_cache", "-G"]) - stop_sssd() - - assert_missing_mc_records_for_user1() - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_groups_after_stop') -def test_invalidate_groups_after_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - stop_sssd() - subprocess.call(["sss_cache", "-G"]) - - assert_missing_mc_records_for_user1() - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_everything_before_stop') -def test_invalidate_everything_before_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - subprocess.call(["sss_cache", "-E"]) - stop_sssd() - - assert_missing_mc_records_for_user1() - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_everything_after_stop') -def test_invalidate_everything_after_stop(ldap_conn, sanity_rfc2307): - # initialize cache with full ID - (res, errno, _) = sssd_id.get_user_groups("user1") - assert res == sssd_id.NssReturnCode.SUCCESS, \ - "Could not find groups for user1, %d" % errno - assert_mc_records_for_user1() - - stop_sssd() - subprocess.call(["sss_cache", "-E"]) - - assert_missing_mc_records_for_user1() - - -def get_random_string(length): - return ''.join([random.choice(string.ascii_letters + string.digits) - for n in range(length)]) - - -class MemoryCache(object): - SIZEOF_UINT32_T = 4 - - def __init__(self, path): - with open(path, "rb") as fin: - fin.seek(4 * self.SIZEOF_UINT32_T) - self.seed = struct.unpack('i', fin.read(4))[0] - self.data_size = struct.unpack('i', fin.read(4))[0] - self.ft_size = struct.unpack('i', fin.read(4))[0] - hash_len = struct.unpack('i', fin.read(4))[0] - self.hash_size = hash_len / self.SIZEOF_UINT32_T - - def sss_nss_mc_hash(self, key): - input_key = key + '\0' - input_len = len(key) + 1 - - murmur_hash = pysss_murmur.murmurhash3(input_key, input_len, self.seed) - return murmur_hash % self.hash_size - - -def test_colliding_hashes(ldap_conn, sanity_rfc2307): - """ - Regression test for ticket: - https://github.com/SSSD/sssd/issues/4595 - """ - - first_user = 'user1' - - # initialize data in memcache - ent.assert_passwd_by_name( - first_user, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - mem_cache = MemoryCache(config.MCACHE_PATH + '/passwd') - - colliding_hash = mem_cache.sss_nss_mc_hash(first_user) - - while True: - # string for colliding hash need to be longer then data for user1 - # stored in memory cache (almost equivalent to: - # `getent passwd user1 | wc -c` ==> 45 - second_user = get_random_string(80) - val = mem_cache.sss_nss_mc_hash(second_user) - if val == colliding_hash: - break - - # add new user to LDAP - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - ent_list.add_user(second_user, 5001, 5001) - ldap_conn.add_s(ent_list[0][0], ent_list[0][1]) - - ent.assert_passwd_by_name( - second_user, - dict(name=second_user, passwd='*', uid=5001, gid=5001, - gecos='5001', shell='/bin/bash')) - - stop_sssd() - - # check that both users are stored in cache - ent.assert_passwd_by_name( - first_user, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - ent.assert_passwd_by_name( - second_user, - dict(name=second_user, passwd='*', uid=5001, gid=5001, - gecos='5001', shell='/bin/bash')) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__removed_cache_without_invalidation') -def test_removed_mc(ldap_conn, sanity_rfc2307): - """ - Regression test for ticket: - https://fedorahosted.org/sssd/ticket/2726 - """ - - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) - ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) - stop_sssd() - - # remove cache without invalidation - for path in os.listdir(config.MCACHE_PATH): - os.unlink(config.MCACHE_PATH + "/" + path) - - # sssd is stopped; so the memory cache should not be used - # in long living clients (py.test in this case) - with pytest.raises(KeyError): - pwd.getpwnam('user1') - with pytest.raises(KeyError): - pwd.getpwuid(1001) - - with pytest.raises(KeyError): - grp.getgrnam('group1') - with pytest.raises(KeyError): - grp.getgrgid(2001) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__memcache_timeout_zero') -def test_mc_zero_timeout(ldap_conn, zero_timeout_rfc2307): - """ - Test that the memory cache is not created at all with memcache_timeout=0 - """ - # No memory cache files must be created - assert len(os.listdir(config.MCACHE_PATH)) == 0 - - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) - ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) - stop_sssd() - - # sssd is stopped; so the memory cache should not be used - # in long living clients (py.test in this case) - with pytest.raises(KeyError): - pwd.getpwnam('user1') - with pytest.raises(KeyError): - pwd.getpwuid(1001) - - with pytest.raises(KeyError): - grp.getgrnam('group1') - with pytest.raises(KeyError): - grp.getgrgid(2001) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_cache') -def test_disabled_mc(ldap_conn, disable_memcache_rfc2307): - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) - ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) - - assert_user_gids_equal('user1', [2000, 2001]) - - stop_sssd() - - # sssd is stopped and the memory cache is disabled; - # so pytest should not be able to find anything - with pytest.raises(KeyError): - pwd.getpwnam('user1') - with pytest.raises(KeyError): - pwd.getpwuid(1001) - - with pytest.raises(KeyError): - grp.getgrnam('group1') - with pytest.raises(KeyError): - grp.getgrgid(2001) - - with pytest.raises(KeyError): - (res, errno, gids) = sssd_id.get_user_gids('user1') - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_passwd_getpwnam') -def test_disabled_passwd_mc(ldap_conn, disable_pwd_mc_rfc2307): - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - assert_user_gids_equal('user1', [2000, 2001]) - - stop_sssd() - - # passwd cache is disabled - with pytest.raises(KeyError): - pwd.getpwnam('user1') - with pytest.raises(KeyError): - pwd.getpwuid(1001) - - # Initgroups looks up the user first, hence KeyError from the - # passwd database even if the initgroups cache is active. - with pytest.raises(KeyError): - (res, errno, gids) = sssd_id.get_user_gids('user1') - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_group') -def test_disabled_group_mc(ldap_conn, disable_grp_mc_rfc2307): - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) - ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) - - assert_user_gids_equal('user1', [2000, 2001]) - - stop_sssd() - - # group cache is disabled, other caches should work - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - with pytest.raises(KeyError): - grp.getgrnam('group1') - with pytest.raises(KeyError): - grp.getgrgid(2001) - - assert_user_gids_equal('user1', [2000, 2001]) - - -@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_intitgroups_getpwnam') -def test_disabled_initgr_mc(ldap_conn, disable_initgr_mc_rfc2307): - # Even if initgroups is disabled, passwd should work - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - - stop_sssd() - - ent.assert_passwd_by_name( - 'user1', - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash')) - ent.assert_passwd_by_uid( - 1001, - dict(name='user1', passwd='*', uid=1001, gid=2001, - gecos='1001', shell='/bin/bash'))