Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6a1c6fe
Update remaining sessman types.
jamesturner246 Jul 24, 2025
f44616a
Procman describe uses new Description msg.
jamesturner246 Jul 24, 2025
24e9009
New terminate and kill messages.
jamesturner246 Jul 24, 2025
b01d1d7
New restart messages.
jamesturner246 Jul 24, 2025
8fe3d7b
Most PM commands' messages except boot and logs.
jamesturner246 Jul 25, 2025
5c37ab4
Whack-a-mole with regressions.
jamesturner246 Jul 25, 2025
119f918
A couple more...
jamesturner246 Jul 25, 2025
11f6bd7
Same treatment for logs message.
jamesturner246 Jul 25, 2025
4cfff7d
logs command fix.
jamesturner246 Jul 25, 2025
fbf85ea
Typing.
jamesturner246 Jul 25, 2025
4e66288
Use new boot message.
jamesturner246 Jul 25, 2025
191c6d0
More whack-a-mole.
jamesturner246 Jul 25, 2025
4a9958c
Simplify SM client.
jamesturner246 Jul 28, 2025
76f2f63
Add timeout to SM calls.
jamesturner246 Jul 28, 2025
24efa7b
Simplify stub creation -- they aren't covariant anyway.
jamesturner246 Jul 28, 2025
0fe8d3d
Use Request for argless commands, until we know what auth we are doing.
jamesturner246 Jul 28, 2025
ad32b03
More PM request messages.
jamesturner246 Jul 28, 2025
d1c0249
Only boot to go.
jamesturner246 Jul 28, 2025
bb62053
Boot request message.
jamesturner246 Jul 28, 2025
ed2b340
Simplify handle_response.
jamesturner246 Jul 29, 2025
832a529
Make handle_grpc_error a global fn for now.
jamesturner246 Jul 29, 2025
8677c42
Fix the temp hack.
jamesturner246 Jul 29, 2025
8cb8040
More PM refactoring.
jamesturner246 Jul 30, 2025
6c01427
One last PM refactor for now.
jamesturner246 Jul 30, 2025
b304a44
Few last typos.
jamesturner246 Jul 31, 2025
1863a0a
fix for the PM publish function
wanyunSu Jul 31, 2025
b859537
Another sneaky one...
jamesturner246 Jul 31, 2025
85ff803
Residual _logs from ruff checking
Aug 1, 2025
098b136
Merge branch 'develop' into jamesturner246/simpler_grpc_pm
PawelPlesniak Aug 5, 2025
1d6ce1b
Fix controller driver being deleted, and check for clobebring.
jamesturner246 Aug 5, 2025
1a8ba3d
Merge branch 'develop' into jamesturner246/simpler_grpc_pm
PawelPlesniak Aug 5, 2025
dd05586
create_stub no longer exists.
jamesturner246 Aug 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/drunc/controller/controller_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@

class ControllerDriver(GRPCDriver):
def __init__(self, address: str, token, **kwargs):
super(ControllerDriver, self).__init__(
super().__init__(
name="controller_driver", address=address, token=token, **kwargs
)

def create_stub(self, channel):
return ControllerStub(channel)
self.stub = ControllerStub(self.channel)

def pack_empty_addressed_command(cmd):
@wraps(cmd)
Expand Down
1 change: 1 addition & 0 deletions src/drunc/fsm/actions/usvc_elisa_logbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self):
)
raise DotDruncJsonIncorrectFormat(err_msg) from KeyError
else:

