diff --git a/src/drunc/data/process_manager/k8s-CERN.json b/src/drunc/data/process_manager/k8s-CERN.json
index 7b9806005..cf01c415a 100644
--- a/src/drunc/data/process_manager/k8s-CERN.json
+++ b/src/drunc/data/process_manager/k8s-CERN.json
@@ -23,5 +23,43 @@
   "opmon_conf":{
     "level": "info",
     "interval_s" : 10.0
+  },
+  "settings": {
+    "labels": {
+      "drunc_label": "drunc.daq"
+    },
+    "connection_server": {
+      "name": "local-connection-server"
+    },
+    "pod_management": {
+      "kill_timeout": 30,
+      "total_shutdown_timeout": 60,
+      "pod_ready_timeout": 60
+    },
+    "cleanup": {
+      "restart_cleanup_time": 10,
+      "restart_cleanup_polling": 0.5
+    },
+    "checking": {
+      "watcher_retry_sleep": 5,
+      "pod_status_check_sleep": 1,
+      "host_cache_expiry": 300,
+      "grpc_startup_timeout": 30,
+      "socket_retry_timeout": 1.0
+    },
+    "volumes": [
+      {
+        "name": "nfs",
+        "mount_path": "/nfs",
+        "host_path": "/nfs",
+        "read_only": false
+      },
+      {
+        "name": "cvmfs",
+        "mount_path": "/cvmfs",
+        "host_path": "/cvmfs",
+        "read_only": true
+      }
+    ]
   }
 }
diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json
index 2fedaf06c..1718ffe16 100644
--- a/src/drunc/data/process_manager/k8s.json
+++ b/src/drunc/data/process_manager/k8s.json
@@ -32,7 +32,8 @@
       "name": "local-connection-server"
     },
     "pod_management": {
-      "kill_timeout": 20,
+      "kill_timeout": 30,
+      "total_shutdown_timeout": 60,
       "pod_ready_timeout": 60
     },
     "cleanup": {
@@ -45,6 +46,20 @@
       "host_cache_expiry": 300,
       "grpc_startup_timeout": 30,
       "socket_retry_timeout": 1.0
-    }
+    },
+    "volumes": [
+      {
+        "name": "nfs",
+        "mount_path": "/nfs",
+        "host_path": "/nfs",
+        "read_only": false
+      },
+      {
+        "name": "cvmfs",
+        "mount_path": "/cvmfs",
+        "host_path": "/cvmfs",
+        "read_only": true
+      }
+    ]
   }
 }
diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py
index ee1feb1bb..bec6f1dac 100644
--- a/src/drunc/process_manager/k8s_process_manager.py
+++ b/src/drunc/process_manager/k8s_process_manager.py
@@ -27,6 +27,7 @@
 # Third-Party Imports
 from kubernetes import client, config, watch
 from kubernetes.client.rest import ApiException
