diff --git a/src/drunc/controller/children_interface/child_node.py b/src/drunc/controller/children_interface/child_node.py index 2b5f59670..08e5cd209 100644 --- a/src/drunc/controller/children_interface/child_node.py +++ b/src/drunc/controller/children_interface/child_node.py @@ -1,7 +1,6 @@ from abc import ABC, abstractmethod from druncschema.controller_pb2 import ( - AddressedCommand, DescribeFSMResponse, DescribeResponse, ExcludeResponse, @@ -11,22 +10,18 @@ IncludeResponse, RecomputeStatusResponse, StatusResponse, + SurrenderControlResponse, + TakeControlResponse, + ToErrorResponse, + WhoIsInChargeResponse, ) -from druncschema.request_response_pb2 import Response -from druncschema.token_pb2 import Token -from drunc.exceptions import DruncSetupException from drunc.utils.utils import ( ControlType, get_logger, ) -class ChildInterfaceTechnologyUnknown(DruncSetupException): - def __init__(self, t, name): - super().__init__(f"The type {t} is not supported for the ChildNode {name}") - - class ChildNode(ABC): def __init__(self, name: str, node_type: ControlType): self.log = get_logger(f"controller.{name}-child-node") @@ -46,15 +41,6 @@ def get_endpoint(self) -> str: def terminate(self) -> None: pass - @abstractmethod - def propagate_command( - self, - command: str, - request: AddressedCommand, - token: Token | None, - ) -> Response: - pass - @abstractmethod def status( self, @@ -129,3 +115,39 @@ def recompute_status( execute_on_all_subsequent_children_in_path: bool = True, ) -> RecomputeStatusResponse: pass + + @abstractmethod + def take_control( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> TakeControlResponse: + pass + + @abstractmethod + def surrender_control( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> SurrenderControlResponse: + pass + + @abstractmethod + def who_is_in_charge( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> WhoIsInChargeResponse: + pass + + @abstractmethod + def to_error( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> ToErrorResponse: + pass diff --git a/src/drunc/controller/children_interface/grpc_child.py b/src/drunc/controller/children_interface/grpc_child.py index d4b304a9d..167c468e6 100644 --- a/src/drunc/controller/children_interface/grpc_child.py +++ b/src/drunc/controller/children_interface/grpc_child.py @@ -4,7 +4,6 @@ import grpc from druncschema.controller_pb2 import ( - AddressedCommand, DescribeFSMRequest, DescribeFSMResponse, DescribeRequest, @@ -22,10 +21,17 @@ RecomputeStatusResponse, StatusRequest, StatusResponse, + SurrenderControlRequest, + SurrenderControlResponse, + TakeControlRequest, + TakeControlResponse, + ToErrorRequest, + ToErrorResponse, + WhoIsInChargeRequest, + WhoIsInChargeResponse, ) from druncschema.controller_pb2_grpc import ControllerStub from druncschema.generic_pb2 import PlainText, Stacktrace -from druncschema.request_response_pb2 import Request, Response from druncschema.token_pb2 import Token from grpc_status import rpc_status @@ -191,30 +197,6 @@ def start_listening(self, bdesc): ) ) - def propagate_command( - self, - command: str, - request: AddressedCommand, - token: Token | None, - ) -> Response: - packed_request = Request(token=token) - packed_request.data.Pack(request) - - cmd = getattr(self.stub, command) - - try: - response = cmd(packed_request) - except grpc.RpcError as error: - try: - self.handle_child_grpc_error(error) - except ServerUnreachable: - self.log.info( - f"Connection to {self.name} at {self.uri} failed, attempting to reconnect..." - ) - response = self._attempt_reconnection(lambda: cmd(packed_request)) - - return response - def status( self, target: str = "", @@ -437,7 +419,7 @@ def recompute_status( self.handle_child_grpc_error(e) except ServerUnreachable: self.log.info( - f"Connection to {self.name} at {self.uri} failed during recompute_status check, attempting to reconnect..." + f"Connection to {self.name} at {self.uri} failed during recompute_status, attempting to reconnect..." ) response = self._attempt_reconnection( lambda: self.stub.recompute_status(request) @@ -445,6 +427,118 @@ def recompute_status( return response + def take_control( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> TakeControlResponse: + request = TakeControlRequest( + token=None, + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + + try: + response = self.stub.take_control(request) + except grpc.RpcError as e: + try: + self.handle_child_grpc_error(e) + except ServerUnreachable: + self.log.info( + f"Connection to {self.name} at {self.uri} failed during take_control, attempting to reconnect..." + ) + response = self._attempt_reconnection( + lambda: self.stub.take_control(request) + ) + + return response + + def surrender_control( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> SurrenderControlResponse: + request = SurrenderControlRequest( + token=None, + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + + try: + response = self.stub.surrender_control(request) + except grpc.RpcError as e: + try: + self.handle_child_grpc_error(e) + except ServerUnreachable: + self.log.info( + f"Connection to {self.name} at {self.uri} failed during surrender_control, attempting to reconnect..." + ) + response = self._attempt_reconnection( + lambda: self.stub.surrender_control(request) + ) + + return response + + def who_is_in_charge( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> WhoIsInChargeResponse: + request = WhoIsInChargeRequest( + token=None, + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + + try: + response = self.stub.who_is_in_charge(request) + except grpc.RpcError as e: + try: + self.handle_child_grpc_error(e) + except ServerUnreachable: + self.log.info( + f"Connection to {self.name} at {self.uri} failed during who_is_in_charge, attempting to reconnect..." + ) + response = self._attempt_reconnection( + lambda: self.stub.who_is_in_charge(request) + ) + + return response + + def to_error( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> ToErrorResponse: + request = ToErrorRequest( + token=None, + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + + try: + response = self.stub.to_error(request) + except grpc.RpcError as e: + try: + self.handle_child_grpc_error(e) + except ServerUnreachable: + self.log.info( + f"Connection to {self.name} at {self.uri} failed during to_error, attempting to reconnect..." + ) + response = self._attempt_reconnection( + lambda: self.stub.to_error(request) + ) + + return response + def handle_child_grpc_error(self, error: grpc.RpcError) -> NoReturn: """Handle gRPC errors from sending commands to the child controller. diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index 1bd1b58e8..89e1223a4 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -10,7 +10,6 @@ import requests import socks from druncschema.controller_pb2 import ( - AddressedCommand, DescribeFSMResponse, DescribeResponse, ExcludeResponse, @@ -22,10 +21,13 @@ RecomputeStatusResponse, Status, StatusResponse, + SurrenderControlResponse, + TakeControlResponse, + ToErrorResponse, + WhoIsInChargeResponse, ) from druncschema.description_pb2 import Description -from druncschema.request_response_pb2 import Response, ResponseFlag -from druncschema.token_pb2 import Token +from druncschema.request_response_pb2 import ResponseFlag from flask import Flask, request from flask_restful import Api @@ -65,15 +67,8 @@ def run(self) -> None: self.log.debug("ResponseDispatcher starting to run") while True: - # self.log.debug(f'starting to iterating: {self.listener.queue.qsize()}') - # self.log.debug(f'Queue pointer {self.listener.queue}') - # try: r = self.listener.queue.get() self.log.debug(f"ResponseDispatcher got the following answer: {r}") - # except: - # self.log.debug(f'ResponseDispatcher nothing') - # continue - if r == self.STOP: self.log.debug("ResponseDispatcher STOP") break @@ -414,18 +409,6 @@ def get_endpoint(self) -> str: def terminate(self) -> None: pass - def propagate_command( - self, - command: str, - request: AddressedCommand, - token: Token | None, - ) -> Response: - self.log.info(f"Ignoring command '{command}' sent to '{self.name}'") - return Response( - name=self.name, - flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - ) - def status( self, target: str = "", @@ -700,3 +683,52 @@ def recompute_status( name=self.name, flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ) + + def take_control( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> TakeControlResponse: + return TakeControlResponse( + token=None, + name=self.name, + flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, + ) + + def surrender_control( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> SurrenderControlResponse: + return SurrenderControlResponse( + token=None, + name=self.name, + flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, + ) + + def who_is_in_charge( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> WhoIsInChargeResponse: + return WhoIsInChargeResponse( + token=None, + name=self.name, + flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, + ) + + def to_error( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> ToErrorResponse: + self.state.to_error() + return ToErrorResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 3463911cc..7c3a15088 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -1,16 +1,12 @@ import multiprocessing -import re import threading import time -import traceback from concurrent.futures import ThreadPoolExecutor, as_completed -from functools import wraps from typing import Callable, List, TypeVar from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.controller_pb2 import ( - AddressedCommand, DescribeFSMRequest, DescribeFSMResponse, DescribeRequest, @@ -29,14 +25,20 @@ RecomputeStatusResponse, StatusRequest, StatusResponse, + SurrenderControlRequest, + SurrenderControlResponse, + TakeControlRequest, + TakeControlResponse, + ToErrorRequest, + ToErrorResponse, + WhoIsInChargeRequest, + WhoIsInChargeResponse, ) from druncschema.controller_pb2_grpc import ControllerServicer from druncschema.description_pb2 import Description -from druncschema.generic_pb2 import PlainText, Stacktrace from druncschema.opmon.generic_pb2 import RunInfo -from druncschema.request_response_pb2 import Response, ResponseFlag +from druncschema.request_response_pb2 import ResponseFlag from druncschema.token_pb2 import Token -from google.protobuf.any_pb2 import Any from grpc import ServicerContext from drunc.authoriser.configuration import DummyAuthoriserConfHandler @@ -56,7 +58,6 @@ get_detector_name, get_status_message, ) -from drunc.exceptions import DruncCommandException, DruncException from drunc.fsm.actions.utils import get_dotdrunc_json from drunc.fsm.configuration import FSMConfHandler from drunc.fsm.exceptions import ( @@ -64,157 +65,11 @@ DotDruncJsonNotFound, ) from drunc.fsm.utils import convert_fsm_transition -from drunc.utils.grpc_utils import UnpackingError, pack_to_any, unpack_any from drunc.utils.utils import get_logger T = TypeVar("T") -def OLD_address_command( - obj, - command_name, - command_data, - target, - execute_along_path, - execute_on_all_subsequent_children_in_path, -): - log = get_logger("controller.OLD_address_command") - - ret = {} - children_names = [c.name for c in obj.children_nodes] - - start_with_slash = target.startswith("/") - target_ = target[:] - if start_with_slash: - target_ = target[1:] - - if target_ == "": - if execute_on_all_subsequent_children_in_path: - for child in children_names: - ret[child] = AddressedCommand( - command_name=command_name, - command_data=command_data, - target=child, - execute_along_path=execute_along_path, - execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, - ) - return ret - - target_path = target_.split("/") - if start_with_slash and target_path[0] != obj.name: - raise DruncCommandException(f"Target '{target_}' is not matching '{obj.name}'") - - if target_path[0] == obj.name: - target_path.pop(0) - - if target_path == []: - if execute_on_all_subsequent_children_in_path: - for child in children_names: - ret[child] = AddressedCommand( - command_name=command_name, - command_data=command_data, - target=child, - execute_along_path=execute_along_path, - execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, - ) - return ret - - target_name = target_path[0] - - for child in children_names: - if re.match(target_name, child): - new_target_path = child - if len(target_path) > 1: - new_target_path = "/".join([new_target_path] + target_path[1:]) - ret[child] = AddressedCommand( - command_name=command_name, - command_data=command_data, - target=new_target_path, - execute_along_path=execute_along_path, - execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, - ) - - if ret == {}: - log.info(f"Target '{target}' not found in children of '{obj.name}'") - - return ret - - -def OLD_unpack_addressed_command_to(data_type=None): - def decor(cmd): - command_name = cmd.__name__ - logger = get_logger(f"controller.upack_add'ed_cmd.{command_name}") - - @wraps(cmd) - def wrap(obj, request, context): - try: - command = unpack_any(request.data, AddressedCommand) - except UnpackingError as e: - logger.exception(e) - return Response( - name=obj.name, - token=None, - data=pack_to_any(PlainText(text=str(e))), - flag=ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT, - children=[], - ) - - try: - addressed_commands = OLD_address_command( - obj=obj, - command_name=command_name, - command_data=command.command_data, - target=command.target, - execute_along_path=command.execute_along_path, - execute_on_all_subsequent_children_in_path=command.execute_on_all_subsequent_children_in_path, - ) - logger.debug(f"Addressed commands: {addressed_commands}") - except DruncCommandException as e: - logger.exception(e) - return Response( - name=obj.name, - token=None, - data=pack_to_any(PlainText(text=str(e))), - flag=ResponseFlag.FAILED, - children=[], - ) - - payload = None - if data_type is not None: - try: - payload = unpack_any(command.command_data, data_type) - except UnpackingError as e: - logger.exception(e) - return Response( - name=obj.name, - token=None, - data=pack_to_any(PlainText(text=str(e))), - flag=ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT, - children=[], - ) - - execute_on_self = ( - command.target == obj.name - or command.target == "" - or command.target == "/" - or command.execute_along_path - ) - - kwargs = { - "addressed_commands": addressed_commands, - "execute_on_self": execute_on_self, - "token": request.token, - } - if payload is not None: - kwargs["payload"] = payload - - return cmd(obj, **kwargs) - - return wrap - - return decor - - class Controller(ControllerServicer): children_nodes: List[ChildNode] = [] @@ -348,15 +203,7 @@ def init_controller(self) -> None: if child.name in bad_children: continue log_init_controller.info(f"Taking control of {child.name}") - request = AddressedCommand( - token=self.actor.get_token(), - command_name="take_control", - command_data=None, - target=child.name, - execute_along_path=True, - execute_on_all_subsequent_children_in_path=True, - ) - child.propagate_command("take_control", request, self.actor.get_token()) + child.take_control(execute_on_all_subsequent_children_in_path=True) interval_s = getattr(self.configuration.data, "interval_s", 10.0) @@ -538,132 +385,6 @@ def terminate(self): def __del__(self): self.terminate() - def OLD_propagate_to_all_children( - self, - command_name: str, - token: Token, - command_data: Any = None, - only_included: bool = True, - ): - children_to_execute = [ - cn.name for cn in self.children_nodes if not only_included or cn.included - ] - - addressed_commands = { - cn: AddressedCommand( - command_name=command_name, - command_data=command_data, - target=cn, - execute_along_path=True, - execute_on_all_subsequent_children_in_path=True, - ) - for cn in children_to_execute - } - - return self.OLD_propagate_to_children( - command_name, - addressed_commands, - token, - ) - - def OLD_propagate_to_children( - self, - command_name: str, - addressed_commands: dict[str, AddressedCommand], - token: Token, - ): - self.log.debug(f"Propagating {command_name} to children") - response_children: list[Response] = [] - response_lock = threading.Lock() - - def propagate_to_child( - child_name, - command_name, - command_data, - token, - response_lock, - response_children, - ): - child = next( - (cn for cn in self.children_nodes if cn.name == child_name), None - ) - - if child is None: - self.log.error(f"Child {child_name} not found") - return - - command_data_str = str(command_data).replace("\n", " ") - self.log.debug( - f"Propagating {command_name} to child {child.name}, command data: {command_data_str}, token: {token}" - ) - - try: - response = child.propagate_command(command_name, command_data, None) - with response_lock: - response_children.append(response) - - if response.flag in [ - ResponseFlag.EXECUTED_SUCCESSFULLY, - ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - ]: - self.log.debug( - f"Propagated {command_name} to children ({child.name}) successfully" - ) - else: - self.log.error( - f"Propagating {command_name} to children ({child.name}) failed: {ResponseFlag.Name(response.flag)}. See its logs for more information and stacktrace." - ) - - except Exception as e: # Catch all, we are in a thread and want to do something sensible when an exception is thrown - self.log.error( - f"Something wrong happened while sending the command to {child.name}: Error raised: {e!s}" - ) - self.log.exception(e) - flag = ( - ResponseFlag.DRUNC_EXCEPTION_THROWN - if isinstance(e, DruncException) - else ResponseFlag.UNHANDLED_EXCEPTION_THROWN - ) - - with response_lock: - stack = traceback.format_exc().split("\n") - response_children.append( - Response( - name=child.name, - token=token, - data=pack_to_any(Stacktrace(text=stack)), - flag=flag, - children=[], - ) - ) - - self.log.error( - f"Failed to propagate {command_name} to {child.name} ({child.name}) EXCEPTION THROWN: {str(e)}" - ) - - threads = [] - - for child, data in addressed_commands.items(): - self.log.debug(f"Propagating to {child}") - t = threading.Thread( - target=propagate_to_child, - kwargs={ - "child_name": child, - "command_name": command_name, - "command_data": data, - "token": token, - "response_lock": response_lock, - "response_children": response_children, - }, - ) - t.start() - threads.append(t) - - for thread in threads: - thread.join() - - return response_children - def parse_target_string(self, target: str) -> str: """Parse and check a target string. @@ -1348,76 +1069,113 @@ def recompute_status( ############# Actor commands ############# ########################################## - # ORDER MATTERS! - @broadcasted # outer most wrapper 1st step - @authentified_and_authorised( - action=ActionType.UPDATE, system=SystemType.CONTROLLER - ) # 2nd step - @OLD_unpack_addressed_command_to() # 3rd step + @broadcasted + @authentified_and_authorised(action=ActionType.UPDATE, system=SystemType.CONTROLLER) @publish_command_time def take_control( self, - addressed_commands: dict[str, AddressedCommand], - execute_on_self: bool, - token: Token, - ) -> Response: - resp = "" - if execute_on_self: - if self.actor.take_control(token) != 0: - resp += f"Could not take control on {self.name}" - else: - resp += f"{token.user_name} took control on {self.name}" + request: TakeControlRequest, + context: ServicerContext, + ) -> TakeControlResponse: + response = TakeControlResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) - response_children = self.OLD_propagate_to_children( - "take_control", - addressed_commands, - token, + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + + text = "" + + # Children nodes (ignore exclusion). + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + include_excluded_nodes=True, ) + child_responses = self.propagate_concurrently( + lambda child, target: child.take_control( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) + response.children.extend(child_responses) + + # This node. + if request.target == self.name or request.execute_along_path: + if self.actor.take_control(request.token) == 0: + text += f"took control on {self.name}" + else: + text += f"Could not take control on {self.name}" + if any( cr.flag not in [ ResponseFlag.EXECUTED_SUCCESSFULLY, ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ] - for cr in response_children + for cr in child_responses ): - resp += ", could not take control for all children" + text += ", could not take control of all children" - return Response( - name=self.name, - token=token, - data=pack_to_any(PlainText(text=resp)) if resp else None, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=response_children, - ) + if text: + response.text = text - # ORDER MATTERS! - @broadcasted # outer most wrapper 1st step - @authentified_and_authorised( - action=ActionType.UPDATE, system=SystemType.CONTROLLER - ) # 2nd step - @in_control # 3rd step - @OLD_unpack_addressed_command_to() # 4th step + return response + + @broadcasted + @authentified_and_authorised(action=ActionType.UPDATE, system=SystemType.CONTROLLER) + @in_control @publish_command_time def surrender_control( self, - addressed_commands: dict[str, AddressedCommand], - execute_on_self: bool, - token: Token, - ) -> Response: - resp = "" - if execute_on_self: - user = self.actor.get_user_name() - if self.actor.surrender_control(token) != 0: - resp += f"Could not surrender control on {self.name}" - else: - resp += f"{user} surrendered control on {self.name}" + request: SurrenderControlRequest, + context: ServicerContext, + ) -> SurrenderControlResponse: + response = SurrenderControlResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response - response_children = self.OLD_propagate_to_children( - "surrender_control", - addressed_commands, - token, + text = "" + + # Children nodes (ignore exclusion). + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + include_excluded_nodes=True, ) + child_responses = self.propagate_concurrently( + lambda child, target: child.surrender_control( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) + response.children.extend(child_responses) + + # This node. + if request.target == self.name or request.execute_along_path: + if self.actor.surrender_control(request.token) == 0: + text += f"surrendered control on {self.name}" + else: + text += f"Could not surrender control on {self.name}" if any( cr.flag @@ -1425,94 +1183,105 @@ def surrender_control( ResponseFlag.EXECUTED_SUCCESSFULLY, ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, ] - for cr in response_children + for cr in child_responses ): - resp += ", could not surrender control for all children" + text += ", could not surrender control of all children" - return Response( - name=self.name, - token=token, - data=pack_to_any(PlainText(text=resp)) if resp else None, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=response_children, - ) + if text: + response.text = text - # ORDER MATTERS! - @broadcasted # outer most wrapper 1st step - @authentified_and_authorised( - action=ActionType.READ, system=SystemType.CONTROLLER - ) # 2nd step - @OLD_unpack_addressed_command_to() # 3rd step + return response + + @broadcasted + @authentified_and_authorised(action=ActionType.READ, system=SystemType.CONTROLLER) @publish_command_time def who_is_in_charge( self, - addressed_commands: dict[str, AddressedCommand], - execute_on_self: bool, - token: Token, - ) -> Response: - if execute_on_self: - user = pack_to_any(PlainText(text=self.actor.get_user_name())) - else: - user = None - - response_children = self.OLD_propagate_to_children( - "who_is_in_charge", - addressed_commands, - token, - ) - - return Response( + request: WhoIsInChargeRequest, + context: ServicerContext, + ) -> WhoIsInChargeResponse: + response = WhoIsInChargeResponse( + token=None, name=self.name, - token=token, - data=user, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=response_children, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + + # Children nodes (ignore exclusion). + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + include_excluded_nodes=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.who_is_in_charge( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) + response.children.extend(child_responses) + + # This node. + if request.target == self.name or request.execute_along_path: + response.text = self.actor.get_user_name() + + return response + ########################################## ####### Integration test commands ######## ########################################## - # ORDER MATTERS! - @broadcasted # outer most wrapper 1st step - @authentified_and_authorised( - action=ActionType.UPDATE, system=SystemType.CONTROLLER - ) # 2nd step + @broadcasted + @authentified_and_authorised(action=ActionType.UPDATE, system=SystemType.CONTROLLER) @in_control - @OLD_unpack_addressed_command_to() # 3rd step @publish_command_time def to_error( self, - addressed_commands: dict[str, AddressedCommand], - execute_on_self: bool, - token: Token, - ) -> PlainText: + request: ToErrorRequest, + context: ServicerContext, + ) -> ToErrorResponse: """ Transitions the stateful node to an error state. Used for testing purposes. """ + response = ToErrorResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + try: - if execute_on_self: - self.stateful_node.to_error() + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response - response_children = self.OLD_propagate_to_children( - "to_error", - addressed_commands, - token, - ) + # Children nodes (ignore exclusion). + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + include_excluded_nodes=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.to_error( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) + response.children.extend(child_responses) - return Response( - name=self.name, - token=token, - data=None, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=response_children, - ) - except Exception as e: - self.log.exception(e) - return Response( - name=self.name, - token=token, - data=None, - flag=ResponseFlag.DRUNC_EXCEPTION_THROWN, - children=None, - ) + # This node. + if request.target == self.name or request.execute_along_path: + self.stateful_node.to_error() + + return response diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index b5472ca25..ee7993b1b 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -1,10 +1,8 @@ import ipaddress import socket -from functools import wraps import grpc from druncschema.controller_pb2 import ( - AddressedCommand, DescribeFSMRequest, DescribeFSMResponse, DescribeRequest, @@ -22,11 +20,18 @@ RecomputeStatusResponse, StatusRequest, StatusResponse, + SurrenderControlRequest, + SurrenderControlResponse, + TakeControlRequest, + TakeControlResponse, + ToErrorRequest, + ToErrorResponse, + WhoIsInChargeRequest, + WhoIsInChargeResponse, ) from druncschema.controller_pb2_grpc import ControllerStub -from druncschema.description_pb2 import Description from druncschema.generic_pb2 import PlainText, Stacktrace -from druncschema.request_response_pb2 import Request, ResponseFlag +from druncschema.request_response_pb2 import ResponseFlag from druncschema.token_pb2 import Token from drunc.exceptions import DruncServerSideError @@ -93,29 +98,6 @@ def __init__(self, address: str, token: Token): self.token = Token() self.token.CopyFrom(token) - def OLD_pack_empty_addressed_command(cmd): - @wraps(cmd) - def wrapper( - self, - target: str = "", - execute_along_path: bool = True, - execute_on_all_subsequent_children_in_path: bool = True, - **kwargs, - ): - command_name = cmd.__name__ - return cmd( - self, - addressed_command=AddressedCommand( - command_name=command_name, - target=target, - execute_along_path=execute_along_path, - execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, - ), - **kwargs, - ) - - return wrapper - def status( self, target: str = "", @@ -290,46 +272,89 @@ def recompute_status( return response - @OLD_pack_empty_addressed_command def take_control( - self, addressed_command: AddressedCommand, timeout: int | float = 60 - ) -> DecodedResponse: - return self.OLD_send_command( - "take_control", data=addressed_command, outformat=PlainText, timeout=timeout + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + timeout: int | float = 60, + ) -> TakeControlResponse: + request = TakeControlRequest( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, ) + request.token.CopyFrom(self.token) - @OLD_pack_empty_addressed_command - def who_is_in_charge( - self, addressed_command: AddressedCommand, timeout: int | float = 60 - ) -> DecodedResponse: - return self.OLD_send_command( - "who_is_in_charge", - data=addressed_command, - outformat=PlainText, - timeout=timeout, - ) + try: + response = self.stub.take_control(request, timeout=timeout) + except grpc.RpcError as e: + handle_grpc_error(e) + + return response - @OLD_pack_empty_addressed_command def surrender_control( - self, addressed_command: AddressedCommand, timeout: int | float = 60 - ) -> DecodedResponse: - return self.OLD_send_command( - "surrender_control", - data=addressed_command, - outformat=PlainText, - timeout=timeout, + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + timeout: int | float = 60, + ) -> SurrenderControlResponse: + request = SurrenderControlRequest( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.surrender_control(request, timeout=timeout) + except grpc.RpcError as e: + handle_grpc_error(e) + + return response + + def who_is_in_charge( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + timeout: int | float = 60, + ) -> WhoIsInChargeResponse: + request = WhoIsInChargeRequest( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.who_is_in_charge(request, timeout=timeout) + except grpc.RpcError as e: + handle_grpc_error(e) + + return response - @OLD_pack_empty_addressed_command def to_error( - self, addressed_command: AddressedCommand, timeout: int | float = 60 - ) -> DecodedResponse: - return self.OLD_send_command( - "to_error", - data=addressed_command, - outformat=Description, - timeout=timeout, + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + timeout: int | float = 60, + ) -> ToErrorResponse: + request = ToErrorRequest( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.to_error(request, timeout=timeout) + except grpc.RpcError as e: + handle_grpc_error(e) + + return response def handle_response(self, response, command, outformat): dr = DecodedResponse( @@ -392,23 +417,3 @@ def text(verb="not executed", reason=""): self.log.error(f"Exception thrown from child: {e}") return dr - - def OLD_send_command( - self, - command: str, - data=None, - outformat=None, - timeout: int | float = 60, - ): - request = Request() - request.token.CopyFrom(self.token) - if data is not None: - request.data.Pack(data) - - try: - cmd = getattr(self.stub, command) - response = cmd(request, timeout=timeout) - except grpc.RpcError as e: - handle_grpc_error(e) - - return self.handle_response(response, command, outformat) diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index 821f5a010..c321de198 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -210,7 +210,7 @@ def take_control( target=target, execute_along_path=execute_along_path, execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, - ).data + ) @click.command("surrender-control") @@ -240,7 +240,7 @@ def surrender_control( target=target, execute_along_path=execute_along_path, execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, - ).data + ) @click.command("who-am-i") @@ -280,11 +280,12 @@ def who_is_in_charge( execute_along_path=execute_along_path, execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, ) - .data + .text ) - if who: - log = get_logger(**logger_params) - log.info(who.text) ## TODO create a table of who is in charge + + # TODO: create a table of who is in charge + log = get_logger(**logger_params) + log.info(who) @click.command("include") diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 399d4687f..1b13e038a 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -206,7 +206,7 @@ def controller_cleanup(): who = "" try: - who = ctx.get_driver("controller").who_is_in_charge().data + who = ctx.get_driver("controller").who_is_in_charge().text except grpc.RpcError as e: dead = grpc.StatusCode.UNAVAILABLE == e.code() except Exception as e: