Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 40 additions & 18 deletions src/drunc/controller/children_interface/child_node.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from abc import ABC, abstractmethod

from druncschema.controller_pb2 import (
AddressedCommand,
DescribeFSMResponse,
DescribeResponse,
ExcludeResponse,
Expand All @@ -11,22 +10,18 @@
IncludeResponse,
RecomputeStatusResponse,
StatusResponse,
SurrenderControlResponse,
TakeControlResponse,
ToErrorResponse,
WhoIsInChargeResponse,
)
from druncschema.request_response_pb2 import Response
from druncschema.token_pb2 import Token

from drunc.exceptions import DruncSetupException
from drunc.utils.utils import (
ControlType,
get_logger,
)


class ChildInterfaceTechnologyUnknown(DruncSetupException):
def __init__(self, t, name):
super().__init__(f"The type {t} is not supported for the ChildNode {name}")


class ChildNode(ABC):
def __init__(self, name: str, node_type: ControlType):
self.log = get_logger(f"controller.{name}-child-node")
Expand All @@ -46,15 +41,6 @@ def get_endpoint(self) -> str:
def terminate(self) -> None:
pass

@abstractmethod
def propagate_command(
self,
command: str,
request: AddressedCommand,
token: Token | None,
) -> Response:
pass

@abstractmethod
def status(
self,
Expand Down Expand Up @@ -129,3 +115,39 @@ def recompute_status(
execute_on_all_subsequent_children_in_path: bool = True,
) -> RecomputeStatusResponse:
pass

@abstractmethod
def take_control(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> TakeControlResponse:
pass

@abstractmethod
def surrender_control(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> SurrenderControlResponse:
pass

@abstractmethod
def who_is_in_charge(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> WhoIsInChargeResponse:
pass

@abstractmethod
def to_error(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> ToErrorResponse:
pass
148 changes: 121 additions & 27 deletions src/drunc/controller/children_interface/grpc_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import grpc
from druncschema.controller_pb2 import (
AddressedCommand,
DescribeFSMRequest,
DescribeFSMResponse,
DescribeRequest,
Expand All @@ -22,10 +21,17 @@
RecomputeStatusResponse,
StatusRequest,
StatusResponse,
SurrenderControlRequest,
SurrenderControlResponse,
TakeControlRequest,
TakeControlResponse,
ToErrorRequest,
ToErrorResponse,
WhoIsInChargeRequest,
WhoIsInChargeResponse,
)
from druncschema.controller_pb2_grpc import ControllerStub
from druncschema.generic_pb2 import PlainText, Stacktrace
from druncschema.request_response_pb2 import Request, Response
from druncschema.token_pb2 import Token
from grpc_status import rpc_status

Expand Down Expand Up @@ -191,30 +197,6 @@ def start_listening(self, bdesc):
)
)

def propagate_command(
self,
command: str,
request: AddressedCommand,
token: Token | None,
) -> Response:
packed_request = Request(token=token)
packed_request.data.Pack(request)

cmd = getattr(self.stub, command)

try:
response = cmd(packed_request)
except grpc.RpcError as error:
try:
self.handle_child_grpc_error(error)
except ServerUnreachable:
self.log.info(
f"Connection to {self.name} at {self.uri} failed, attempting to reconnect..."
)
response = self._attempt_reconnection(lambda: cmd(packed_request))

return response

def status(
self,
target: str = "",
Expand Down Expand Up @@ -437,14 +419,126 @@ def recompute_status(
self.handle_child_grpc_error(e)
except ServerUnreachable:
self.log.info(
f"Connection to {self.name} at {self.uri} failed during recompute_status check, attempting to reconnect..."
f"Connection to {self.name} at {self.uri} failed during recompute_status, attempting to reconnect..."
)
response = self._attempt_reconnection(
lambda: self.stub.recompute_status(request)
)

return response

def take_control(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> TakeControlResponse:
request = TakeControlRequest(
token=None,
target=target,
execute_along_path=execute_along_path,
execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path,
)

try:
response = self.stub.take_control(request)
except grpc.RpcError as e:
try:
self.handle_child_grpc_error(e)
except ServerUnreachable:
self.log.info(
f"Connection to {self.name} at {self.uri} failed during take_control, attempting to reconnect..."
)
response = self._attempt_reconnection(
lambda: self.stub.take_control(request)
)

return response

def surrender_control(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> SurrenderControlResponse:
request = SurrenderControlRequest(
token=None,
target=target,
execute_along_path=execute_along_path,
execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path,
)

try:
response = self.stub.surrender_control(request)
except grpc.RpcError as e:
try:
self.handle_child_grpc_error(e)
except ServerUnreachable:
self.log.info(
f"Connection to {self.name} at {self.uri} failed during surrender_control, attempting to reconnect..."
)
response = self._attempt_reconnection(
lambda: self.stub.surrender_control(request)
)

return response

def who_is_in_charge(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> WhoIsInChargeResponse:
request = WhoIsInChargeRequest(
token=None,
target=target,
execute_along_path=execute_along_path,
execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path,
)

try:
response = self.stub.who_is_in_charge(request)
except grpc.RpcError as e:
try:
self.handle_child_grpc_error(e)
except ServerUnreachable:
self.log.info(
f"Connection to {self.name} at {self.uri} failed during who_is_in_charge, attempting to reconnect..."
)
response = self._attempt_reconnection(
lambda: self.stub.who_is_in_charge(request)
)

return response

def to_error(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> ToErrorResponse:
request = ToErrorRequest(
token=None,
target=target,
execute_along_path=execute_along_path,
execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path,
)

try:
response = self.stub.to_error(request)
except grpc.RpcError as e:
try:
self.handle_child_grpc_error(e)
except ServerUnreachable:
self.log.info(
f"Connection to {self.name} at {self.uri} failed during to_error, attempting to reconnect..."
)
response = self._attempt_reconnection(
lambda: self.stub.to_error(request)
)

return response

def handle_child_grpc_error(self, error: grpc.RpcError) -> NoReturn:
"""Handle gRPC errors from sending commands to the child controller.

Expand Down
76 changes: 54 additions & 22 deletions src/drunc/controller/children_interface/rest_api_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import requests
import socks
from druncschema.controller_pb2 import (
AddressedCommand,
DescribeFSMResponse,
DescribeResponse,
ExcludeResponse,
Expand All @@ -22,10 +21,13 @@
RecomputeStatusResponse,
Status,
StatusResponse,
SurrenderControlResponse,
TakeControlResponse,
ToErrorResponse,
WhoIsInChargeResponse,
)
from druncschema.description_pb2 import Description
from druncschema.request_response_pb2 import Response, ResponseFlag
from druncschema.token_pb2 import Token
from druncschema.request_response_pb2 import ResponseFlag
from flask import Flask, request
from flask_restful import Api

Expand Down Expand Up @@ -65,15 +67,8 @@ def run(self) -> None:
self.log.debug("ResponseDispatcher starting to run")

while True:
# self.log.debug(f'starting to iterating: {self.listener.queue.qsize()}')
# self.log.debug(f'Queue pointer {self.listener.queue}')
# try:
r = self.listener.queue.get()
self.log.debug(f"ResponseDispatcher got the following answer: {r}")
# except:
# self.log.debug(f'ResponseDispatcher nothing')
# continue

if r == self.STOP:
self.log.debug("ResponseDispatcher STOP")
break
Expand Down Expand Up @@ -414,18 +409,6 @@ def get_endpoint(self) -> str:
def terminate(self) -> None:
pass

def propagate_command(
self,
command: str,
request: AddressedCommand,
token: Token | None,
) -> Response:
self.log.info(f"Ignoring command '{command}' sent to '{self.name}'")
return Response(
name=self.name,
flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED,
)

def status(
self,
target: str = "",
Expand Down Expand Up @@ -700,3 +683,52 @@ def recompute_status(
name=self.name,
flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED,
)

def take_control(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> TakeControlResponse:
return TakeControlResponse(
token=None,
name=self.name,
flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED,
)

def surrender_control(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> SurrenderControlResponse:
return SurrenderControlResponse(
token=None,
name=self.name,
flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED,
)

def who_is_in_charge(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> WhoIsInChargeResponse:
return WhoIsInChargeResponse(
token=None,
name=self.name,
flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED,
)

def to_error(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> ToErrorResponse:
self.state.to_error()
return ToErrorResponse(
token=None,
name=self.name,
flag=ResponseFlag.EXECUTED_SUCCESSFULLY,
)
Loading
Loading