+from kubernetes.config.config_exception import ConfigException
 
 from drunc.k8s_exceptions import (
     DruncK8sException,
@@ -141,7 +142,16 @@ def __init__(self, configuration, **kwargs) -> None:
         super().__init__(configuration=configuration, session=self.session, **kwargs)
         self.log = get_logger("process_manager.k8s-process-manager")
 
-        config.load_kube_config()
+        try:
+            config.load_kube_config()
+        except ConfigException as e:
+            self.log.critical("--- 🚨 KUBERNETES CONFIGURATION ERROR ---")
+            self.log.critical(f"Failed to load kube-config: {e}")
+            self.log.critical(
+                "Please ensure 'kubectl' is configured correctly or the KUBECONFIG environment variable is set."
+            )
+            self.log.critical("----------------------------------------------")
+            raise
 
         self._k8s_client = client
         self._core_v1_api = client.CoreV1Api()
@@ -179,8 +189,12 @@ def __init__(self, configuration, **kwargs) -> None:
 
         # Pod management
         pod_management = settings.get("pod_management", {})
-        self.kill_timeout = pod_management.get("kill_timeout", 20)
+        self.kill_timeout = pod_management.get("kill_timeout", 30)
         self.pod_ready_timeout = pod_management.get("pod_ready_timeout", 60)
+        self.total_shutdown_timeout = pod_management.get("total_shutdown_timeout", 60)
+
+        # Volume mounts
+        self.volume_configs = settings.get("volumes", [])
 
         # Cleanup
         cleanup = settings.get("cleanup", {})
@@ -512,8 +526,138 @@ def _create_nodeport_service(self, podname, session, pod_uid) -> None:
             self.log.error(error_message)
             raise DruncK8sException(error_message) from e
 
+    def _get_pod_volumes_and_mounts(
+        self, boot_request: BootRequest
+    ) -> tuple[list[client.V1Volume], list[client.V1VolumeMount]]:
+        """
+        Prepares all pod volumes and container mounts, including static
+        configs and the dynamic data_mount.
+        """
+        pod_volumes = []
+        container_volume_mounts = []
+
+        # Volumes from json configuration
+        for vc in self.volume_configs:
+            pod_volumes.append(
+                client.V1Volume(
+                    name=vc["name"],
+                    host_path=client.V1HostPathVolumeSource(
+                        path=vc["host_path"], type="Directory"
+                    ),
+                )
+            )
+            container_volume_mounts.append(
+                client.V1VolumeMount(
+                    name=vc["name"],
+                    mount_path=vc["mount_path"],
+                    read_only=vc.get("read_only", True),
+                )
+            )
+
+        # Add log_mount from process_logs_path
+        log_dir = None
+        log_file_path = boot_request.process_description.process_logs_path
+        if log_file_path:
+            log_dir = os.path.dirname(log_file_path)
+            self.log.info(f"Adding 'log-mount' for directory: '{log_dir}'")
+
+            pod_volumes.append(
+                client.V1Volume(
+                    name="log-mount",
+                    host_path=client.V1HostPathVolumeSource(
+                        path=log_dir,
+                        type="DirectoryOrCreate",
+                    ),
+                )
+            )
+            container_volume_mounts.append(
+                client.V1VolumeMount(
+                    name="log-mount",
+                    mount_path=log_dir,
+                    read_only=False,
+                )
+            )
+
+        # Add dynamic data_mount if present in the boot request
+        data_mount_path = None
+        if boot_request.process_restriction.data_mount:
+            self.log.info(
+                f"Found data_mount request: '{boot_request.process_restriction.data_mount}'"
+            )
+            if boot_request.process_restriction.data_mount == ".":
+                data_mount_path = (
+                    boot_request.process_description.process_execution_directory
+                )
+                self.log.info(
+                    f"Resolving '.' data_mount to process_execution_directory: '{data_mount_path}'"
+                )
+            else:
+                data_mount_path = boot_request.process_restriction.data_mount
+                self.log.info(f"Using provided data_mount path: '{data_mount_path}'")
+
+        if data_mount_path:
+            if data_mount_path == log_dir:
+                self.log.info(
+                    f"Skipping 'data-mount' as its path '{data_mount_path}' is already covered by 'log-mount'."
+                )
+            else:
+                self.log.info(
+                    f"Adding 'data-mount' for directory: '{data_mount_path}'"
+                )
+                pod_volumes.append(
+                    client.V1Volume(
+                        name="data-mount",
+                        host_path=client.V1HostPathVolumeSource(
+                            path=data_mount_path,
+                            type="Directory",
+                        ),
+                    )
+                )
+                container_volume_mounts.append(
+                    client.V1VolumeMount(
+                        name="data-mount",
+                        mount_path=data_mount_path,
+                        read_only=False,
+                    )
+                )
+
+        return pod_volumes, container_volume_mounts
+
+    def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]:
+        """
+        Determines the role of a pod based on its tree_id,
+        and returns a dictionary of labels to be applied.
+ """ + role = "unknown" + + labels = {f"tree-id.{self.drunc_label}": tree_id} + + if not tree_id: + role = "unknown" + elif tree_id == "0": + role = "root-controller" + elif tree_id == "1": + role = "local-connection-server" + else: + # Count the depth + depth = tree_id.count(".") + if depth == 1: + role = "segment-controller" + elif depth == 2: + role = "application" + + labels[f"role.{self.drunc_label}"] = role + self.log.info( + f"Assigning labels for '{podname}': role={role}, tree-id={tree_id}" + ) + return labels + def _build_pod_main_container( - self, podname: str, boot_request: BootRequest, lcs_port: int | None + self, + podname: str, + boot_request: BootRequest, + lcs_port: int | None, + container_volume_mounts: list[client.V1VolumeMount], ) -> client.V1Container: """Builds the primary V1Container manifest, including command and preStop hook.""" @@ -525,17 +669,16 @@ def _build_pod_main_container( for i, e_and_a in enumerate(exec_and_args_list): is_last_command = i == len(exec_and_args_list) - 1 prefix = "" - # Only add 'exec' to the C++ apps (non-controllers) + if ( - "controller" not in podname - and podname != self.connection_server_name - and is_last_command + is_last_command and e_and_a.exec != "source" + and podname != self.connection_server_name ): prefix = "exec " command_parts.append(prefix + " ".join([e_and_a.exec] + list(e_and_a.args))) - main_command_str = " && ".join(command_parts) + main_command_chain = " && ".join(command_parts) container_ports = [] if podname == self.connection_server_name and lcs_port is not None: @@ -562,21 +705,40 @@ def _build_pod_main_container( f"'{podname}' identified as a Python app, no preStop hook needed." ) + # Redirect logs + log_file_path = boot_request.process_description.process_logs_path + final_command_args: str + + if log_file_path: + self.log.info(f"Redirecting pod stdout/stderr to '{log_file_path}'") + log_redirect_cmd = f"exec > {log_file_path} 2>&1;" + else: + log_redirect_cmd = "" + + if podname == self.connection_server_name: + # LCS (gunicorn) needs a shell trap to handle SIGTERM grace + final_command_args = ( + f"{log_redirect_cmd} " + f"trap 'kill -KILL $child; wait $child; exit 0' TERM QUIT; " + f"{main_command_chain} & " + f"child=$!; " + f"wait $child" + ) + else: + final_command_args = f"{log_redirect_cmd} {main_command_chain}" + main_container = client.V1Container( name=podname, image=pod_image, command=["/bin/sh", "-c"], - args=[main_command_str], + args=[final_command_args], env=[ client.V1EnvVar(name=k, value=v) for k, v in boot_request.process_description.env.items() ], lifecycle=lifecycle_hook, ports=container_ports, - volume_mounts=[ - client.V1VolumeMount(name="nfs", mount_path="/nfs"), - client.V1VolumeMount(name="cvmfs", mount_path="/cvmfs"), - ], + volume_mounts=container_volume_mounts, working_dir=boot_request.process_description.process_execution_directory, security_context=client.V1SecurityContext( run_as_user=os.getuid(), run_as_group=os.getgid() @@ -644,18 +806,26 @@ def _build_pod_manifest( main_container: client.V1Container, node_selector: dict, host_aliases: list[client.V1HostAlias] | None, + pod_volumes: list[client.V1Volume], + extra_labels: dict[str, str] | None = None, ) -> client.V1Pod: """Assembles the final V1Pod object.""" + + # Get pod labels + pod_labels = { + "app": podname, + f"creator.{self.drunc_label}": self.__class__.__name__, + } + if extra_labels: + pod_labels.update(extra_labels) + return client.V1Pod( api_version="v1", kind="Pod", metadata=self._meta_v1_api( name=podname, 
                 namespace=session,
