diff --git a/sssd_test_framework/topology.py b/sssd_test_framework/topology.py index 4be97afc..b5b6d6c8 100644 --- a/sssd_test_framework/topology.py +++ b/sssd_test_framework/topology.py @@ -66,6 +66,7 @@ def test_ldap(client: Client, ldap: LDAP): name="gdm", topology=Topology(TopologyDomain("sssd", client=1, ipa=1, keycloak=1)), controller=GDMTopologyController(), + domains=dict(test="sssd.ipa[0]"), fixtures=dict(client="sssd.client[0]", ipa="sssd.ipa[0]", provider="sssd.ipa[0]", keycloak="sssd.keycloak[0]"), ) """ diff --git a/sssd_test_framework/topology_controllers.py b/sssd_test_framework/topology_controllers.py index cb0eb656..574e2f29 100644 --- a/sssd_test_framework/topology_controllers.py +++ b/sssd_test_framework/topology_controllers.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re import socket from pytest_mh import BackupTopologyController @@ -58,6 +59,54 @@ def teardown(self) -> None: super().teardown() + def join_domain(self, client: ClientHost, provider: IPAHost | ADHost | SambaHost): + """ + Helper method for joining domains + """ + self.logger.info(f"Enrolling {client.hostname} into {provider.domain}") + + # Remove any existing Kerberos configuration and keytab + client.fs.rm("/etc/krb5.conf") + client.fs.rm("/etc/krb5.keytab") + + # First check if joined. If so, leave. + result = client.conn.exec(["realm", "list"]) + self.logger.info(f"REALM_LIST STDOUT: \n{result.stdout}") + self.logger.info(f"REALM_LIST STDERR: \n{result.stderr}") + pattern = rf"^{re.escape(provider.domain)}$" + if re.search(pattern, result.stdout, re.MULTILINE): + self.logger.info(f"Found {provider.domain} in realm list. Leaving.") + result = client.conn.exec( + ["realm", "leave", provider.domain], input=provider.adminpw, raise_on_error=False + ) + self.logger.info(f"Optional realm leave failed...continuing to join {provider.domain}.") + + if isinstance(provider, (IPAHost)): + # Backup ipa-client-install files + client.fs.backup("/etc/ipa") + client.fs.backup("/var/lib/ipa-client") + + # Configure realm to keep kerberos intact + client.fs.backup("/etc/realmd.conf") + client.fs.write( + "/etc/realmd.conf", + """ + [service] + manage-krb5-conf = no + """, + ) + + # Join provider domain + result = client.conn.exec(["realm", "join", provider.domain], input=provider.adminpw, raise_on_error=False) + if result.rc != 0: + self.logger.info(f"Running realm join failed with:\n{result.stdout}\n{result.stderr}") + self.logger.info("Trying uninstall and join again.") + if isinstance(provider, (IPAHost)): + client.conn.exec(["ipa-client-install", "--uninstall", "-U"]) + else: + client.conn.exec(["realm", "leave", "--unattended", provider.domain], input=provider.adminpw) + client.conn.exec(["realm", "join", provider.domain], input=provider.adminpw) + class ClientTopologyController(ProvisionedBackupTopologyController): """ @@ -86,18 +135,8 @@ def topology_setup(self, client: ClientHost, ipa: IPAHost) -> None: self.logger.info(f"Topology '{self.name}' is already provisioned") return - self.logger.info(f"Enrolling {client.hostname} into {ipa.domain}") - - # Remove any existing Kerberos configuration and keytab - client.fs.rm("/etc/krb5.conf") - client.fs.rm("/etc/krb5.keytab") - - # Backup ipa-client-install files - client.fs.backup("/etc/ipa") - client.fs.backup("/var/lib/ipa-client") - - # Join ipa domain - client.conn.exec(["realm", "join", ipa.domain], input=ipa.adminpw) + # Join IPA domain + self.join_domain(client, ipa) # Backup so we can restore to this state after each test super().topology_setup() @@ -114,14 +153,8 @@ def topology_setup(self, client: ClientHost, provider: ADHost | SambaHost) -> No self.logger.info(f"Topology '{self.name}' is already provisioned") return - self.logger.info(f"Enrolling {client.hostname} into {provider.domain}") - - # Remove any existing Kerberos configuration and keytab - client.fs.rm("/etc/krb5.conf") - client.fs.rm("/etc/krb5.keytab") - - # Join AD domain - client.conn.exec(["realm", "join", provider.domain], input=provider.adminpw) + # Join AD/Samba domain + self.join_domain(client, provider) # Backup so we can restore to this state after each test super().topology_setup() @@ -156,18 +189,8 @@ def topology_setup(self, client: ClientHost, ipa: IPAHost, trusted: ADHost | Sam # Do not enroll client into IPA domain if it is already joined if "ipa" not in self.multihost.provisioned_topologies: - self.logger.info(f"Enrolling {client.hostname} into {ipa.domain}") - - # Remove any existing Kerberos configuration and keytab - client.fs.rm("/etc/krb5.conf") - client.fs.rm("/etc/krb5.keytab") - - # Backup ipa-client-install files - client.fs.backup("/etc/ipa") - client.fs.backup("/var/lib/ipa-client") - - # Join IPA domain) - client.conn.exec(["realm", "join", ipa.domain], input=ipa.adminpw) + # Join IPA domain + self.join_domain(client, ipa) # Backup so we can restore to this state after each test super().topology_setup() @@ -284,12 +307,19 @@ class GDMTopologyController(ProvisionedBackupTopologyController): @BackupTopologyController.restore_vanilla_on_error def topology_setup(self, client: ClientHost, ipa: IPAHost, keycloak: KeycloakHost) -> None: + if "gdm" not in client.features or not client.features["gdm"]: + self.logger.info(f"Topology '{self.name}' setup skipped because gdm feature not found on client") + return + if self.provisioned: self.logger.info(f"Topology '{self.name}' is already provisioned") return self.logger.info(f"Enrolling IPA server {ipa.hostname} into {keycloak.hostname} by creating an IdP client") + # Join IPA domain + self.join_domain(client, ipa) + # Create an IdP client keycloak.kclogin() keycloak.conn.run( @@ -311,13 +341,18 @@ def topology_setup(self, client: ClientHost, ipa: IPAHost, keycloak: KeycloakHos # Backup so we can restore to this state after each test super().topology_setup() - def topology_teardown(self, ipa: IPAHost, keycloak: KeycloakHost) -> None: + def topology_teardown(self, client: ClientHost, ipa: IPAHost, keycloak: KeycloakHost) -> None: + if "gdm" not in client.features or not client.features["gdm"]: + self.logger.info(f"Topology '{self.name}' teardown skipped because gdm feature not found on client") + return + self.logger.info(f"Un-enrolling IPA server {ipa.hostname} from {keycloak.hostname} by deleting the IdP client") keycloak.kclogin() keycloak.conn.run( "ID=$(/opt/keycloak/bin/kcadm.sh get clients -q clientId=ipa_oidc_client --fields=id|jq -r '.[0].id'); " "/opt/keycloak/bin/kcadm.sh delete clients/$ID" ) + ipa.kinit() ipa.conn.run("ipa idp-del keycloak") super().topology_teardown() diff --git a/sssd_test_framework/utils/gdm.py b/sssd_test_framework/utils/gdm.py index 9a6b32d8..3242384e 100644 --- a/sssd_test_framework/utils/gdm.py +++ b/sssd_test_framework/utils/gdm.py @@ -30,7 +30,7 @@ def __init__(self, host): super().__init__(host) self.init_completed = False - self.cmd = [scauto_path, "gui", "--wait-time", "1", "--no-screenshot"] + self.cmd = [scauto_path, "--verbose", "debug", "gui", "--wait-time", "1", "--no-screenshot"] def teardown(self): if self.init_completed: @@ -56,10 +56,13 @@ def assert_text(self, word: str) -> bool: if not self.init_completed: self.init() - result = self.host.conn.exec([*self.cmd, "assert-text", word]) + result = self.host.conn.exec([*self.cmd, "assert-text", word], raise_on_error=False) + if result.rc != 0: + self.logger.warn(f"Unable to find text ({word}) on screen") return result.rc == 0 - def click_on(self, word: str) -> bool: + @retry(max_retries=3, delay=1, on=AssertionError) + def click_on(self, word: str) -> None: """ Run scauto gui click-on @@ -71,8 +74,9 @@ def click_on(self, word: str) -> bool: if not self.init_completed: self.init() - result = self.host.conn.exec([*self.cmd, "click-on", word]) - return result.rc == 0 + result = self.host.conn.exec([*self.cmd, "click-on", word], raise_on_error=False) + if result.rc != 0: + raise AssertionError(f"Unable to click-on target {word}") def kb_write(self, word: str) -> bool: """