diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index fca806fd7..ff77f8103 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -615,7 +615,7 @@ def add_to_table(table, response, prefix=""): else "[red]NA[/]" ), ) - for child_response in response.children: + for child_response in sorted(response.children, key=lambda c: c.name): add_to_table(table, child_response, " " + prefix) add_to_table(t, result) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index e47c2ece3..38fb61946 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -214,9 +214,12 @@ def logs( ) result = obj.get_driver("process_manager").logs(log_req) + if result is None: + return - if result.uuid.uuid is not None: - obj.rule(f"[yellow]{result.uuid.uuid}[/yellow] logs") + display_name = result.name or result.uuid.uuid or "" + if result.name is not None: + obj.rule(f"[yellow]{display_name}[/yellow] logs") for line in result.lines: if not line.strip(): # keep empty lines for visual clarity @@ -234,7 +237,8 @@ def logs( line = line.replace(grep, f"[u]{grep}[/]") obj.print(line) - obj.rule("End") + if result.name is not None: + obj.rule(f"[yellow]{display_name}[/yellow] end") @click.command("restart") diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 47a9029a9..9a81482d6 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -626,21 +626,24 @@ def _get_pod_volumes_and_mounts( # Check if this path is already covered by the JSON volumes above is_covered = False for vm in container_volume_mounts: - if vm.mount_path == target_home_path or target_home_path.startswith(vm.mount_path + "/"): - self.log.debug(f"Home path '{target_home_path}' is already covered by mount '{vm.mount_path}'") + if vm.mount_path == target_home_path or target_home_path.startswith( + vm.mount_path + "/" + ): + self.log.debug( + f"Home path '{target_home_path}' is already covered by mount '{vm.mount_path}'" + ) is_covered = True break if not is_covered: self.log.info(f"Auto-mounting home directory: '{target_home_path}'") vol_name = f"home-{username}" - + pod_volumes.append( client.V1Volume( name=vol_name, host_path=client.V1HostPathVolumeSource( - path=target_home_path, - type="Directory" + path=target_home_path, type="Directory" ), ) ) @@ -1001,6 +1004,7 @@ def _get_host_username(self) -> str: except KeyError: try: import pwd + return pwd.getpwuid(os.getuid()).pw_name except KeyError: return str(os.getuid()) @@ -1343,7 +1347,9 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: logs = self._core_v1_api.read_namespaced_pod_log( podname, session, tail_lines=log_request.how_far or 100 ) - return LogLines(uuid=ProcessUUID(uuid=uuid), lines=logs.split("\n")) + return LogLines( + name=podname, uuid=ProcessUUID(uuid=uuid), lines=logs.split("\n") + ) except self._api_error_v1_api as e: return LogLines( uuid=ProcessUUID(uuid=uuid), diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 1345efddb..aa0b23142 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -479,6 +479,14 @@ def logs(self, request: LogRequest, context: ServicerContext) -> LogLines: lines=[], flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ) + except BadQuery as e: + return LogLines( + name=self.name, + token=None, + uuid=None, + lines=[str(e)], + flag=ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT, + ) return response diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index e5b5633f2..135e699ae 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -21,7 +21,7 @@ ProcessRestriction, ) from druncschema.process_manager_pb2_grpc import ProcessManagerStub -from druncschema.request_response_pb2 import Request +from druncschema.request_response_pb2 import Request, ResponseFlag from druncschema.token_pb2 import Token from drunc.connectivity_service.client import ConnectivityServiceClient @@ -723,11 +723,27 @@ def kill( return response - def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines: + def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines | None: request.token.CopyFrom(self.token) try: response = self.stub.logs(request, timeout=timeout) + + # Check if the response indicates a BadQuery error + if response.flag == ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT: + lines = response.lines + if len(lines) == 1: + lines = lines[0] + self.log.warning(f"Bad query for logs: {lines}") + return None + + # Check for other error flags + if response.flag == ResponseFlag.DRUNC_EXCEPTION_THROWN: + self.log.error(f"Exception occurred on server: {response.lines}") + return None + + return response + except grpc.RpcError as e: try: error_details = extract_grpc_rich_error(e) @@ -737,10 +753,8 @@ def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines: f"Could not extract rich error details from gRPC error: {extraction_error}", exc_info=True, ) - handle_grpc_error(e) - - return response + return None def ps( self, request: ProcessQuery, timeout: int | float = 60 diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 3571f0636..247e89b20 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -230,6 +230,7 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: logfile = self.boot_request[uid].process_description.process_logs_path user = self.boot_request[uid].process_description.metadata.user host = self.boot_request[uid].process_description.metadata.hostname + process_name = self.boot_request[uid].process_description.metadata.name # Determine number of lines to retrieve (default: 100) nlines = log_request.how_far if log_request.how_far else 100 @@ -241,7 +242,7 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: ) return LogLines( - name=self.name, + name=process_name, token=None, uuid=ProcessUUID(uuid=uid), lines=lines, diff --git a/src/drunc/process_manager/utils.py b/src/drunc/process_manager/utils.py index 7c7799719..82c4720fa 100644 --- a/src/drunc/process_manager/utils.py +++ b/src/drunc/process_manager/utils.py @@ -2,10 +2,14 @@ import os import re from functools import update_wrapper -from operator import attrgetter import click -from druncschema.process_manager_pb2 import ProcessInstance, ProcessQuery, ProcessUUID +from druncschema.process_manager_pb2 import ( + ProcessInstance, + ProcessInstanceList, + ProcessQuery, + ProcessUUID, +) from rich.table import Table from drunc.exceptions import DruncCommandException, DruncException, DruncSetupException @@ -63,7 +67,58 @@ def make_tree(values): return lines -def tabulate_process_instance_list(pil, title, long=False): +def order_process_by_name(processes: list[ProcessInstance]): + """Given a list of processes, perform a tiered order by the name""" + by_session = {} + for process in processes: + m = process.process_description.metadata + by_session.setdefault(m.session, []).append(process) + + ordered = [] + for session in sorted(by_session.keys()): + session_processes = by_session[session] + node_by_id = {} + children = {} + roots = [] + + for process in session_processes: + tree_id = process.process_description.metadata.tree_id or "" + node_by_id.setdefault(tree_id, []).append(process) + + for tree_id, processes in node_by_id.items(): + node_by_id[tree_id] = sorted( + processes, + key=lambda p: ( + p.process_description.metadata.name, + p.uuid.uuid, + ), + ) + + for tree_id in node_by_id.keys(): + parent_id = tree_id.rsplit(".", 1)[0] if "." in tree_id else None + if not parent_id or parent_id not in node_by_id: + roots.append(tree_id) + else: + children.setdefault(parent_id, []).append(tree_id) + + def sort_key(tree_id): + m = node_by_id[tree_id][0].process_description.metadata + return (m.name, tree_id) + + def walk(tree_id): + ordered.extend(node_by_id[tree_id]) + for child_id in sorted(children.get(tree_id, []), key=sort_key): + walk(child_id) + + for root_id in sorted(roots, key=sort_key): + walk(root_id) + + return ordered + + +def tabulate_process_instance_list( + pil: ProcessInstanceList, title: str, long: bool = False +): t = Table(title=title) t.add_column("session") t.add_column("friendly name") @@ -75,13 +130,7 @@ def tabulate_process_instance_list(pil, title, long=False): if long: t.add_column("executable") - sorted_pil = sorted( - pil.values, - key=attrgetter( - "process_description.metadata.session", - "process_description.metadata.tree_id", - ), - ) + sorted_pil = order_process_by_name(pil.values) tree_str = make_tree(sorted_pil) try: for process, line in zip(sorted_pil, tree_str):