-                labels={
-                    "app": podname,
-                    f"creator.{self.drunc_label}": self.__class__.__name__,
-                },
+                labels=pod_labels,
             ),
             spec=self._pod_spec_v1_api(
                 node_selector=node_selector,
@@ -663,15 +833,7 @@
                 restart_policy="Never",
                 containers=[main_container],
                 host_aliases=host_aliases if host_aliases else None,
-                volumes=[
-                    client.V1Volume(
-                        name="nfs", host_path=client.V1HostPathVolumeSource(path="/nfs")
-                    ),
-                    client.V1Volume(
-                        name="cvmfs",
-                        host_path=client.V1HostPathVolumeSource(path="/cvmfs"),
-                    ),
-                ],
+                volumes=pod_volumes,
             ),
         )
 
@@ -748,6 +910,7 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None:
         """Constructs and creates a Kubernetes Pod manifest and its associated service."""
         try:
             lcs_port = None
+            tree_id = boot_request.process_description.metadata.tree_id
 
             # Early Port Extraction and Class Variable Setup for LCS
             if podname == self.connection_server_name:
@@ -760,9 +923,21 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None:
                         f"Could not extract port for LCS '{podname}'."
                     )
 
+            # Get correct label using tree_id
+            tree_labels = self._get_tree_labels(tree_id, podname)
+
+            # Prepare volume mounts
+            (
+                pod_volumes,
+                container_volume_mounts,
+            ) = self._get_pod_volumes_and_mounts(boot_request)
+
             # Build the main container manifest
             main_container = self._build_pod_main_container(
-                podname, boot_request, lcs_port
+                podname,
+                boot_request,
+                lcs_port,
+                container_volume_mounts,
             )
 
             # Node_selector, host_aliases, pod_manifest
