From 6a1c6fe7c82ee5fa67bf530b9536453403715987 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 24 Jul 2025 11:18:38 +0100 Subject: [PATCH 01/30] Update remaining sessman types. --- src/drunc/session_manager/session_manager.py | 1 - src/drunc/session_manager/session_manager_driver.py | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/drunc/session_manager/session_manager.py b/src/drunc/session_manager/session_manager.py index a1ce5b59c..e7f4c78f3 100644 --- a/src/drunc/session_manager/session_manager.py +++ b/src/drunc/session_manager/session_manager.py @@ -82,7 +82,6 @@ def describe(self, request: Request, context: ServicerContext) -> Description: return Description( type="session_manager", name=self.name, - session=self.name, commands=commands, children=[], flag=ResponseFlag.EXECUTED_SUCCESSFULLY, diff --git a/src/drunc/session_manager/session_manager_driver.py b/src/drunc/session_manager/session_manager_driver.py index c4c6b79d0..cd214f172 100644 --- a/src/drunc/session_manager/session_manager_driver.py +++ b/src/drunc/session_manager/session_manager_driver.py @@ -6,7 +6,7 @@ from druncschema.token_pb2 import Token from grpc import Channel -from drunc.utils.shell_utils import DecodedResponse, GRPCDriver +from drunc.utils.shell_utils import GRPCDriver class SessionManagerDriver(GRPCDriver): @@ -39,7 +39,7 @@ def create_stub(self, channel: Channel) -> SessionManagerStub: """ return SessionManagerStub(channel) - def describe(self) -> DecodedResponse | None: + def describe(self) -> Description: """Describe the session manager service. Returns: @@ -47,7 +47,7 @@ def describe(self) -> DecodedResponse | None: """ return self.send_command("describe", outformat=Description) - def list_all_sessions(self) -> DecodedResponse | None: + def list_all_sessions(self) -> AllActiveSessions: """List all active sessions managed by the session manager. Returns: @@ -55,7 +55,7 @@ def list_all_sessions(self) -> DecodedResponse | None: """ return self.send_command("list_all_sessions", outformat=AllActiveSessions) - def list_all_configs(self) -> DecodedResponse | None: + def list_all_configs(self) -> AllConfigKeys: """List all available configurations in the session manager. Returns: From f44616a0eadc1b16eaffc99f132e92b2a4737bf0 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 24 Jul 2025 11:23:59 +0100 Subject: [PATCH 02/30] Procman describe uses new Description msg. --- src/drunc/process_manager/process_manager.py | 26 +++++++++---------- .../process_manager/process_manager_driver.py | 6 ++--- src/drunc/utils/shell_utils.py | 8 +++++- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index a60a7fef2..a0324a3c8 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -5,7 +5,7 @@ from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType -from druncschema.description_pb2 import CommandDescription, OldDescription +from druncschema.description_pb2 import CommandDescription, Description from druncschema.opmon.process_manager_pb2 import ProcessStatus from druncschema.process_manager_pb2 import ( BootRequest, @@ -105,7 +105,7 @@ def __init__( name="describe", data_type=["None"], help="Describe self (return a list of commands, the type of endpoint, the name and session).", - return_type="description_pb2.OldDescription", + return_type="description_pb2.Description", ), CommandDescription( name="kill", @@ -475,27 +475,25 @@ def flush(self, request: Request, context: ServicerContext) -> Response: @authentified_and_authorised( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) # 2nd step - def describe(self, request: Request, context: ServicerContext) -> Response: + def describe(self, request: Request, context: ServicerContext) -> Description: self.log.debug(f"{self.name} running describe") - bd = self.describe_broadcast() - d = OldDescription( + + description = Description( type="process_manager", name=self.name, info=self.configuration.log_path, session="no_session" if not self.session else self.session, commands=self.commands, - ) - if bd: - d.broadcast.CopyFrom(pack_to_any(bd)) - - return Response( - name=self.name, - token=None, - data=pack_to_any(d), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, children=[], + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + token=None, ) + if broadcast_description := self.describe_broadcast(): + description.broadcast.CopyFrom(pack_to_any(broadcast_description)) + + return description + @abc.abstractmethod def _logs_impl(self, request_data: LogRequest) -> LogLines: raise NotImplementedError diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 89ca38d05..556c15cf2 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -6,7 +6,7 @@ import time from time import sleep -from druncschema.description_pb2 import OldDescription +from druncschema.description_pb2 import Description from druncschema.process_manager_pb2 import ( BootRequest, LogLines, @@ -437,9 +437,9 @@ def restart( timeout=timeout, ) - def describe(self, timeout: int | float = 60) -> OldDescription: + def describe(self, timeout: int | float = 60) -> Description: return self.send_command( "describe", - outformat=OldDescription, + outformat=Description, timeout=timeout, ) diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index 505f66d75..3c77ddeba 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -223,7 +223,13 @@ def send_command( from druncschema.description_pb2 import Description from druncschema.session_manager_pb2 import AllActiveSessions, AllConfigKeys - if isinstance(response, (Description, AllActiveSessions, AllConfigKeys)): + new_message_types = ( + Description, + AllActiveSessions, + AllConfigKeys, + ) + + if isinstance(response, new_message_types): return response return self.handle_response(response, command, outformat) From 24e9009725db59fa322b2a07f4504f00a9d91ca3 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 24 Jul 2025 17:18:14 +0100 Subject: [PATCH 03/30] New terminate and kill messages. --- src/drunc/process_manager/process_manager.py | 78 ++++++++++--------- .../process_manager/process_manager_driver.py | 8 +- .../process_manager/ssh_process_manager.py | 20 ++--- 3 files changed, 56 insertions(+), 50 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index a0324a3c8..8a1e87f29 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -117,13 +117,13 @@ def __init__( name="restart", data_type=["process_manager_pb2.ProcessQuery"], help="Restart the process from the process query (which must correspond to one process).", - return_type="process_manager_pb2.ProcessInstance", + return_type="process_manager_pb2.ProcessInstanceList", ), CommandDescription( name="boot", data_type=["generic_pb2.BootRequest", "None"], help="Start a process.", - return_type="process_manager_pb2.ProcessInstance", + return_type="process_manager_pb2.ProcessInstanceList", ), CommandDescription( name="terminate", @@ -147,7 +147,7 @@ def __init__( name="ps", data_type=["process_manager_pb2.ProcessQuery"], help="Get the status of the listed process from the process query input (can be multiple).", - return_type="process_manager_pb2.ProcessInstance", + return_type="process_manager_pb2.ProcessInstanceList", ), ] @@ -251,12 +251,11 @@ def boot(self, request: Request, context: ServicerContext) -> Response: ) try: - resp = self._boot_impl(data) - + response = self._boot_impl(data) return Response( name=self.name, token=None, - data=pack_to_any(resp), + data=pack_to_any(response), flag=ResponseFlag.EXECUTED_SUCCESSFULLY, children=[], ) @@ -264,13 +263,13 @@ def boot(self, request: Request, context: ServicerContext) -> Response: return Response( name=self.name, token=None, - data=pack_to_any(resp), + data=pack_to_any(response), flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, children=[], ) @abc.abstractmethod - def _terminate_impl(self) -> ProcessInstanceList: + def _terminate_impl(self) -> list[ProcessInstance]: raise NotImplementedError # ORDER MATTERS! @@ -278,26 +277,29 @@ def _terminate_impl(self) -> ProcessInstanceList: @authentified_and_authorised( action=ActionType.DELETE, system=SystemType.PROCESS_MANAGER ) # 2nd step - def terminate(self, request: Request, context: ServicerContext) -> Response: + def terminate( + self, request: Request, context: ServicerContext + ) -> ProcessInstanceList: self.log.debug(f"{self.name} terminating") + + # tototo try: - resp = self._terminate_impl() - return Response( - name=self.name, - token=None, - data=pack_to_any(resp), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=[], - ) + response = self._terminate_impl() except NotImplementedError: - return Response( + return ProcessInstanceList( name=self.name, token=None, - data=pack_to_any(resp), + values=[], flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - children=[], ) + return ProcessInstanceList( + name=self.name, + token=None, + values=response, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + @abc.abstractmethod def _restart_impl(self, q: ProcessQuery) -> ProcessInstanceList: raise NotImplementedError @@ -316,11 +318,11 @@ def restart(self, request: Request, context: ServicerContext) -> Response: self.log.debug(f"{self.name} running restart") try: - resp = self._restart_impl(data) + response = self._restart_impl(data) return Response( name=self.name, token=None, - data=pack_to_any(resp), + data=pack_to_any(response), flag=ResponseFlag.EXECUTED_SUCCESSFULLY, children=[], ) @@ -328,13 +330,13 @@ def restart(self, request: Request, context: ServicerContext) -> Response: return Response( name=self.name, token=None, - data=pack_to_any(resp), + data=pack_to_any(response), flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, children=[], ) @abc.abstractmethod - def _kill_impl(self, q: ProcessQuery) -> ProcessInstanceList: + def _kill_impl(self, query: ProcessQuery) -> list[ProcessInstance]: raise NotImplementedError # ORDER MATTERS! @@ -342,32 +344,32 @@ def _kill_impl(self, q: ProcessQuery) -> ProcessInstanceList: @authentified_and_authorised( action=ActionType.DELETE, system=SystemType.PROCESS_MANAGER ) # 2nd step - def kill(self, request: Request, context: ServicerContext) -> Response: + def kill(self, request: Request, context: ServicerContext) -> ProcessInstanceList: + self.log.debug(f"{self.name} running kill") + try: data = unpack_any(request.data, ProcessQuery) except UnpackingError as e: return unpack_error_response(self.__class__.__name__, str(e), request.token) - self.log.debug(f"{self.name} running kill") - + # tototo try: - resp = self._kill_impl(data) - return Response( - name=self.name, - token=None, - data=pack_to_any(resp), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=[], - ) + response = self._kill_impl(data) except NotImplementedError: - return Response( + return ProcessInstanceList( name=self.name, token=None, - data=pack_to_any(resp), + values=[], flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - children=[], ) + return ProcessInstanceList( + name=self.name, + token=None, + values=response, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + @abc.abstractmethod def _ps_impl(self, q: ProcessQuery) -> ProcessInstanceList: raise NotImplementedError diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 556c15cf2..98d6d6b61 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -393,7 +393,9 @@ def terminate( "terminate", outformat=ProcessInstanceList, timeout=timeout ) - def kill(self, query: ProcessQuery, timeout: int | float = 60) -> ProcessInstance: + def kill( + self, query: ProcessQuery, timeout: int | float = 60 + ) -> ProcessInstanceList: return self.send_command( "kill", data=query, @@ -429,11 +431,11 @@ def flush( def restart( self, query: ProcessQuery, timeout: int | float = 60 - ) -> ProcessInstance: + ) -> ProcessInstanceList: return self.send_command( "restart", data=query, - outformat=ProcessInstance, + outformat=ProcessInstanceList, timeout=timeout, ) diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 14652bec3..8e027a124 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -71,7 +71,7 @@ def __init__(self, configuration, **kwargs): self.ssh = sh.Command("/usr/bin/ssh") - def kill_processes(self, uuids: list) -> ProcessInstanceList: + def kill_processes(self, uuids: list) -> list[ProcessInstance]: ret = [] for proc_uuid in uuids: process = self.process_store[proc_uuid] @@ -117,11 +117,11 @@ def kill_processes(self, uuids: list) -> ProcessInstanceList: ] del self.process_store[proc_uuid] - pil = ProcessInstanceList(values=ret) - return pil + return ret - def _terminate_impl(self) -> ProcessInstanceList: + def _terminate_impl(self) -> list[ProcessInstance]: self.log.info("Terminating") + if self.process_store: self.log.info("Killing all the known processes before exiting") uuids = self._get_process_uid( @@ -130,7 +130,7 @@ def _terminate_impl(self) -> ProcessInstanceList: return self.kill_processes(uuids) else: self.log.info("No known process to kill before exiting") - return ProcessInstanceList() + return [] def _logs_impl(self, log_request: LogRequest) -> LogLines: self.log.debug(f"Retrieving logs for {log_request.query}") @@ -178,7 +178,8 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: except Exception as e: if uid not in self.process_store: return LogLines( - uuid=ProcessUUID(uuid=uid), lines=[f"Could not retrieve logs: {e!s}"] + uuid=ProcessUUID(uuid=uid), + lines=[f"Could not retrieve logs: {e!s}"], ) else: return LogLines( @@ -186,7 +187,7 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: lines=[ f"Could not retrieve logs: {e!s}", f"stdout: {self.process_store[uid].stdout}", - f"stderr: {self.process_store[uid].stderr}" + f"stderr: {self.process_store[uid].stderr}", ], ) @@ -435,11 +436,12 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: return ret - def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList: + def _kill_impl(self, query: ProcessQuery) -> list[ProcessInstance]: self.log.info(f"{self.name} killing {query.names} in session {self.session}") + if self.process_store: uuids = self._get_process_uid(query, order_by="leaf_first") return self.kill_processes(uuids) else: self.log.info("No known process to kill before exiting") - return ProcessInstanceList() + return [] From b01d1d7ae1f820022750d200db3413c820148085 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 24 Jul 2025 17:49:23 +0100 Subject: [PATCH 04/30] New restart messages. --- .../process_manager/k8s_process_manager.py | 22 ++++++------- src/drunc/process_manager/process_manager.py | 33 +++++++++---------- .../process_manager/ssh_process_manager.py | 4 +-- 3 files changed, 28 insertions(+), 31 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 2eaa4b8bc..7547b2811 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -368,16 +368,18 @@ def _return_code(self, podname, session): def _terminate(self): self.log.info("Terminating") - def _logs_impl(self, log_request: LogRequest) -> LogLines: uuids = self._get_process_uid(log_request.query, in_boot_request=True) uuid = self._ensure_one_process(uuids, in_boot_request=True) for uuid in self._get_process_uid(log_request.query): podname = self.boot_request[uuid].process_description.metadata.name session = self.boot_request[uuid].process_description.metadata.session - return [LogLines(line=log) for log in self._core_v1_api.read_namespaced_pod_log( - podname, session, tail_lines=log_request.how_far - ).split("\n")] + return [ + LogLines(line=log) + for log in self._core_v1_api.read_namespaced_pod_log( + podname, session, tail_lines=log_request.how_far + ).split("\n") + ] def _boot_impl(self, boot_request: BootRequest) -> ProcessUUID: this_uuid = str(uuid.uuid4()) @@ -426,8 +428,8 @@ def _ps_impl( return pil - def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: - # ret = [] + def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: + ret = [] uuids = self._get_process_uid(query, in_boot_request=True) uuid = self._ensure_one_process(uuids, in_boot_request=True) for uuid in self._get_process_uid(query): @@ -445,16 +447,12 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: del self.boot_request[uuid] del uuid - ret = self.__boot(same_uuid_br, same_uuid) + ret = [self.__boot(same_uuid_br, same_uuid)] + # ret.append(self._get_pi(uuid, podname, session)) del same_uuid_br del same_uuid - # ret.append(self._get_pi(uuid, podname, session)) - - # pil = ProcessInstanceList( - # values=ret - # ) return ret # # ORDER MATTERS! diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 8a1e87f29..2e06dc275 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -280,9 +280,8 @@ def _terminate_impl(self) -> list[ProcessInstance]: def terminate( self, request: Request, context: ServicerContext ) -> ProcessInstanceList: - self.log.debug(f"{self.name} terminating") + self.log.debug(f"{self.name} running terminate") - # tototo try: response = self._terminate_impl() except NotImplementedError: @@ -301,7 +300,7 @@ def terminate( ) @abc.abstractmethod - def _restart_impl(self, q: ProcessQuery) -> ProcessInstanceList: + def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: raise NotImplementedError # ORDER MATTERS! @@ -309,32 +308,33 @@ def _restart_impl(self, q: ProcessQuery) -> ProcessInstanceList: @authentified_and_authorised( action=ActionType.DELETE, system=SystemType.PROCESS_MANAGER ) # 2nd step - def restart(self, request: Request, context: ServicerContext) -> Response: + def restart( + self, request: Request, context: ServicerContext + ) -> ProcessInstanceList: + self.log.debug(f"{self.name} running restart") + try: data = unpack_any(request.data, ProcessQuery) except UnpackingError as e: return unpack_error_response(self.__class__.__name__, str(e), request.token) - self.log.debug(f"{self.name} running restart") - try: response = self._restart_impl(data) - return Response( - name=self.name, - token=None, - data=pack_to_any(response), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=[], - ) except NotImplementedError: - return Response( + return ProcessInstanceList( name=self.name, token=None, - data=pack_to_any(response), + values=[], flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - children=[], ) + return ProcessInstanceList( + name=self.name, + token=None, + values=response, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + @abc.abstractmethod def _kill_impl(self, query: ProcessQuery) -> list[ProcessInstance]: raise NotImplementedError @@ -352,7 +352,6 @@ def kill(self, request: Request, context: ServicerContext) -> ProcessInstanceLis except UnpackingError as e: return unpack_error_response(self.__class__.__name__, str(e), request.token) - # tototo try: response = self._kill_impl(data) except NotImplementedError: diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 8e027a124..cdf905af1 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -410,7 +410,7 @@ def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: this_uuid = str(uuid.uuid4()) return self.__boot(boot_request, this_uuid) - def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: + def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: self.log.info(f"{self.name} restarting {query.names} in session {self.session}") uuids = self._get_process_uid(query, in_boot_request=True) uuid = self._ensure_one_process(uuids, in_boot_request=True) @@ -429,7 +429,7 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: del self.boot_request[uuid] del uuid - ret = self.__boot(same_uuid_br, same_uuid) + ret = [self.__boot(same_uuid_br, same_uuid)] del same_uuid_br del same_uuid From 8fe3d7b9a76ce082edda8066d4d23067eb9ec5e7 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 25 Jul 2025 14:32:03 +0100 Subject: [PATCH 05/30] Most PM commands' messages except boot and logs. --- .../process_manager/k8s_process_manager.py | 6 +-- src/drunc/process_manager/process_manager.py | 53 ++++++++----------- .../process_manager/process_manager_driver.py | 4 +- .../process_manager/ssh_process_manager.py | 7 +-- src/drunc/utils/shell_utils.py | 2 + 5 files changed, 32 insertions(+), 40 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 7547b2811..48eb04b87 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -414,7 +414,7 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: def _ps_impl( self, query: ProcessQuery, in_boot_request: bool = False - ) -> ProcessInstanceList: + ) -> list[ProcessInstance]: ret = [] for proc_uuid in self._get_process_uid(query): podname = self.boot_request[proc_uuid].process_description.metadata.name @@ -424,9 +424,7 @@ def _ps_impl( ret.append(self._get_pi(proc_uuid, podname, session, return_code)) - pil = ProcessInstanceList(values=ret) - - return pil + return ret def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: ret = [] diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 2e06dc275..a8a386da7 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -173,19 +173,16 @@ def publish(self, q: ProcessQuery, interval_s: float = 10.0): n_running = sum( 1 - for process in results.values + for process in results if process.status_code == ProcessInstance.StatusCode.RUNNING ) n_dead = sum( 1 - for process in results.values + for process in results if process.status_code == ProcessInstance.StatusCode.DEAD ) n_session = len( - { - process.process_description.metadata.session - for process in results.values - } + {process.process_description.metadata.session for process in results} ) self.opmon_publisher.publish( message=ProcessStatus( @@ -231,7 +228,7 @@ def interrupt_with_exception(self, *args, **kwargs): ) @abc.abstractmethod - def _boot_impl(self, br: BootRequest) -> ProcessInstance: + def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: raise NotImplementedError # ORDER MATTERS! @@ -370,7 +367,7 @@ def kill(self, request: Request, context: ServicerContext) -> ProcessInstanceLis ) @abc.abstractmethod - def _ps_impl(self, q: ProcessQuery) -> ProcessInstanceList: + def _ps_impl(self, query: ProcessQuery) -> list[ProcessInstance]: raise NotImplementedError # ORDER MATTERS! @@ -378,45 +375,44 @@ def _ps_impl(self, q: ProcessQuery) -> ProcessInstanceList: @authentified_and_authorised( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) # 2nd step - def ps(self, request: Request, context: ServicerContext) -> Response: + def ps(self, request: Request, context: ServicerContext) -> ProcessInstanceList: + self.log.debug(f"{self.name} running ps") + try: data = unpack_any(request.data, ProcessQuery) except UnpackingError as e: return unpack_error_response(self.__class__.__name__, str(e), request.token) - self.log.debug(f"{self.name} running ps") - try: - resp = self._ps_impl(data) - return Response( - name=self.name, - token=None, - data=pack_to_any(resp), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=[], - ) + response = self._ps_impl(data) except NotImplementedError: - return Response( + return ProcessInstanceList( name=self.name, token=None, - data=pack_to_any(resp), + values=[], flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - children=[], ) + return ProcessInstanceList( + name=self.name, + token=None, + values=response, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + # ORDER MATTERS! @broadcasted # outer most wrapper 1st step @authentified_and_authorised( action=ActionType.DELETE, system=SystemType.PROCESS_MANAGER ) # 2nd step - def flush(self, request: Request, context: ServicerContext) -> Response: + def flush(self, request: Request, context: ServicerContext) -> ProcessInstanceList: + self.log.debug(f"{self.name} running flush") + try: data = unpack_any(request.data, ProcessQuery) except UnpackingError as e: return unpack_error_response(self.__class__.__name__, str(e), request.token) - self.log.debug(f"{self.name} running flush") - ret = [] for uuid in self._get_process_uid(data): if uuid not in self.boot_request: @@ -461,14 +457,11 @@ def flush(self, request: Request, context: ServicerContext) -> Response: del self.process_store[uuid] ret += [pi] - pil = ProcessInstanceList(values=ret) - - return Response( + return ProcessInstanceList( name=self.name, token=None, - data=pack_to_any(pil), + values=ret, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=[], ) # ORDER MATTERS! diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 98d6d6b61..b8272282a 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -390,7 +390,9 @@ def terminate( timeout: int | float = 60, ) -> ProcessInstanceList: return self.send_command( - "terminate", outformat=ProcessInstanceList, timeout=timeout + "terminate", + outformat=ProcessInstanceList, + timeout=timeout, ) def kill( diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index cdf905af1..f3954d05e 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -14,7 +14,6 @@ LogRequest, ProcessDescription, ProcessInstance, - ProcessInstanceList, ProcessQuery, ProcessRestriction, ProcessUUID, @@ -361,7 +360,7 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: return pi - def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: + def _ps_impl(self, query: ProcessQuery) -> list[ProcessInstance]: ret = [] for proc_uuid in self._get_process_uid(query): @@ -401,9 +400,7 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: ) ret += [pi] - pil = ProcessInstanceList(values=ret) - - return pil + return ret def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: self.log.debug(f"{self.name} running _boot_impl") diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index 3c77ddeba..df78e20f2 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -221,12 +221,14 @@ def send_command( # TODO: TEMP HACK UNTIL UNPACKING IS REMOVED from druncschema.description_pb2 import Description + from druncschema.process_manager_pb2 import ProcessInstanceList from druncschema.session_manager_pb2 import AllActiveSessions, AllConfigKeys new_message_types = ( Description, AllActiveSessions, AllConfigKeys, + ProcessInstanceList, ) if isinstance(response, new_message_types): From 5c37ab44329789036e81e7e0d4755f14c64cef71 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 25 Jul 2025 14:33:52 +0100 Subject: [PATCH 06/30] Whack-a-mole with regressions. --- .../process_manager/interface/commands.py | 17 +++++++-------- .../process_manager/interface/context.py | 2 +- src/drunc/process_manager/interface/shell.py | 9 ++++---- src/drunc/unified_shell/shell.py | 21 ++++++------------- 4 files changed, 18 insertions(+), 31 deletions(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 04a594eab..03c9709a5 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -45,9 +45,9 @@ def boot( log = get_logger("process_manager.shell") processes = obj.get_driver("process_manager").ps(ProcessQuery(user=user)) - if len(processes.data.values) > 0: + if len(processes.values) > 0: click.confirm( - f"You already have {len(processes.data.values)} processes running, are you sure you want to boot a session?", + f"You already have {len(processes.values)} processes running, are you sure you want to boot a session?", abort=True, ) @@ -159,7 +159,7 @@ def terminate(obj: ProcessManagerContext) -> None: if not result: return obj.print( - tabulate_process_instance_list(result.data, "Terminated process", False) + tabulate_process_instance_list(result, "Terminated process", False) ) # rich tables require console printing obj.delete_driver("controller") @@ -175,7 +175,7 @@ def kill(obj: ProcessManagerContext, query: ProcessQuery) -> None: if not result: return obj.print( - tabulate_process_instance_list(result.data, "Killed process", False) + tabulate_process_instance_list(result, "Killed process", False) ) # rich tables require console printing obj.delete_driver("controller") @@ -191,7 +191,7 @@ def flush(obj: ProcessManagerContext, query: ProcessQuery) -> None: if not result: return obj.print( - tabulate_process_instance_list(result.data, "Flushed process", False) + tabulate_process_instance_list(result, "Flushed process", False) ) # rich tables require console printing @@ -222,7 +222,6 @@ def logs( obj.rule(f"[yellow]{result.uuid.uuid}[/yellow] logs") for line in result.lines: - if line == "": obj.print("") continue @@ -262,9 +261,7 @@ def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None: help="Whether to have a long output", ) @click.pass_obj -def ps( - obj: ProcessManagerContext, query: ProcessQuery, long_format: bool -) -> None: +def ps(obj: ProcessManagerContext, query: ProcessQuery, long_format: bool) -> None: log = get_logger("process_manager.shell") log.debug(f"Running ps with query {query}") results = obj.get_driver("process_manager").ps(query=query) @@ -272,6 +269,6 @@ def ps( return obj.print( tabulate_process_instance_list( - results.data, title="Processes running", long=long_format + results, title="Processes running", long=long_format ) ) diff --git a/src/drunc/process_manager/interface/context.py b/src/drunc/process_manager/interface/context.py index d2eeeb846..9efa95859 100644 --- a/src/drunc/process_manager/interface/context.py +++ b/src/drunc/process_manager/interface/context.py @@ -34,7 +34,7 @@ def create_drivers(self, **kwargs) -> Mapping[str, GRPCDriver]: "process_manager": ProcessManagerDriver( self.address, self._token, - aio_channel=True, + aio_channel=False, ) } diff --git a/src/drunc/process_manager/interface/shell.py b/src/drunc/process_manager/interface/shell.py index a192ede09..b7c6370da 100644 --- a/src/drunc/process_manager/interface/shell.py +++ b/src/drunc/process_manager/interface/shell.py @@ -48,7 +48,6 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> ctx.obj.reset(address=process_manager_address) try: - desc = ctx.obj.get_driver("process_manager").describe() except ServerUnreachable as e: process_manager_shell_log = get_logger( @@ -63,7 +62,7 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> process_manager_log = get_logger( logger_name="process_manager", - log_file_path=desc.data.info, + log_file_path=desc.info, override_log_file=False, rich_handler=True, ) @@ -72,10 +71,10 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> f"[green]{getpass.getuser()}[/green] connected to the process manager through a [green]drunc-process-manager-shell[/green] via address [green]{process_manager_address}[/green]" ) process_manager_shell_log.info( - f"Connected to {process_manager_address}, running '{desc.data.name}.{desc.data.session}' (name.session), starting listening..." + f"Connected to {process_manager_address}, running '{desc.name}.{desc.session}' (name.session), starting listening..." ) - if desc.data.HasField("broadcast"): - ctx.obj.start_listening(desc.data.broadcast) + if desc.HasField("broadcast"): + ctx.obj.start_listening(desc.broadcast) def cleanup(): ctx.obj.terminate() diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index ae9ec9913..4c25d3eae 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -118,8 +118,7 @@ def unified_shell( ctx.obj.session_name = session_name db = conffwk.Configuration(ctx.obj.configuration_file) - session_dal = db.get_dal(class_name="Session", - uid=ctx.obj.configuration_id) + session_dal = db.get_dal(class_name="Session", uid=ctx.obj.configuration_id) app_log_path = session_dal.log_path connectivity_service_address = f"{session_dal.connectivity_service.host}:{session_dal.connectivity_service.service.port}" @@ -133,8 +132,7 @@ def unified_shell( f"Spawning [green]process_manager[/green] with configuration {process_manager}" ) # Check if process_manager is a packaged config - process_manager_conf_file = get_process_manager_configuration( - process_manager) + process_manager_conf_file = get_process_manager_configuration(process_manager) ready_event = mp.Event() port = mp.Value("i", 0) @@ -189,7 +187,6 @@ def unified_shell( unified_shell_log.debug("Runnning [green]describe[/green]") try: desc = ctx.obj.get_driver().describe() - desc = desc.data except Exception as e: unified_shell_log.error( f"[red]Could not connect to the process manager at the address[/red] [green]{process_manager_address}[/]" @@ -279,12 +276,10 @@ def cleanup(): # Let's do this unified_shell_log.debug("Retrieving the session database") db = conffwk.Configuration(ctx.obj.configuration_file) - session_dal = db.get_dal(class_name="Session", - uid=ctx.obj.configuration_id) + session_dal = db.get_dal(class_name="Session", uid=ctx.obj.configuration_id) controller_name = session_dal.segment.controller.id - unified_shell_log.debug( - "Initializing the [green]ControllerConfHandler[/green]") + unified_shell_log.debug("Initializing the [green]ControllerConfHandler[/green]") controller_configuration = ControllerConfHandler( type=ConfTypes.OKSFileName, data=ctx.obj.configuration_file, @@ -309,16 +304,12 @@ def cleanup(): ) unified_shell_log.debug("Initializing the [green]StatefulNode[/green]") - stateful_node = StatefulNode( - fsm_configuration=fsmch, - top_segment_controller=False - ) + stateful_node = StatefulNode(fsm_configuration=fsmch, top_segment_controller=False) unified_shell_log.debug( "Retrieving the transitions from the [green]StatefulNode[/green]" ) - transitions = convert_fsm_transition( - stateful_node.get_all_fsm_transitions()) + transitions = convert_fsm_transition(stateful_node.get_all_fsm_transitions()) fsm_logger.setLevel(log_level) fsm_conf_logger.setLevel(log_level) # End of shameful code From 119f918a96df6b7d3a9a8ee06ed0e22a64857542 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 25 Jul 2025 14:40:54 +0100 Subject: [PATCH 07/30] A couple more... --- src/drunc/unified_shell/commands.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index 9f2ceef89..7d6dc2e47 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -30,9 +30,9 @@ def boot( ProcessQuery(user=user, session=session_name) ) - if len(processes.data.values) > 0: + if len(processes.values) > 0: click.confirm( - f"You already have {len(processes.data.values)} processes running in session {session_name}, are you sure you want to boot a session?", + f"You already have {len(processes.values)} processes running in session {session_name}, are you sure you want to boot a session?", abort=True, ) From 11f6bd733a8cd23c73eebabd83ba54716355efd6 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 25 Jul 2025 16:26:06 +0100 Subject: [PATCH 08/30] Same treatment for logs message. --- src/drunc/process_manager/process_manager.py | 28 ++++++++-------- .../process_manager/ssh_process_manager.py | 33 ++++++++++++------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index a8a386da7..09d0dfa90 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -489,7 +489,7 @@ def describe(self, request: Request, context: ServicerContext) -> Description: return description @abc.abstractmethod - def _logs_impl(self, request_data: LogRequest) -> LogLines: + def _logs_impl(self, log_request: LogRequest) -> LogLines: raise NotImplementedError # ORDER MATTERS! @@ -497,15 +497,15 @@ def _logs_impl(self, request_data: LogRequest) -> LogLines: @authentified_and_authorised( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) # 2nd step - def logs(self, request: Request, context: ServicerContext) -> Response: + def logs(self, request: Request, context: ServicerContext) -> LogLines: """Fetch logs for a process. Args: request: The incoming request. context: The gRPC context (not used). - Yields: - Response objects containing log lines. + Returns: + A response containing log lines. """ self.log.debug("Getting logs") @@ -515,23 +515,21 @@ def logs(self, request: Request, context: ServicerContext) -> Response: return unpack_error_response(self.__class__.__name__, str(e), request.token) try: - return Response( - name=self.name, - token=None, - data=pack_to_any(self._logs_impl(data)), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=[], - ) + response = self._logs_impl(data) except NotImplementedError: - return Response( + return LogLines( name=self.name, token=None, - data=None, + uuid=None, + lines=[], flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - children=[], ) - def _ensure_one_process(self, uuids: [str], in_boot_request: bool = False) -> str: + return response + + def _ensure_one_process( + self, uuids: list[str], in_boot_request: bool = False + ) -> str: if uuids == []: raise BadQuery("The process corresponding to the query doesn't exist") elif len(uuids) > 1: diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index f3954d05e..8e460e71e 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -18,6 +18,7 @@ ProcessRestriction, ProcessUUID, ) +from druncschema.request_response_pb2 import ResponseFlag from drunc.exceptions import DruncCommandException from drunc.process_manager.process_manager import ProcessManager @@ -175,27 +176,35 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: _err_to_out=True, ) except Exception as e: - if uid not in self.process_store: - return LogLines( - uuid=ProcessUUID(uuid=uid), - lines=[f"Could not retrieve logs: {e!s}"], - ) - else: - return LogLines( - uuid=ProcessUUID(uuid=uid), - lines=[ - f"Could not retrieve logs: {e!s}", + lines = [f"Could not retrieve logs: {e!s}"] + if uid in self.process_store: + lines.extend( + [ f"stdout: {self.process_store[uid].stdout}", f"stderr: {self.process_store[uid].stderr}", - ], + ] ) + return LogLines( + name=self.name, + token=None, + uuid=ProcessUUID(uuid=uid), + lines=lines, + flag=ResponseFlag.UNHANDLED_EXCEPTION_THROWN, + ) + f.close() with open(f.name) as fi: lines = fi.readlines() if "Connection to " in lines[-1] and " closed." in lines[-1]: lines = lines[:-1] - return LogLines(uuid=ProcessUUID(uuid=uid), lines=lines) + return LogLines( + name=self.name, + token=None, + uuid=ProcessUUID(uuid=uid), + lines=lines, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) os.remove(f.name) From 4cfff7d836de16527cce4e561744b1798478ffb6 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 25 Jul 2025 16:31:12 +0100 Subject: [PATCH 09/30] logs command fix. --- src/drunc/process_manager/interface/commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 03c9709a5..8e77a2162 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -216,7 +216,7 @@ def logs( query=query, ) - result = obj.get_driver("process_manager").logs(log_req).data + result = obj.get_driver("process_manager").logs(log_req) if result.uuid.uuid is not None: obj.rule(f"[yellow]{result.uuid.uuid}[/yellow] logs") From fbf85ea30186d501967674c886c3d7ceb481d000 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 25 Jul 2025 17:00:54 +0100 Subject: [PATCH 10/30] Typing. --- src/drunc/process_manager/process_manager_driver.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index b8272282a..5ab157b18 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -4,6 +4,7 @@ import signal import tempfile import time +from collections.abc import Iterator from time import sleep from druncschema.description_pb2 import Description @@ -12,7 +13,6 @@ LogLines, LogRequest, ProcessDescription, - ProcessInstance, ProcessInstanceList, ProcessMetadata, ProcessQuery, @@ -54,7 +54,7 @@ def _convert_oks_to_boot_request( db, session_name: str, override_logs: bool, - ) -> BootRequest: + ) -> Iterator[BootRequest]: from drunc.process_manager.oks_parser import collect_apps, collect_infra_apps env = { @@ -164,7 +164,7 @@ def boot( int | float ) = 0, # This may be useful if you have are using SSHPM, and have SSHD's maxstartups setting set to a low value. **kwargs, - ) -> ProcessInstance: + ) -> Iterator[ProcessInstanceList]: from daqconf.consolidate import consolidate_db self.log.info(f"Booting session [green]{session_name}[/green]") @@ -236,7 +236,7 @@ def boot( yield self.send_command( "boot", data=br, - outformat=ProcessInstance, + outformat=ProcessInstanceList, timeout=timeout, ) @@ -340,7 +340,7 @@ def dummy_boot( sleep: int, n_sleeps: int, timeout: int | float = 60, - ): # -> ProcessInstance: + ) -> Iterator[ProcessInstanceList]: pwd = os.getcwd() # Construct the list of commands to send to the dummy_boot process @@ -381,7 +381,7 @@ def dummy_boot( yield self.send_command( "boot", data=breq, - outformat=ProcessInstance, + outformat=ProcessInstanceList, timeout=timeout, ) From 4e66288b6036332f04df3eb1029e679546a332c7 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 25 Jul 2025 17:23:44 +0100 Subject: [PATCH 11/30] Use new boot message. --- .../process_manager/k8s_process_manager.py | 3 +- src/drunc/process_manager/process_manager.py | 32 +++++++++---------- .../process_manager/ssh_process_manager.py | 2 +- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 48eb04b87..ef892c4b2 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -381,7 +381,8 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: ).split("\n") ] - def _boot_impl(self, boot_request: BootRequest) -> ProcessUUID: + def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: + self.log.debug(f"{self.name} running boot command") this_uuid = str(uuid.uuid4()) return self.__boot(boot_request, this_uuid) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 09d0dfa90..da0459baf 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -21,7 +21,6 @@ from druncschema.process_manager_pb2_grpc import ProcessManagerServicer from druncschema.request_response_pb2 import ( Request, - Response, ResponseFlag, ) from google.rpc import code_pb2 @@ -236,35 +235,34 @@ def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: @authentified_and_authorised( action=ActionType.CREATE, system=SystemType.PROCESS_MANAGER ) # 2nd step - def boot(self, request: Request, context: ServicerContext) -> Response: - try: - data = unpack_any(request.data, BootRequest) - except UnpackingError as e: - return unpack_error_response(self.__class__.__name__, str(e), request.token) - + def boot(self, request: Request, context: ServicerContext) -> ProcessInstanceList: self.log.debug( "{self.name} booting '{data.process_description.metadata.name}' " "from session '{data.process_description.metadata.session}'" ) + try: + data = unpack_any(request.data, BootRequest) + except UnpackingError as e: + return unpack_error_response(self.__class__.__name__, str(e), request.token) + try: response = self._boot_impl(data) - return Response( - name=self.name, - token=None, - data=pack_to_any(response), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=[], - ) except NotImplementedError: - return Response( + return ProcessInstanceList( name=self.name, token=None, - data=pack_to_any(response), + values=[], flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - children=[], ) + return ProcessInstanceList( + name=self.name, + token=None, + values=[response], + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + @abc.abstractmethod def _terminate_impl(self) -> list[ProcessInstance]: raise NotImplementedError diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 8e460e71e..6ce27cd63 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -412,7 +412,7 @@ def _ps_impl(self, query: ProcessQuery) -> list[ProcessInstance]: return ret def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: - self.log.debug(f"{self.name} running _boot_impl") + self.log.debug(f"{self.name} running boot command") this_uuid = str(uuid.uuid4()) return self.__boot(boot_request, this_uuid) From 191c6d084617d731cf32b1ff6c2a356041d52f6e Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 25 Jul 2025 17:39:45 +0100 Subject: [PATCH 12/30] More whack-a-mole. --- src/drunc/process_manager/interface/commands.py | 4 ++-- src/drunc/unified_shell/commands.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 8e77a2162..8ebb80f6c 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -67,7 +67,7 @@ def boot( if not result: break log.debug( - f"'{result.data.process_description.metadata.name}' ({result.data.uuid.uuid}) process started" + f"'{result.values[0].process_description.metadata.name}' ({result.values[0].uuid.uuid}) process started" ) except InterruptedCommand: return @@ -144,7 +144,7 @@ def dummy_boot( if not result: break log.debug( - f"'{result.data.process_description.metadata.name}' ({result.data.uuid.uuid}) process started" + f"'{result.values[0].process_description.metadata.name}' ({result.values[0].uuid.uuid}) process started" ) except InterruptedCommand: return diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index 7d6dc2e47..a5b662b92 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -50,7 +50,7 @@ def boot( if not result: break log.debug( - f"'{result.data.process_description.metadata.name}' ({result.data.uuid.uuid}) started" + f"'{result.values[0].process_description.metadata.name}' ({result.values[0].uuid.uuid}) started" ) except InterruptedCommand: log.warning("Booting interrupted") From 4a9958c268a9ed2f5231a11d843a8d1d6f30747c Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 28 Jul 2025 14:03:26 +0100 Subject: [PATCH 13/30] Simplify SM client. --- .../session_manager/session_manager_driver.py | 41 ++++++++++++------- src/drunc/utils/shell_utils.py | 17 +++----- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/src/drunc/session_manager/session_manager_driver.py b/src/drunc/session_manager/session_manager_driver.py index cd214f172..e02faad57 100644 --- a/src/drunc/session_manager/session_manager_driver.py +++ b/src/drunc/session_manager/session_manager_driver.py @@ -1,10 +1,11 @@ """Driver for the session manager service.""" +import grpc from druncschema.description_pb2 import Description from druncschema.session_manager_pb2 import AllActiveSessions, AllConfigKeys from druncschema.session_manager_pb2_grpc import SessionManagerStub from druncschema.token_pb2 import Token -from grpc import Channel +from google.protobuf.empty_pb2 import Empty from drunc.utils.shell_utils import GRPCDriver @@ -27,17 +28,9 @@ def __init__(self, address: str, token: Token, **kwargs): super().__init__( name="session_manager_driver", address=address, token=token, **kwargs ) - - def create_stub(self, channel: Channel) -> SessionManagerStub: - """Create gRPC stubs for the session manager service. - - Args: - channel: The gRPC channel to use for communication. - - Returns: - An object containing session manager service method stubs. - """ - return SessionManagerStub(channel) + self.address = address + self.channel = grpc.insecure_channel(self.address) + self.stub = SessionManagerStub(self.channel) def describe(self) -> Description: """Describe the session manager service. @@ -45,7 +38,13 @@ def describe(self) -> Description: Returns: A response containing the description of the service. """ - return self.send_command("describe", outformat=Description) + + try: + response = self.stub.describe(Empty()) + except grpc.RpcError as e: + self.__handle_grpc_error(e, "describe") + + return response def list_all_sessions(self) -> AllActiveSessions: """List all active sessions managed by the session manager. @@ -53,7 +52,13 @@ def list_all_sessions(self) -> AllActiveSessions: Returns: A response containing a list of all active sessions. """ - return self.send_command("list_all_sessions", outformat=AllActiveSessions) + + try: + response = self.stub.list_all_sessions(Empty()) + except grpc.RpcError as e: + self.__handle_grpc_error(e, "list_all_sessions") + + return response def list_all_configs(self) -> AllConfigKeys: """List all available configurations in the session manager. @@ -61,4 +66,10 @@ def list_all_configs(self) -> AllConfigKeys: Returns: A response containing all available configuration keys. """ - return self.send_command("list_all_configs", outformat=AllConfigKeys) + + try: + response = self.stub.list_all_configs(Empty()) + except grpc.RpcError as e: + self.__handle_grpc_error(e, "list_all_configs") + + return response diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index df78e20f2..f916ed768 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -94,14 +94,9 @@ def __init__(self, name: str, address: str, token: Token, aio_channel=False): f"You need to provide a valid IP address for the driver. Provided '{address}'" ) - self.address = address - - if aio_channel: - self.channel = grpc.aio.insecure_channel(self.address) - else: - self.channel = grpc.insecure_channel(self.address) - - self.stub = self.create_stub(self.channel) + self._address = address + self._channel = grpc.insecure_channel(self._address) + self._stub = self.create_stub(self._channel) self.token = Token() self.token.CopyFrom(token) @@ -186,7 +181,7 @@ def text(verb="not executed", reason=""): elif response.flag in [ ResponseFlag.NOT_EXECUTED_NOT_IN_CONTROL, ]: - self.log.warn(text()) + self.log.warning(text()) else: self.log.error(text("failed", error_txt)) @@ -207,10 +202,10 @@ def send_command( decode_children=False, timeout: int | float = 60, ): - if not self.stub: + if not self._stub: raise DruncShellException("No stub initialised") - cmd = getattr(self.stub, command) # this throws if the command doesn't exist + cmd = getattr(self._stub, command) # this throws if the command doesn't exist request = self._create_request(data) From 76f2f636c7f5f1d71e12069a83c4fa55804e98b0 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 28 Jul 2025 15:56:21 +0100 Subject: [PATCH 14/30] Add timeout to SM calls. --- .../session_manager/session_manager_driver.py | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/drunc/session_manager/session_manager_driver.py b/src/drunc/session_manager/session_manager_driver.py index e02faad57..33d90aae2 100644 --- a/src/drunc/session_manager/session_manager_driver.py +++ b/src/drunc/session_manager/session_manager_driver.py @@ -32,43 +32,52 @@ def __init__(self, address: str, token: Token, **kwargs): self.channel = grpc.insecure_channel(self.address) self.stub = SessionManagerStub(self.channel) - def describe(self) -> Description: + def describe(self, timeout: int | float = 60) -> Description: """Describe the session manager service. + Args: + timeout: The timeout for the gRPC call in seconds. + Returns: A response containing the description of the service. """ try: - response = self.stub.describe(Empty()) + response = self.stub.describe(Empty(), timeout=timeout) except grpc.RpcError as e: self.__handle_grpc_error(e, "describe") return response - def list_all_sessions(self) -> AllActiveSessions: + def list_all_sessions(self, timeout: int | float = 60) -> AllActiveSessions: """List all active sessions managed by the session manager. + Args: + timeout: The timeout for the gRPC call in seconds. + Returns: A response containing a list of all active sessions. """ try: - response = self.stub.list_all_sessions(Empty()) + response = self.stub.list_all_sessions(Empty(), timeout=timeout) except grpc.RpcError as e: self.__handle_grpc_error(e, "list_all_sessions") return response - def list_all_configs(self) -> AllConfigKeys: + def list_all_configs(self, timeout: int | float = 60) -> AllConfigKeys: """List all available configurations in the session manager. + Args: + timeout: The timeout for the gRPC call in seconds. + Returns: A response containing all available configuration keys. """ try: - response = self.stub.list_all_configs(Empty()) + response = self.stub.list_all_configs(Empty(), timeout=timeout) except grpc.RpcError as e: self.__handle_grpc_error(e, "list_all_configs") From 24efa7b1422876a123068bece1b786dd192a5dad Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 28 Jul 2025 18:20:12 +0100 Subject: [PATCH 15/30] Simplify stub creation -- they aren't covariant anyway. --- src/drunc/controller/controller_driver.py | 6 ++-- .../process_manager/process_manager_driver.py | 32 ++++++++++++------- .../session_manager/session_manager_driver.py | 2 -- src/drunc/utils/grpc_utils.py | 15 ++++++++- src/drunc/utils/shell_utils.py | 16 +++------- 5 files changed, 41 insertions(+), 30 deletions(-) diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index dd88d419b..31153af48 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -15,12 +15,10 @@ class ControllerDriver(GRPCDriver): def __init__(self, address: str, token, **kwargs): - super(ControllerDriver, self).__init__( + super().__init__( name="controller_driver", address=address, token=token, **kwargs ) - - def create_stub(self, channel): - return ControllerStub(channel) + self.stub = ControllerStub(self.channel) def pack_empty_addressed_command(cmd): @wraps(cmd) diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 5ab157b18..74dcbc042 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -7,6 +7,7 @@ from collections.abc import Iterator from time import sleep +import grpc from druncschema.description_pb2 import Description from druncschema.process_manager_pb2 import ( BootRequest, @@ -19,12 +20,18 @@ ProcessRestriction, ) from druncschema.process_manager_pb2_grpc import ProcessManagerStub +from druncschema.request_response_pb2 import Request from drunc.connectivity_service.client import ConnectivityServiceClient from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful from drunc.controller.utils import get_segment_lookup_timeout from drunc.exceptions import DruncSetupException, DruncShellException from drunc.process_manager.utils import get_log_path, get_rte_script +from drunc.utils.grpc_utils import ( + copy_token, + rethrow_if_timeout, + rethrow_if_unreachable_server, +) from drunc.utils.shell_utils import GRPCDriver from drunc.utils.utils import ( get_control_type_and_uri_from_connectivity_service, @@ -38,13 +45,10 @@ class ProcessManagerDriver(GRPCDriver): controller_address = "" def __init__(self, address: str, token, **kwargs): - super(ProcessManagerDriver, self).__init__( - name="process_manager.driver", address=address, token=token, **kwargs + super().__init__( + name="process_manager_driver", address=address, token=token, **kwargs ) - self.log.debug("set up process_manager.driver") - - def create_stub(self, channel): - return ProcessManagerStub(channel) + self.stub = ProcessManagerStub(self.channel) def _convert_oks_to_boot_request( self, @@ -442,8 +446,14 @@ def restart( ) def describe(self, timeout: int | float = 60) -> Description: - return self.send_command( - "describe", - outformat=Description, - timeout=timeout, - ) + token = copy_token(self.token) + request = Request(token=token) + + try: + response = self.stub.describe(request, timeout=timeout) + except grpc.RpcError as e: + rethrow_if_unreachable_server(e) + rethrow_if_timeout(e) + raise e + + return response diff --git a/src/drunc/session_manager/session_manager_driver.py b/src/drunc/session_manager/session_manager_driver.py index 33d90aae2..9aa002ca2 100644 --- a/src/drunc/session_manager/session_manager_driver.py +++ b/src/drunc/session_manager/session_manager_driver.py @@ -28,8 +28,6 @@ def __init__(self, address: str, token: Token, **kwargs): super().__init__( name="session_manager_driver", address=address, token=token, **kwargs ) - self.address = address - self.channel = grpc.insecure_channel(self.address) self.stub = SessionManagerStub(self.channel) def describe(self, timeout: int | float = 60) -> Description: diff --git a/src/drunc/utils/grpc_utils.py b/src/drunc/utils/grpc_utils.py index a4a01a9f6..46a5698ba 100644 --- a/src/drunc/utils/grpc_utils.py +++ b/src/drunc/utils/grpc_utils.py @@ -1,4 +1,3 @@ - import grpc from druncschema.generic_pb2 import PlainText from druncschema.request_response_pb2 import Response, ResponseFlag @@ -98,3 +97,17 @@ def interrupt_if_unreachable_server(grpc_error): return grpc_error._state.details elif hasattr(grpc_error, "_details"): return grpc_error._details + + +def copy_token(token: Token) -> Token: + """Create a copy of the original token. + + Args: + token: The original token to copy. + + Returns: + A copy of the original token. + """ + token_copy = Token() + token_copy.CopyFrom(token) + return token_copy diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index f916ed768..2582606e8 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -94,16 +94,11 @@ def __init__(self, name: str, address: str, token: Token, aio_channel=False): f"You need to provide a valid IP address for the driver. Provided '{address}'" ) - self._address = address - self._channel = grpc.insecure_channel(self._address) - self._stub = self.create_stub(self._channel) + self.address = address + self.channel = grpc.insecure_channel(self.address) self.token = Token() self.token.CopyFrom(token) - @abc.abstractmethod - def create_stub(self, channel) -> object: - pass - def _create_request(self, payload=None) -> Request: token2 = Token() token2.CopyFrom(self.token) @@ -202,10 +197,10 @@ def send_command( decode_children=False, timeout: int | float = 60, ): - if not self._stub: + if not self.stub: raise DruncShellException("No stub initialised") - cmd = getattr(self._stub, command) # this throws if the command doesn't exist + cmd = getattr(self.stub, command) # this throws if the command doesn't exist request = self._create_request(data) @@ -217,12 +212,9 @@ def send_command( # TODO: TEMP HACK UNTIL UNPACKING IS REMOVED from druncschema.description_pb2 import Description from druncschema.process_manager_pb2 import ProcessInstanceList - from druncschema.session_manager_pb2 import AllActiveSessions, AllConfigKeys new_message_types = ( Description, - AllActiveSessions, - AllConfigKeys, ProcessInstanceList, ) From 0fe8d3d22ede9ce1356fe93ebc25bcb373a80617 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 28 Jul 2025 18:44:46 +0100 Subject: [PATCH 16/30] Use Request for argless commands, until we know what auth we are doing. --- .../session_manager/session_manager_driver.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/drunc/session_manager/session_manager_driver.py b/src/drunc/session_manager/session_manager_driver.py index 9aa002ca2..64f88f7da 100644 --- a/src/drunc/session_manager/session_manager_driver.py +++ b/src/drunc/session_manager/session_manager_driver.py @@ -2,11 +2,12 @@ import grpc from druncschema.description_pb2 import Description +from druncschema.request_response_pb2 import Request from druncschema.session_manager_pb2 import AllActiveSessions, AllConfigKeys from druncschema.session_manager_pb2_grpc import SessionManagerStub from druncschema.token_pb2 import Token -from google.protobuf.empty_pb2 import Empty +from drunc.utils.grpc_utils import copy_token from drunc.utils.shell_utils import GRPCDriver @@ -39,9 +40,11 @@ def describe(self, timeout: int | float = 60) -> Description: Returns: A response containing the description of the service. """ + token = copy_token(self.token) + request = Request(token=token) try: - response = self.stub.describe(Empty(), timeout=timeout) + response = self.stub.describe(request, timeout=timeout) except grpc.RpcError as e: self.__handle_grpc_error(e, "describe") @@ -56,9 +59,11 @@ def list_all_sessions(self, timeout: int | float = 60) -> AllActiveSessions: Returns: A response containing a list of all active sessions. """ + token = copy_token(self.token) + request = Request(token=token) try: - response = self.stub.list_all_sessions(Empty(), timeout=timeout) + response = self.stub.list_all_sessions(request, timeout=timeout) except grpc.RpcError as e: self.__handle_grpc_error(e, "list_all_sessions") @@ -73,9 +78,11 @@ def list_all_configs(self, timeout: int | float = 60) -> AllConfigKeys: Returns: A response containing all available configuration keys. """ + token = copy_token(self.token) + request = Request(token=token) try: - response = self.stub.list_all_configs(Empty(), timeout=timeout) + response = self.stub.list_all_configs(request, timeout=timeout) except grpc.RpcError as e: self.__handle_grpc_error(e, "list_all_configs") From ad32b03f51643da3f355f90f4bd7546fb96e09e3 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 28 Jul 2025 19:30:27 +0100 Subject: [PATCH 17/30] More PM request messages. --- src/drunc/process_manager/process_manager.py | 20 ++---- .../process_manager/process_manager_driver.py | 71 ++++++++++--------- .../session_manager/session_manager_driver.py | 15 ++-- src/drunc/utils/shell_utils.py | 4 +- 4 files changed, 50 insertions(+), 60 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index da0459baf..ac16fe7fe 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -304,17 +304,12 @@ def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: action=ActionType.DELETE, system=SystemType.PROCESS_MANAGER ) # 2nd step def restart( - self, request: Request, context: ServicerContext + self, request: ProcessQuery, context: ServicerContext ) -> ProcessInstanceList: self.log.debug(f"{self.name} running restart") try: - data = unpack_any(request.data, ProcessQuery) - except UnpackingError as e: - return unpack_error_response(self.__class__.__name__, str(e), request.token) - - try: - response = self._restart_impl(data) + response = self._restart_impl(request) except NotImplementedError: return ProcessInstanceList( name=self.name, @@ -373,16 +368,13 @@ def _ps_impl(self, query: ProcessQuery) -> list[ProcessInstance]: @authentified_and_authorised( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) # 2nd step - def ps(self, request: Request, context: ServicerContext) -> ProcessInstanceList: + def ps( + self, request: ProcessQuery, context: ServicerContext + ) -> ProcessInstanceList: self.log.debug(f"{self.name} running ps") try: - data = unpack_any(request.data, ProcessQuery) - except UnpackingError as e: - return unpack_error_response(self.__class__.__name__, str(e), request.token) - - try: - response = self._ps_impl(data) + response = self._ps_impl(request) except NotImplementedError: return ProcessInstanceList( name=self.name, diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 74dcbc042..82a01d3d1 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -27,11 +27,7 @@ from drunc.controller.utils import get_segment_lookup_timeout from drunc.exceptions import DruncSetupException, DruncShellException from drunc.process_manager.utils import get_log_path, get_rte_script -from drunc.utils.grpc_utils import ( - copy_token, - rethrow_if_timeout, - rethrow_if_unreachable_server, -) +from drunc.utils.grpc_utils import copy_token from drunc.utils.shell_utils import GRPCDriver from drunc.utils.utils import ( get_control_type_and_uri_from_connectivity_service, @@ -400,60 +396,65 @@ def terminate( ) def kill( - self, query: ProcessQuery, timeout: int | float = 60 + self, request: ProcessQuery, timeout: int | float = 60 ) -> ProcessInstanceList: return self.send_command( "kill", - data=query, + data=request, outformat=ProcessInstanceList, timeout=timeout, ) - def logs(self, req: LogRequest, timeout: int | float = 60) -> LogLines: + def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines: return self.send_command( "logs", - data=req, + data=request, outformat=LogLines, timeout=timeout, ) - def ps(self, query: ProcessQuery, timeout: int | float = 60) -> ProcessInstanceList: - return self.send_command( - "ps", - data=query, - outformat=ProcessInstanceList, - timeout=timeout, - ) + def ps( + self, request: ProcessQuery, timeout: int | float = 60 + ) -> ProcessInstanceList: + request.token.CopyFrom(self.token) + + try: + response = self.stub.ps(request, timeout=timeout) + except grpc.RpcError as e: + self.handle_grpc_error(e) + + return response def flush( - self, query: ProcessQuery, timeout: int | float = 60 + self, request: ProcessQuery, timeout: int | float = 60 ) -> ProcessInstanceList: - return self.send_command( - "flush", - data=query, - outformat=ProcessInstanceList, - timeout=timeout, - ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.flush(request, timeout=timeout) + except grpc.RpcError as e: + self.handle_grpc_error(e) + + return response def restart( - self, query: ProcessQuery, timeout: int | float = 60 + self, request: ProcessQuery, timeout: int | float = 60 ) -> ProcessInstanceList: - return self.send_command( - "restart", - data=query, - outformat=ProcessInstanceList, - timeout=timeout, - ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.restart(request, timeout=timeout) + except grpc.RpcError as e: + self.handle_grpc_error(e) + + return response def describe(self, timeout: int | float = 60) -> Description: - token = copy_token(self.token) - request = Request(token=token) + request = Request(token=copy_token(self.token)) try: response = self.stub.describe(request, timeout=timeout) except grpc.RpcError as e: - rethrow_if_unreachable_server(e) - rethrow_if_timeout(e) - raise e + self.handle_grpc_error(e) return response diff --git a/src/drunc/session_manager/session_manager_driver.py b/src/drunc/session_manager/session_manager_driver.py index 64f88f7da..3c47a2a13 100644 --- a/src/drunc/session_manager/session_manager_driver.py +++ b/src/drunc/session_manager/session_manager_driver.py @@ -40,13 +40,12 @@ def describe(self, timeout: int | float = 60) -> Description: Returns: A response containing the description of the service. """ - token = copy_token(self.token) - request = Request(token=token) + request = Request(token=copy_token(self.token)) try: response = self.stub.describe(request, timeout=timeout) except grpc.RpcError as e: - self.__handle_grpc_error(e, "describe") + self.handle_grpc_error(e) return response @@ -59,13 +58,12 @@ def list_all_sessions(self, timeout: int | float = 60) -> AllActiveSessions: Returns: A response containing a list of all active sessions. """ - token = copy_token(self.token) - request = Request(token=token) + request = Request(token=copy_token(self.token)) try: response = self.stub.list_all_sessions(request, timeout=timeout) except grpc.RpcError as e: - self.__handle_grpc_error(e, "list_all_sessions") + self.handle_grpc_error(e) return response @@ -78,12 +76,11 @@ def list_all_configs(self, timeout: int | float = 60) -> AllConfigKeys: Returns: A response containing all available configuration keys. """ - token = copy_token(self.token) - request = Request(token=token) + request = Request(token=copy_token(self.token)) try: response = self.stub.list_all_configs(request, timeout=timeout) except grpc.RpcError as e: - self.__handle_grpc_error(e, "list_all_configs") + self.handle_grpc_error(e) return response diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index 2582606e8..40c76909c 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -111,7 +111,7 @@ def _create_request(self, payload=None) -> Request: else: return Request(token=token2) - def __handle_grpc_error(self, error, command): + def handle_grpc_error(self, error): rethrow_if_unreachable_server(error) rethrow_if_timeout(error) raise error @@ -207,7 +207,7 @@ def send_command( try: response = cmd(request, timeout=timeout) except grpc.RpcError as e: - self.__handle_grpc_error(e, command) + self.handle_grpc_error(e) # TODO: TEMP HACK UNTIL UNPACKING IS REMOVED from druncschema.description_pb2 import Description From d1c024913d9a0123a58ec0b6531f4817db35d1f8 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 28 Jul 2025 22:48:19 +0100 Subject: [PATCH 18/30] Only boot to go. --- src/drunc/process_manager/process_manager.py | 33 +++++---------- .../process_manager/process_manager_driver.py | 41 +++++++++++-------- 2 files changed, 35 insertions(+), 39 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index ac16fe7fe..749f591bb 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -334,16 +334,13 @@ def _kill_impl(self, query: ProcessQuery) -> list[ProcessInstance]: @authentified_and_authorised( action=ActionType.DELETE, system=SystemType.PROCESS_MANAGER ) # 2nd step - def kill(self, request: Request, context: ServicerContext) -> ProcessInstanceList: + def kill( + self, request: ProcessQuery, context: ServicerContext + ) -> ProcessInstanceList: self.log.debug(f"{self.name} running kill") try: - data = unpack_any(request.data, ProcessQuery) - except UnpackingError as e: - return unpack_error_response(self.__class__.__name__, str(e), request.token) - - try: - response = self._kill_impl(data) + response = self._kill_impl(request) except NotImplementedError: return ProcessInstanceList( name=self.name, @@ -395,16 +392,13 @@ def ps( @authentified_and_authorised( action=ActionType.DELETE, system=SystemType.PROCESS_MANAGER ) # 2nd step - def flush(self, request: Request, context: ServicerContext) -> ProcessInstanceList: + def flush( + self, request: ProcessQuery, context: ServicerContext + ) -> ProcessInstanceList: self.log.debug(f"{self.name} running flush") - try: - data = unpack_any(request.data, ProcessQuery) - except UnpackingError as e: - return unpack_error_response(self.__class__.__name__, str(e), request.token) - ret = [] - for uuid in self._get_process_uid(data): + for uuid in self._get_process_uid(request): if uuid not in self.boot_request: pu = ProcessUUID(uuid=uuid) pi = ProcessInstance( @@ -487,7 +481,7 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: @authentified_and_authorised( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) # 2nd step - def logs(self, request: Request, context: ServicerContext) -> LogLines: + def logs(self, request: LogRequest, context: ServicerContext) -> LogLines: """Fetch logs for a process. Args: @@ -500,12 +494,7 @@ def logs(self, request: Request, context: ServicerContext) -> LogLines: self.log.debug("Getting logs") try: - data = unpack_any(request.data, LogRequest) - except UnpackingError as e: - return unpack_error_response(self.__class__.__name__, str(e), request.token) - - try: - response = self._logs_impl(data) + response = self._logs_impl(request) except NotImplementedError: return LogLines( name=self.name, @@ -542,7 +531,7 @@ def _get_process_uid( query: ProcessQuery, in_boot_request: bool = False, order_by: str = "random", - ) -> [str]: + ) -> list[str]: order_by = order_by.lower() if order_by not in ["random", "leaf_first", "root_first"]: raise DruncCommandException(f"Order by '{order_by}' is not supported") diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 82a01d3d1..1f1e14289 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -389,29 +389,36 @@ def terminate( self, timeout: int | float = 60, ) -> ProcessInstanceList: - return self.send_command( - "terminate", - outformat=ProcessInstanceList, - timeout=timeout, - ) + request = Request(token=copy_token(self.token)) + + try: + response = self.stub.terminate(request, timeout=timeout) + except grpc.RpcError as e: + self.handle_grpc_error(e) + + return response def kill( self, request: ProcessQuery, timeout: int | float = 60 ) -> ProcessInstanceList: - return self.send_command( - "kill", - data=request, - outformat=ProcessInstanceList, - timeout=timeout, - ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.kill(request, timeout=timeout) + except grpc.RpcError as e: + self.handle_grpc_error(e) + + return response def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines: - return self.send_command( - "logs", - data=request, - outformat=LogLines, - timeout=timeout, - ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.logs(request, timeout=timeout) + except grpc.RpcError as e: + self.handle_grpc_error(e) + + return response def ps( self, request: ProcessQuery, timeout: int | float = 60 From bb62053ca06eb6a5615e99585f44c60034f89d8b Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 28 Jul 2025 23:20:29 +0100 Subject: [PATCH 19/30] Boot request message. --- src/drunc/process_manager/process_manager.py | 14 ++----- .../process_manager/process_manager_driver.py | 37 ++++++++++--------- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 749f591bb..bfec196e5 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -39,10 +39,7 @@ ) from drunc.utils.configuration import ConfTypes from drunc.utils.grpc_utils import ( - UnpackingError, pack_to_any, - unpack_any, - unpack_error_response, ) from drunc.utils.utils import get_logger, pid_info_str @@ -235,19 +232,16 @@ def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: @authentified_and_authorised( action=ActionType.CREATE, system=SystemType.PROCESS_MANAGER ) # 2nd step - def boot(self, request: Request, context: ServicerContext) -> ProcessInstanceList: + def boot( + self, request: BootRequest, context: ServicerContext + ) -> ProcessInstanceList: self.log.debug( "{self.name} booting '{data.process_description.metadata.name}' " "from session '{data.process_description.metadata.session}'" ) try: - data = unpack_any(request.data, BootRequest) - except UnpackingError as e: - return unpack_error_response(self.__class__.__name__, str(e), request.token) - - try: - response = self._boot_impl(data) + response = self._boot_impl(request) except NotImplementedError: return ProcessInstanceList( name=self.name, diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 1f1e14289..86913d95a 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -133,6 +133,7 @@ def _convert_oks_to_boot_request( self.log.debug(f"{name}'s env:\n{env}") breq = BootRequest( + token=copy_token(self.token), process_description=ProcessDescription( metadata=ProcessMetadata( user=user, @@ -204,7 +205,7 @@ def boot( last_boot_on_host_at = {} previous_host = None - for br in self._convert_oks_to_boot_request( + for request in self._convert_oks_to_boot_request( oks_conf=conf_file, user=user, session_dal=session_dal, @@ -214,14 +215,14 @@ def boot( **kwargs, ): if ( - br.process_description.metadata.name + request.process_description.metadata.name not in [app.id for app in session_dal.infrastructure_applications] and csc and not csc.is_ready(timeout=10) ): raise DruncSetupException("Connectivity service is not ready in time") - this_host = next(iter(br.process_restriction.allowed_hosts)) + this_host = next(iter(request.process_restriction.allowed_hosts)) time_diff = time.time() - last_boot_on_host_at.get(this_host, 0) @@ -233,12 +234,13 @@ def boot( previous_host = this_host last_boot_on_host_at[this_host] = time.time() - yield self.send_command( - "boot", - data=br, - outformat=ProcessInstanceList, - timeout=timeout, - ) + + try: + response = self.stub.boot(request, timeout=timeout) + except grpc.RpcError as e: + self.handle_grpc_error(e) + + yield response top_controller_name = session_dal.segment.controller.id @@ -359,7 +361,8 @@ def dummy_boot( ) for process in range(n_processes): - breq = BootRequest( + request = BootRequest( + token=copy_token(self.token), process_description=ProcessDescription( metadata=ProcessMetadata( user=user, @@ -376,14 +379,14 @@ def dummy_boot( ), process_restriction=ProcessRestriction(allowed_hosts=["localhost"]), ) - self.log.debug(f"{breq=}\n\n") + self.log.debug(f"{request=}\n\n") - yield self.send_command( - "boot", - data=breq, - outformat=ProcessInstanceList, - timeout=timeout, - ) + try: + response = self.stub.boot(request, timeout=timeout) + except grpc.RpcError as e: + self.handle_grpc_error(e) + + yield response def terminate( self, From ed2b340e42a28abc6e1faa6baef674c38730fffe Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Tue, 29 Jul 2025 10:18:42 +0100 Subject: [PATCH 20/30] Simplify handle_response. --- src/drunc/utils/shell_utils.py | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index 40c76909c..bda5f4aa3 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -131,15 +131,6 @@ def handle_response(self, response, command, outformat): self.log.error(f"Error unpacking data: {e}") dr.data = response.data - for c_response in response.children: - try: - dr.children.append( - self.handle_response(c_response, command, outformat) - ) - except DruncServerSideError as e: - self.log.error(f"Exception thrown from child: {e}") - return dr - else: def text(verb="not executed", reason=""): @@ -154,7 +145,6 @@ def text(verb="not executed", reason=""): if response.data.Is(Stacktrace.DESCRIPTOR): stack = unpack_any(response.data, Stacktrace) dr.data = stack - # stack_txt = 'Stacktrace [bold red]on remote server![/bold red]\n' # Temporary - bold doesn't work stack_txt = "Stacktrace on remote server!\n" last_one = "" @@ -180,14 +170,13 @@ def text(verb="not executed", reason=""): else: self.log.error(text("failed", error_txt)) - for c_response in response.children: - try: - dr.children.append( - self.handle_response(c_response, command, outformat) - ) - except DruncServerSideError as e: - self.log.error(f"Exception thrown from child: {e}") - return dr + for c_response in response.children: + try: + dr.children.append(self.handle_response(c_response, command, outformat)) + except DruncServerSideError as e: + self.log.error(f"Exception thrown from child: {e}") + + return dr def send_command( self, @@ -197,9 +186,6 @@ def send_command( decode_children=False, timeout: int | float = 60, ): - if not self.stub: - raise DruncShellException("No stub initialised") - cmd = getattr(self.stub, command) # this throws if the command doesn't exist request = self._create_request(data) From 832a529bac094d7c447c012c0c473f263065558a Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Tue, 29 Jul 2025 14:49:01 +0100 Subject: [PATCH 21/30] Make handle_grpc_error a global fn for now. --- .../process_manager/process_manager_driver.py | 20 +++++++++---------- .../session_manager/session_manager_driver.py | 8 ++++---- src/drunc/utils/grpc_utils.py | 13 ++++++++++++ src/drunc/utils/shell_utils.py | 14 ++----------- 4 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 86913d95a..359d8132f 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -27,7 +27,7 @@ from drunc.controller.utils import get_segment_lookup_timeout from drunc.exceptions import DruncSetupException, DruncShellException from drunc.process_manager.utils import get_log_path, get_rte_script -from drunc.utils.grpc_utils import copy_token +from drunc.utils.grpc_utils import copy_token, handle_grpc_error from drunc.utils.shell_utils import GRPCDriver from drunc.utils.utils import ( get_control_type_and_uri_from_connectivity_service, @@ -238,7 +238,7 @@ def boot( try: response = self.stub.boot(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) yield response @@ -384,7 +384,7 @@ def dummy_boot( try: response = self.stub.boot(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) yield response @@ -397,7 +397,7 @@ def terminate( try: response = self.stub.terminate(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response @@ -409,7 +409,7 @@ def kill( try: response = self.stub.kill(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response @@ -419,7 +419,7 @@ def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines: try: response = self.stub.logs(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response @@ -431,7 +431,7 @@ def ps( try: response = self.stub.ps(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response @@ -443,7 +443,7 @@ def flush( try: response = self.stub.flush(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response @@ -455,7 +455,7 @@ def restart( try: response = self.stub.restart(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response @@ -465,6 +465,6 @@ def describe(self, timeout: int | float = 60) -> Description: try: response = self.stub.describe(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response diff --git a/src/drunc/session_manager/session_manager_driver.py b/src/drunc/session_manager/session_manager_driver.py index 3c47a2a13..a52a6c7ab 100644 --- a/src/drunc/session_manager/session_manager_driver.py +++ b/src/drunc/session_manager/session_manager_driver.py @@ -7,7 +7,7 @@ from druncschema.session_manager_pb2_grpc import SessionManagerStub from druncschema.token_pb2 import Token -from drunc.utils.grpc_utils import copy_token +from drunc.utils.grpc_utils import copy_token, handle_grpc_error from drunc.utils.shell_utils import GRPCDriver @@ -45,7 +45,7 @@ def describe(self, timeout: int | float = 60) -> Description: try: response = self.stub.describe(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response @@ -63,7 +63,7 @@ def list_all_sessions(self, timeout: int | float = 60) -> AllActiveSessions: try: response = self.stub.list_all_sessions(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response @@ -81,6 +81,6 @@ def list_all_configs(self, timeout: int | float = 60) -> AllConfigKeys: try: response = self.stub.list_all_configs(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) return response diff --git a/src/drunc/utils/grpc_utils.py b/src/drunc/utils/grpc_utils.py index 46a5698ba..9aea97b2d 100644 --- a/src/drunc/utils/grpc_utils.py +++ b/src/drunc/utils/grpc_utils.py @@ -1,3 +1,5 @@ +from typing import NoReturn + import grpc from druncschema.generic_pb2 import PlainText from druncschema.request_response_pb2 import Response, ResponseFlag @@ -91,6 +93,17 @@ def rethrow_if_timeout(grpc_error): raise ServerTimeout(grpc_error._state.details) from grpc_error +def handle_grpc_error(error: grpc.RpcError) -> NoReturn: + """Handle gRPC errors by rethrowing them with appropriate context. + + Args: + error: The gRPC error to handle. + """ + rethrow_if_unreachable_server(error) + rethrow_if_timeout(error) + raise error + + def interrupt_if_unreachable_server(grpc_error): if not server_is_reachable(grpc_error): if hasattr(grpc_error, "_state"): diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index bda5f4aa3..bc1ca9ea4 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -15,12 +15,7 @@ DruncSetupException, DruncShellException, ) -from drunc.utils.grpc_utils import ( - UnpackingError, - rethrow_if_timeout, - rethrow_if_unreachable_server, - unpack_any, -) +from drunc.utils.grpc_utils import UnpackingError, handle_grpc_error, unpack_any from drunc.utils.utils import get_logger @@ -111,11 +106,6 @@ def _create_request(self, payload=None) -> Request: else: return Request(token=token2) - def handle_grpc_error(self, error): - rethrow_if_unreachable_server(error) - rethrow_if_timeout(error) - raise error - def handle_response(self, response, command, outformat): dr = DecodedResponse( name=response.name, @@ -193,7 +183,7 @@ def send_command( try: response = cmd(request, timeout=timeout) except grpc.RpcError as e: - self.handle_grpc_error(e) + handle_grpc_error(e) # TODO: TEMP HACK UNTIL UNPACKING IS REMOVED from druncschema.description_pb2 import Description From 8677c42398fb2a270cdb197b6b27837a2aba0f30 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Tue, 29 Jul 2025 16:37:36 +0100 Subject: [PATCH 22/30] Fix the temp hack. --- src/drunc/utils/shell_utils.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index bc1ca9ea4..681704735 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -173,7 +173,6 @@ def send_command( command: str, data=None, outformat=None, - decode_children=False, timeout: int | float = 60, ): cmd = getattr(self.stub, command) # this throws if the command doesn't exist @@ -187,14 +186,8 @@ def send_command( # TODO: TEMP HACK UNTIL UNPACKING IS REMOVED from druncschema.description_pb2 import Description - from druncschema.process_manager_pb2 import ProcessInstanceList - new_message_types = ( - Description, - ProcessInstanceList, - ) - - if isinstance(response, new_message_types): + if isinstance(response, Description): return response return self.handle_response(response, command, outformat) From 8cb804008d3f950bd36b0ae1f35e811f7132c4e1 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Wed, 30 Jul 2025 17:09:00 +0100 Subject: [PATCH 23/30] More PM refactoring. --- .../process_manager/k8s_process_manager.py | 30 +++++++--- src/drunc/process_manager/process_manager.py | 45 ++++----------- .../process_manager/ssh_process_manager.py | 56 ++++++++++++++----- 3 files changed, 76 insertions(+), 55 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index ef892c4b2..e528de2cd 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -16,6 +16,7 @@ ProcessRestriction, ProcessUUID, ) +from druncschema.request_response_pb2 import ResponseFlag from kubernetes import client, config from drunc.exceptions import DruncCommandException, DruncException @@ -381,10 +382,16 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: ).split("\n") ] - def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: + def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: self.log.debug(f"{self.name} running boot command") this_uuid = str(uuid.uuid4()) - return self.__boot(boot_request, this_uuid) + process = self.__boot(boot_request, this_uuid) + return ProcessInstanceList( + name=self.name, + token=None, + values=[process], + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: session = boot_request.process_description.metadata.session @@ -415,7 +422,7 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: def _ps_impl( self, query: ProcessQuery, in_boot_request: bool = False - ) -> list[ProcessInstance]: + ) -> ProcessInstanceList: ret = [] for proc_uuid in self._get_process_uid(query): podname = self.boot_request[proc_uuid].process_description.metadata.name @@ -425,9 +432,14 @@ def _ps_impl( ret.append(self._get_pi(proc_uuid, podname, session, return_code)) - return ret + return ProcessInstanceList( + name=self.name, + token=None, + values=ret, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) - def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: + def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: ret = [] uuids = self._get_process_uid(query, in_boot_request=True) uuid = self._ensure_one_process(uuids, in_boot_request=True) @@ -438,7 +450,6 @@ def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: self._kill_pod(podname, session) - same_uuid_br = [] same_uuid_br = BootRequest() same_uuid_br.CopyFrom(self.boot_request[uuid]) same_uuid = uuid @@ -452,7 +463,12 @@ def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: del same_uuid_br del same_uuid - return ret + return ProcessInstanceList( + name=self.name, + token=None, + values=ret, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) # # ORDER MATTERS! # @broadcasted # outer most wrapper 1st step diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index bfec196e5..8d6b493b7 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -224,7 +224,7 @@ def interrupt_with_exception(self, *args, **kwargs): ) @abc.abstractmethod - def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: + def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: raise NotImplementedError # ORDER MATTERS! @@ -250,15 +250,10 @@ def boot( flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ) - return ProcessInstanceList( - name=self.name, - token=None, - values=[response], - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) + return response @abc.abstractmethod - def _terminate_impl(self) -> list[ProcessInstance]: + def _terminate_impl(self) -> ProcessInstanceList: raise NotImplementedError # ORDER MATTERS! @@ -281,15 +276,10 @@ def terminate( flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ) - return ProcessInstanceList( - name=self.name, - token=None, - values=response, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) + return response @abc.abstractmethod - def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: + def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: raise NotImplementedError # ORDER MATTERS! @@ -312,15 +302,10 @@ def restart( flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ) - return ProcessInstanceList( - name=self.name, - token=None, - values=response, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) + return response @abc.abstractmethod - def _kill_impl(self, query: ProcessQuery) -> list[ProcessInstance]: + def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList: raise NotImplementedError # ORDER MATTERS! @@ -343,15 +328,10 @@ def kill( flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ) - return ProcessInstanceList( - name=self.name, - token=None, - values=response, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) + return response @abc.abstractmethod - def _ps_impl(self, query: ProcessQuery) -> list[ProcessInstance]: + def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: raise NotImplementedError # ORDER MATTERS! @@ -374,12 +354,7 @@ def ps( flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ) - return ProcessInstanceList( - name=self.name, - token=None, - values=response, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) + return response # ORDER MATTERS! @broadcasted # outer most wrapper 1st step diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 6ce27cd63..d23e2a152 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -14,6 +14,7 @@ LogRequest, ProcessDescription, ProcessInstance, + ProcessInstanceList, ProcessQuery, ProcessRestriction, ProcessUUID, @@ -119,7 +120,7 @@ def kill_processes(self, uuids: list) -> list[ProcessInstance]: return ret - def _terminate_impl(self) -> list[ProcessInstance]: + def _terminate_impl(self) -> ProcessInstanceList: self.log.info("Terminating") if self.process_store: @@ -127,10 +128,17 @@ def _terminate_impl(self) -> list[ProcessInstance]: uuids = self._get_process_uid( query=ProcessQuery(names=[".*"]), order_by="leaf_first" ) - return self.kill_processes(uuids) + processes = self.kill_processes(uuids) else: self.log.info("No known process to kill before exiting") - return [] + processes = [] + + return ProcessInstanceList( + name=self.name, + token=None, + values=processes, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) def _logs_impl(self, log_request: LogRequest) -> LogLines: self.log.debug(f"Retrieving logs for {log_request.query}") @@ -369,7 +377,7 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: return pi - def _ps_impl(self, query: ProcessQuery) -> list[ProcessInstance]: + def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: ret = [] for proc_uuid in self._get_process_uid(query): @@ -409,19 +417,29 @@ def _ps_impl(self, query: ProcessQuery) -> list[ProcessInstance]: ) ret += [pi] - return ret + return ProcessInstanceList( + name=self.name, + token=None, + values=ret, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) - def _boot_impl(self, boot_request: BootRequest) -> ProcessInstance: + def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: self.log.debug(f"{self.name} running boot command") this_uuid = str(uuid.uuid4()) - return self.__boot(boot_request, this_uuid) + process = self.__boot(boot_request, this_uuid) + return ProcessInstanceList( + name=self.name, + token=None, + values=[process], + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) - def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: + def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: self.log.info(f"{self.name} restarting {query.names} in session {self.session}") uuids = self._get_process_uid(query, in_boot_request=True) uuid = self._ensure_one_process(uuids, in_boot_request=True) - same_uuid_br = [] same_uuid_br = BootRequest() same_uuid_br.CopyFrom(self.boot_request[uuid]) same_uuid = uuid @@ -440,14 +458,26 @@ def _restart_impl(self, query: ProcessQuery) -> list[ProcessInstance]: del same_uuid_br del same_uuid - return ret + return ProcessInstanceList( + name=self.name, + token=None, + values=ret, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) - def _kill_impl(self, query: ProcessQuery) -> list[ProcessInstance]: + def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList: self.log.info(f"{self.name} killing {query.names} in session {self.session}") if self.process_store: uuids = self._get_process_uid(query, order_by="leaf_first") - return self.kill_processes(uuids) + processes = self.kill_processes(uuids) else: self.log.info("No known process to kill before exiting") - return [] + processes = [] + + return ProcessInstanceList( + name=self.name, + token=None, + values=processes, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) From 6c01427c7e4fa2ac10b929efc4dfb383b387534d Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Wed, 30 Jul 2025 17:22:09 +0100 Subject: [PATCH 24/30] One last PM refactor for now. --- .../process_manager/ssh_process_manager.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index d23e2a152..305b0e43d 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -72,7 +72,7 @@ def __init__(self, configuration, **kwargs): self.ssh = sh.Command("/usr/bin/ssh") - def kill_processes(self, uuids: list) -> list[ProcessInstance]: + def kill_processes(self, uuids: list) -> ProcessInstanceList: ret = [] for proc_uuid in uuids: process = self.process_store[proc_uuid] @@ -118,7 +118,12 @@ def kill_processes(self, uuids: list) -> list[ProcessInstance]: ] del self.process_store[proc_uuid] - return ret + return ProcessInstanceList( + name=self.name, + token=None, + values=ret, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) def _terminate_impl(self) -> ProcessInstanceList: self.log.info("Terminating") @@ -128,15 +133,13 @@ def _terminate_impl(self) -> ProcessInstanceList: uuids = self._get_process_uid( query=ProcessQuery(names=[".*"]), order_by="leaf_first" ) - processes = self.kill_processes(uuids) - else: - self.log.info("No known process to kill before exiting") - processes = [] + return self.kill_processes(uuids) + self.log.info("No known process to kill before exiting") return ProcessInstanceList( name=self.name, token=None, - values=processes, + values=[], flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) @@ -470,14 +473,12 @@ def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList: if self.process_store: uuids = self._get_process_uid(query, order_by="leaf_first") - processes = self.kill_processes(uuids) - else: - self.log.info("No known process to kill before exiting") - processes = [] + return self.kill_processes(uuids) + self.log.info("No known process to kill before exiting") return ProcessInstanceList( name=self.name, token=None, - values=processes, + values=[], flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) From b304a440cafed2a0ac82175fdde034bea3526dfe Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 31 Jul 2025 13:20:46 +0100 Subject: [PATCH 25/30] Few last typos. --- src/drunc/process_manager/interface/commands.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 8ebb80f6c..ccf6b8f63 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -171,7 +171,7 @@ def terminate(obj: ProcessManagerContext) -> None: def kill(obj: ProcessManagerContext, query: ProcessQuery) -> None: log = get_logger("process_manager.shell") log.debug(f"Killing with query {query}") - result = obj.get_driver("process_manager").kill(query=query) + result = obj.get_driver("process_manager").kill(query) if not result: return obj.print( @@ -187,7 +187,7 @@ def kill(obj: ProcessManagerContext, query: ProcessQuery) -> None: def flush(obj: ProcessManagerContext, query: ProcessQuery) -> None: log = get_logger("process_manager.shell") log.debug(f"Flushing with query {query}") - result = obj.get_driver("process_manager").flush(query=query) + result = obj.get_driver("process_manager").flush(query) if not result: return obj.print( @@ -247,7 +247,7 @@ def logs( def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None: log = get_logger("process_manager.shell") log.debug(f"Restarting with query {query}") - obj.get_driver("process_manager").restart(query=query) + obj.get_driver("process_manager").restart(query) @click.command("ps") @@ -264,7 +264,7 @@ def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None: def ps(obj: ProcessManagerContext, query: ProcessQuery, long_format: bool) -> None: log = get_logger("process_manager.shell") log.debug(f"Running ps with query {query}") - results = obj.get_driver("process_manager").ps(query=query) + results = obj.get_driver("process_manager").ps(query) if not results: return obj.print( From 1863a0a29428f48f6d6fe3dd4574b73592691bde Mon Sep 17 00:00:00 2001 From: wanyunSu Date: Thu, 31 Jul 2025 14:51:42 +0200 Subject: [PATCH 26/30] fix for the PM publish function --- src/drunc/process_manager/process_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 8d6b493b7..c52acfd21 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -169,16 +169,16 @@ def publish(self, q: ProcessQuery, interval_s: float = 10.0): n_running = sum( 1 - for process in results + for process in results.values if process.status_code == ProcessInstance.StatusCode.RUNNING ) n_dead = sum( 1 - for process in results + for process in results.values if process.status_code == ProcessInstance.StatusCode.DEAD ) n_session = len( - {process.process_description.metadata.session for process in results} + {process.process_description.metadata.session for process in results.values} ) self.opmon_publisher.publish( message=ProcessStatus( From b8595379d4970936903b1b8daf51c77a13655c9e Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 31 Jul 2025 15:06:05 +0100 Subject: [PATCH 27/30] Another sneaky one... --- src/drunc/unified_shell/shell_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drunc/unified_shell/shell_utils.py b/src/drunc/unified_shell/shell_utils.py index 6c2e2282f..120f44fbe 100644 --- a/src/drunc/unified_shell/shell_utils.py +++ b/src/drunc/unified_shell/shell_utils.py @@ -19,7 +19,7 @@ def run_fsm_sequence(sequence_commands, cmd_to_options_and_args, ctx, obj, **kwa if command == "boot": pmd = obj.get_driver("process_manager", quiet_fail=True) process_list = pmd.ps(ProcessQuery(names=[".*"])) - if not process_list.data.values: # We haven't started anything yet + if not process_list.values: # We haven't started anything yet accepted_command.append("boot") if cd: accepted_command_raw = cd.describe_fsm() From 85ff803cfc70e2416f5b1888701e74bafd77623a Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 1 Aug 2025 14:01:30 +0200 Subject: [PATCH 28/30] Residual _logs from ruff checking --- src/drunc/fsm/actions/usvc_elisa_logbook.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/drunc/fsm/actions/usvc_elisa_logbook.py b/src/drunc/fsm/actions/usvc_elisa_logbook.py index 6e4df8e8c..c01f20564 100644 --- a/src/drunc/fsm/actions/usvc_elisa_logbook.py +++ b/src/drunc/fsm/actions/usvc_elisa_logbook.py @@ -43,13 +43,13 @@ def __init__(self, configuration): if len(configuration.parameters) > 0: elisa_hardware_tmp = configuration.parameters[0].value if elisa_hardware_tmp not in dotdrunc["elisa_configuration"]: - self._log.error( + self.log.error( f"The ELisA logbook you specified in your configuration '{elisa_hardware_tmp}' was not found in '~/.drunc.json'. I will use the first one in your ~/.drunc.json. You will log on the ELisA logbook '{elisa_hardware}'. Contact Pierre Lasorak for help." ) else: elisa_hardware = elisa_hardware_tmp else: - self._log.error( + self.log.error( f"ELisA logbook not specified in the configuration, using the first one in from your '~/.drunc.json'. You will log on the ELisA logbook '{elisa_hardware}'. Contact Pierre Lasorak for help." ) From 1d6ce1b74c85cea3b3fcab9deeb0822e0b693c0f Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Tue, 5 Aug 2025 13:44:14 +0100 Subject: [PATCH 29/30] Fix controller driver being deleted, and check for clobebring. --- src/drunc/process_manager/interface/commands.py | 4 ---- src/drunc/unified_shell/context.py | 5 ++++- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index ccf6b8f63..9d24221f3 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -162,8 +162,6 @@ def terminate(obj: ProcessManagerContext) -> None: tabulate_process_instance_list(result, "Terminated process", False) ) # rich tables require console printing - obj.delete_driver("controller") - @click.command("kill") @add_query_options(at_least_one=True) @@ -178,8 +176,6 @@ def kill(obj: ProcessManagerContext, query: ProcessQuery) -> None: tabulate_process_instance_list(result, "Killed process", False) ) # rich tables require console printing - obj.delete_driver("controller") - @click.command("flush") @add_query_options(at_least_one=False, all_processes_by_default=True) diff --git a/src/drunc/unified_shell/context.py b/src/drunc/unified_shell/context.py index eea95eb3a..1ead05ec4 100644 --- a/src/drunc/unified_shell/context.py +++ b/src/drunc/unified_shell/context.py @@ -52,12 +52,15 @@ def set_controller_driver(self, address_controller, **kwargs) -> None: del self._drivers["controller"] return - self._drivers["controller"] = ControllerDriver( + driver = ControllerDriver( self.address_controller, self._token, aio_channel=False, ) + # This will raise an exception if the driver already exists + self.set_driver("controller", driver) + def create_token(self, **kwargs) -> Token: from drunc.utils.shell_utils import create_dummy_token_from_uname From dd05586096c2272c2d4d0053c3d3541a3bbb73fa Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Tue, 5 Aug 2025 15:41:14 +0100 Subject: [PATCH 30/30] create_stub no longer exists. --- src/drunc/utils/shell_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index db8f7fd17..d02957cd2 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -91,7 +91,6 @@ def __init__(self, name: str, address: str, token: Token): self.address = address self.channel = grpc.insecure_channel(self.address) - self.stub = self.create_stub(self.channel) self.token = Token() self.token.CopyFrom(token)