err_msg: str = ""
if default_elisa_logbook:
err_msg = (
Expand Down
35 changes: 14 additions & 21 deletions src/drunc/process_manager/interface/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ def boot(
log = get_logger("process_manager.shell")
processes = obj.get_driver("process_manager").ps(ProcessQuery(user=user))

if len(processes.data.values) > 0:
if len(processes.values) > 0:
click.confirm(
f"You already have {len(processes.data.values)} processes running, are you sure you want to boot a session?",
f"You already have {len(processes.values)} processes running, are you sure you want to boot a session?",
abort=True,
)

Expand All @@ -67,7 +67,7 @@ def boot(
if not result:
break
log.debug(
f"'{result.data.process_description.metadata.name}' ({result.data.uuid.uuid}) process started"
f"'{result.values[0].process_description.metadata.name}' ({result.values[0].uuid.uuid}) process started"
)
except InterruptedCommand:
return
Expand Down Expand Up @@ -144,7 +144,7 @@ def dummy_boot(
if not result:
break
log.debug(
f"'{result.data.process_description.metadata.name}' ({result.data.uuid.uuid}) process started"
f"'{result.values[0].process_description.metadata.name}' ({result.values[0].uuid.uuid}) process started"
)
except InterruptedCommand:
return
Expand All @@ -159,39 +159,35 @@ def terminate(obj: ProcessManagerContext) -> None:
if not result:
return
obj.print(
tabulate_process_instance_list(result.data, "Terminated process", False)
tabulate_process_instance_list(result, "Terminated process", False)
) # rich tables require console printing

obj.delete_driver("controller")


@click.command("kill")
@add_query_options(at_least_one=True)
@click.pass_obj
def kill(obj: ProcessManagerContext, query: ProcessQuery) -> None:
log = get_logger("process_manager.shell")
log.debug(f"Killing with query {query}")
result = obj.get_driver("process_manager").kill(query=query)
result = obj.get_driver("process_manager").kill(query)
if not result:
return
obj.print(
tabulate_process_instance_list(result.data, "Killed process", False)
tabulate_process_instance_list(result, "Killed process", False)
) # rich tables require console printing

obj.delete_driver("controller")


@click.command("flush")
@add_query_options(at_least_one=False, all_processes_by_default=True)
@click.pass_obj
def flush(obj: ProcessManagerContext, query: ProcessQuery) -> None:
log = get_logger("process_manager.shell")
log.debug(f"Flushing with query {query}")
result = obj.get_driver("process_manager").flush(query=query)
result = obj.get_driver("process_manager").flush(query)
if not result:
return
obj.print(
tabulate_process_instance_list(result.data, "Flushed process", False)
tabulate_process_instance_list(result, "Flushed process", False)
) # rich tables require console printing


Expand All @@ -216,13 +212,12 @@ def logs(
query=query,
)

result = obj.get_driver("process_manager").logs(log_req).data
result = obj.get_driver("process_manager").logs(log_req)

if result.uuid.uuid is not None:
obj.rule(f"[yellow]{result.uuid.uuid}[/yellow] logs")

for line in result.lines:

if line == "":
obj.print("")
continue
Expand All @@ -248,7 +243,7 @@ def logs(
def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None:
log = get_logger("process_manager.shell")
log.debug(f"Restarting with query {query}")
obj.get_driver("process_manager").restart(query=query)
obj.get_driver("process_manager").restart(query)


@click.command("ps")
Expand All @@ -262,16 +257,14 @@ def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None:
help="Whether to have a long output",
)
@click.pass_obj
def ps(
obj: ProcessManagerContext, query: ProcessQuery, long_format: bool
) -> None:
def ps(obj: ProcessManagerContext, query: ProcessQuery, long_format: bool) -> None:
log = get_logger("process_manager.shell")
log.debug(f"Running ps with query {query}")
results = obj.get_driver("process_manager").ps(query=query)
results = obj.get_driver("process_manager").ps(query)
if not results:
return
obj.print(
tabulate_process_instance_list(
results.data, title="Processes running", long=long_format
results, title="Processes running", long=long_format
)
)
9 changes: 4 additions & 5 deletions src/drunc/process_manager/interface/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) ->
ctx.obj.reset(address=process_manager_address)

try:

desc = ctx.obj.get_driver("process_manager").describe()
except ServerUnreachable as e:
process_manager_shell_log = get_logger(
Expand All @@ -63,7 +62,7 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) ->

process_manager_log = get_logger(
logger_name="process_manager",
log_file_path=desc.data.info,
log_file_path=desc.info,
override_log_file=False,
rich_handler=True,
)
Expand All @@ -72,10 +71,10 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) ->
f"[green]{getpass.getuser()}[/green] connected to the process manager through a [green]drunc-process-manager-shell[/green] via address [green]{process_manager_address}[/green]"
)
process_manager_shell_log.info(
f"Connected to {process_manager_address}, running '{desc.data.name}.{desc.data.session}' (name.session), starting listening..."
f"Connected to {process_manager_address}, running '{desc.name}.{desc.session}' (name.session), starting listening..."
)
if desc.data.HasField("broadcast"):
ctx.obj.start_listening(desc.data.broadcast)
if desc.HasField("broadcast"):
ctx.obj.start_listening(desc.broadcast)

def cleanup():
ctx.obj.terminate()
Expand Down
51 changes: 32 additions & 19 deletions src/drunc/process_manager/k8s_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ProcessRestriction,
ProcessUUID,
)
from druncschema.request_response_pb2 import ResponseFlag
from kubernetes import client, config