@@ -776,6 +951,8 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None:
                 main_container,
                 node_selector,
                 host_aliases,
+                pod_volumes,
+                extra_labels=tree_labels,
             )
 
             # Execute the pod creation API call
@@ -1315,20 +1492,30 @@ def _kill_pod(self, podname, session, grace_period_seconds=None) -> None:
         )
 
     def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList:
-        """Handles the 'kill' command."""
-        uuids_to_kill = self._get_process_uid(query, order_by="leaf_first")
-        if not uuids_to_kill:
-            return ProcessInstanceList(values=[])
+        """
+        Handles the 'kill' command with staged, role-based shutdown
+        by querying pod labels.
+        """
 
-        self.log.info(f"Starting termination of {len(uuids_to_kill)} pods...")
+        # Get all UUIDs
+        targeted_uuids = set(self._get_process_uid(query))
+        if not targeted_uuids:
+            return ProcessInstanceList(values=[])
 
-        apps = []
-        for uuid_str in uuids_to_kill:
-            if uuid_str not in self.boot_request:
-                continue
+        self.log.info(
+            f"Starting staged termination for {len(targeted_uuids)} pod(s)..."
+        )
 
-            apps.append(uuid_str)
+        # Define the shutdown order
+        shutdown_order = [
+            "unknown",
+            "application",
+            "segment-controller",
+            "root-controller",
+            "local-connection-server",
+        ]
 
+        # Define the blocking kill_and_wait helper
         def kill_and_wait(uuids, grace_period=None) -> None:
             if not uuids:
                 return
@@ -1363,10 +1550,47 @@ def kill_and_wait(uuids, grace_period=None) -> None:
 
             self.uuids_pending_deletion.clear()
 
-        kill_and_wait(apps)
+        # Execute staged shutdown
+        all_pods = []
+        try:
+            pod_list = self._core_v1_api.list_pod_for_all_namespaces(
+                label_selector=self._get_creator_label_selector()
+            )
+            all_pods = pod_list.items
+        except self._api_error_v1_api as e:
+            self.log.error(f"Could not list pods for kill operation: {e}")
+
+        # Map pods by their role label
+        pods_by_role = {
+            "unknown": [],
+            "application": [],
+            "segment-controller": [],
+            "root-controller": [],
+            "local-connection-server": [],
+        }
+
+        uuid_label_key = f"uuid.{self.drunc_label}"
+        role_label_key = f"role.{self.drunc_label}"
+
+        for pod in all_pods:
+            uuid = pod.metadata.labels.get(uuid_label_key)
+            if uuid and uuid in targeted_uuids:
+                role = pod.metadata.labels.get(role_label_key, "unknown")
+                pods_by_role[role].append(uuid)
+
+        # Kill in stages using our sorted lists
+        for role in shutdown_order:
+            uuids_in_step = pods_by_role[role]
+            if uuids_in_step:
+                self.log.info(
+                    f"--- Termination Step: Shutting down role '{role}' ({len(uuids_in_step)} pod(s)) ---"
+                )
+                kill_and_wait(uuids_in_step)  # This call is blocking
+                self.log.info(f"--- Termination Step: Role '{role}' complete ---")
 
+        # Finalize and clean up
         final_ret = []
-        for proc_uuid in uuids_to_kill:
+        for proc_uuid in targeted_uuids:
             if proc_uuid in self.boot_request:
                 pi = ProcessInstance(
                     process_description=self.boot_request[
diff --git a/src/drunc/process_manager/oks_parser.py b/src/drunc/process_manager/oks_parser.py
index 302d9b982..68b101d03 100644
--- a/src/drunc/process_manager/oks_parser.py
+++ b/src/drunc/process_manager/oks_parser.py
@@ -151,6 +151,17 @@ def collect_apps(
         )
         log.debug(f"Collecting app {app.id} with args {args}")
 
+        data_path = None
+        if "DFApplication" in app.oksTypes():
+            try:
+                # DFApplication -> data_writers -> data_store_params -> directory_path
+                data_path = app.data_writers[0].data_store_params.directory_path
+            except (AttributeError, IndexError):
+                log.debug(
+                    f"DFApplication {app.id} is missing its data path configuration."
+                )
+                pass
+
         apps.append(
             {
                 "name": app.id,
@@ -161,6 +172,7 @@
                 "env": app_env,
                 "tree_id": app_tree_id_str,
                 "log_path": app.log_path,
+                "data_path": data_path,
             }
         )
         app_index += 1
diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py
index 4feadbc30..b295df832 100644
--- a/src/drunc/process_manager/process_manager_driver.py
+++ b/src/drunc/process_manager/process_manager_driver.py
@@ -237,6 +237,7 @@ def _build_boot_request(
         args = app["args"]
         env = app["env"]
         app_log_path = app["log_path"]
+        data_path = app.get("data_path")
         env["DUNE_DAQ_BASE_RELEASE"] = os.getenv("DUNE_DAQ_BASE_RELEASE")
         env["SPACK_RELEASES_DIR"] = os.getenv("SPACK_RELEASES_DIR")
         tree_id = app["tree_id"]
@@ -261,6 +262,13 @@
         if host_is_local(host) and not os.path.exists(os.path.dirname(log_path)):
             raise DruncShellException(f"Log path {log_path} does not exist.")
 
+        process_restriction = ProcessRestriction(allowed_hosts=[host])
+        if data_path:
+            self.log.debug(
+                f"Attaching data_path '{data_path}' to the boot request for '{name}'"
+            )
+            process_restriction.data_mount = data_path
+
         self.log.debug(f"{name}'s env:\n{env}")
         breq = BootRequest(
             token=copy_token(self.token),
@@ -277,7 +285,7 @@
                 process_execution_directory=pwd,
                 process_logs_path=log_path,
             ),
-            process_restriction=ProcessRestriction(allowed_hosts=[host]),
+            process_restriction=process_restriction,
         )
         self.log.debug(f"{breq=}\n\n")
         return breq
diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py
index 738c7c8b6..9e4128acf 100644
--- a/src/drunc/unified_shell/shell.py
+++ b/src/drunc/unified_shell/shell.py
@@ -225,12 +225,31 @@ def unified_shell(
     unified_shell_log.debug("[green]Process manager[/green] started")
 
     # Check if the process manager started correctly
-    for _ in range(100):
+    process_started = False
+    for _ in range(100):  # 10s timeout
         if ready_event.is_set():
+            process_started = True
             break
+
+        if not ctx.obj.pm_process.is_alive():
+            exit_code = ctx.obj.pm_process.exitcode
+            unified_shell_log.error(
+                f"[red]Process manager process died unexpectedly with exit code {exit_code}."
+            )
+            unified_shell_log.error(
+                "[red]This is likely a configuration error (e.g., bad kube-config)."
+            )
+            unified_shell_log.error(
+                "[red]Please check the full traceback in the terminal above this message.[/red]"
+            )
+            sys.exit(exit_code if exit_code else 1)
         sleep(0.1)
-    if not ready_event.is_set():
-        raise DruncSetupException("[red]Process manager didn't start in time[/red]")
+
+    if not process_started:
+        # This message will only show if the process is *alive* but never sent the "ready" signal
+        raise DruncSetupException(
+            "[red]Process manager timed out starting. Check logs for details.[/red]"
+        )
 
     # Setup the process manager address
     process_manager_address = resolve_localhost_and_127_ip_to_network_ip(