from drunc.exceptions import DruncCommandException, DruncException
Expand Down Expand Up @@ -368,20 +369,29 @@ def _return_code(self, podname, session):
def _terminate(self):
self.log.info("Terminating")


def _logs_impl(self, log_request: LogRequest) -> LogLines:
uuids = self._get_process_uid(log_request.query, in_boot_request=True)
uuid = self._ensure_one_process(uuids, in_boot_request=True)
for uuid in self._get_process_uid(log_request.query):
podname = self.boot_request[uuid].process_description.metadata.name
session = self.boot_request[uuid].process_description.metadata.session
return [LogLines(line=log) for log in self._core_v1_api.read_namespaced_pod_log(
podname, session, tail_lines=log_request.how_far
).split("\n")]

def _boot_impl(self, boot_request: BootRequest) -> ProcessUUID:
return [
LogLines(line=log)
for log in self._core_v1_api.read_namespaced_pod_log(
podname, session, tail_lines=log_request.how_far
).split("\n")
]

def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList:
self.log.debug(f"{self.name} running boot command")
this_uuid = str(uuid.uuid4())
return self.__boot(boot_request, this_uuid)
process = self.__boot(boot_request, this_uuid)
return ProcessInstanceList(
name=self.name,
token=None,
values=[process],
flag=ResponseFlag.EXECUTED_SUCCESSFULLY,
)

def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:
session = boot_request.process_description.metadata.session
Expand Down Expand Up @@ -422,12 +432,15 @@ def _ps_impl(

ret.append(self._get_pi(proc_uuid, podname, session, return_code))

pil = ProcessInstanceList(values=ret)

return pil
return ProcessInstanceList(
name=self.name,
token=None,
values=ret,
flag=ResponseFlag.EXECUTED_SUCCESSFULLY,
)

def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList:
# ret = []
ret = []
uuids = self._get_process_uid(query, in_boot_request=True)
uuid = self._ensure_one_process(uuids, in_boot_request=True)
for uuid in self._get_process_uid(query):
Expand All @@ -437,25 +450,25 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList:

self._kill_pod(podname, session)

same_uuid_br = []
same_uuid_br = BootRequest()
same_uuid_br.CopyFrom(self.boot_request[uuid])
same_uuid = uuid

del self.boot_request[uuid]
del uuid

ret = self.__boot(same_uuid_br, same_uuid)
ret = [self.__boot(same_uuid_br, same_uuid)]
# ret.append(self._get_pi(uuid, podname, session))

del same_uuid_br
del same_uuid

# ret.append(self._get_pi(uuid, podname, session))

# pil = ProcessInstanceList(
# values=ret
# )
return ret
return ProcessInstanceList(
name=self.name,
token=None,
values=ret,
flag=ResponseFlag.EXECUTED_SUCCESSFULLY,
)

# # ORDER MATTERS!
# @broadcasted # outer most wrapper 1st step
Expand Down
Loading
Loading