Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/drunc/controller/interface/shell_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def add_to_table(table, response, prefix=""):
else "[red]NA[/]"
),
)
for child_response in response.children:
for child_response in sorted(response.children, key=lambda c: c.name):
add_to_table(table, child_response, " " + prefix)

add_to_table(t, result)
Expand Down
10 changes: 7 additions & 3 deletions src/drunc/process_manager/interface/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,12 @@ def logs(
)

result = obj.get_driver("process_manager").logs(log_req)
if result is None:
return

if result.uuid.uuid is not None:
obj.rule(f"[yellow]{result.uuid.uuid}[/yellow] logs")
display_name = result.name or result.uuid.uuid or ""
if result.name is not None:
obj.rule(f"[yellow]{display_name}[/yellow] logs")

for line in result.lines:
if not line.strip(): # keep empty lines for visual clarity
Expand All @@ -234,7 +237,8 @@ def logs(
line = line.replace(grep, f"[u]{grep}[/]")

obj.print(line)
obj.rule("End")
if result.name is not None:
obj.rule(f"[yellow]{display_name}[/yellow] end")


@click.command("restart")
Expand Down
18 changes: 12 additions & 6 deletions src/drunc/process_manager/k8s_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,21 +626,24 @@ def _get_pod_volumes_and_mounts(
# Check if this path is already covered by the JSON volumes above
is_covered = False
for vm in container_volume_mounts:
if vm.mount_path == target_home_path or target_home_path.startswith(vm.mount_path + "/"):
self.log.debug(f"Home path '{target_home_path}' is already covered by mount '{vm.mount_path}'")
if vm.mount_path == target_home_path or target_home_path.startswith(
vm.mount_path + "/"
):
self.log.debug(
f"Home path '{target_home_path}' is already covered by mount '{vm.mount_path}'"
)
is_covered = True
break

if not is_covered:
self.log.info(f"Auto-mounting home directory: '{target_home_path}'")
vol_name = f"home-{username}"

pod_volumes.append(
client.V1Volume(
name=vol_name,
host_path=client.V1HostPathVolumeSource(
path=target_home_path,
type="Directory"
path=target_home_path, type="Directory"
),
)
)
Expand Down Expand Up @@ -1001,6 +1004,7 @@ def _get_host_username(self) -> str:
except KeyError:
try:
import pwd

return pwd.getpwuid(os.getuid()).pw_name
except KeyError:
return str(os.getuid())
Expand Down Expand Up @@ -1343,7 +1347,9 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines:
logs = self._core_v1_api.read_namespaced_pod_log(
podname, session, tail_lines=log_request.how_far or 100
)
return LogLines(uuid=ProcessUUID(uuid=uuid), lines=logs.split("\n"))
return LogLines(
name=podname, uuid=ProcessUUID(uuid=uuid), lines=logs.split("\n")
)
except self._api_error_v1_api as e:
return LogLines(
uuid=ProcessUUID(uuid=uuid),
Expand Down
8 changes: 8 additions & 0 deletions src/drunc/process_manager/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,14 @@ def logs(self, request: LogRequest, context: ServicerContext) -> LogLines:
lines=[],
flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED,
)
except BadQuery as e:
return LogLines(
name=self.name,
token=None,
uuid=None,
lines=[str(e)],
flag=ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT,
)

return response

Expand Down
24 changes: 19 additions & 5 deletions src/drunc/process_manager/process_manager_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
ProcessRestriction,
)
from druncschema.process_manager_pb2_grpc import ProcessManagerStub
from druncschema.request_response_pb2 import Request
from druncschema.request_response_pb2 import Request, ResponseFlag
from druncschema.token_pb2 import Token

from drunc.connectivity_service.client import ConnectivityServiceClient
Expand Down Expand Up @@ -723,11 +723,27 @@ def kill(

return response

def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines:
def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines | None:
request.token.CopyFrom(self.token)

try:
response = self.stub.logs(request, timeout=timeout)

# Check if the response indicates a BadQuery error
if response.flag == ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT:
lines = response.lines
if len(lines) == 1:
lines = lines[0]
self.log.warning(f"Bad query for logs: {lines}")
return None

# Check for other error flags
if response.flag == ResponseFlag.DRUNC_EXCEPTION_THROWN:
self.log.error(f"Exception occurred on server: {response.lines}")
return None

return response

except grpc.RpcError as e:
try:
error_details = extract_grpc_rich_error(e)
Expand All @@ -737,10 +753,8 @@ def logs(self, request: LogRequest, timeout: int | float = 60) -> LogLines:
f"Could not extract rich error details from gRPC error: {extraction_error}",
exc_info=True,
)

handle_grpc_error(e)

return response
return None

def ps(
self, request: ProcessQuery, timeout: int | float = 60
Expand Down
3 changes: 2 additions & 1 deletion src/drunc/process_manager/ssh_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines:
logfile = self.boot_request[uid].process_description.process_logs_path
user = self.boot_request[uid].process_description.metadata.user
host = self.boot_request[uid].process_description.metadata.hostname
process_name = self.boot_request[uid].process_description.metadata.name

# Determine number of lines to retrieve (default: 100)
nlines = log_request.how_far if log_request.how_far else 100
Expand All @@ -241,7 +242,7 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines:
)

return LogLines(
name=self.name,
name=process_name,
token=None,
uuid=ProcessUUID(uuid=uid),
lines=lines,
Expand Down
69 changes: 59 additions & 10 deletions src/drunc/process_manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import os
import re
from functools import update_wrapper
from operator import attrgetter

import click
from druncschema.process_manager_pb2 import ProcessInstance, ProcessQuery, ProcessUUID
from druncschema.process_manager_pb2 import (
ProcessInstance,
ProcessInstanceList,
ProcessQuery,
ProcessUUID,
)
from rich.table import Table

from drunc.exceptions import DruncCommandException, DruncException, DruncSetupException
Expand Down Expand Up @@ -63,7 +67,58 @@ def make_tree(values):
return lines


def tabulate_process_instance_list(pil, title, long=False):
def order_process_by_name(processes: list[ProcessInstance]):
"""Given a list of processes, perform a tiered order by the name"""
by_session = {}
for process in processes:
m = process.process_description.metadata
by_session.setdefault(m.session, []).append(process)

ordered = []
for session in sorted(by_session.keys()):
session_processes = by_session[session]
node_by_id = {}
children = {}
roots = []

for process in session_processes:
tree_id = process.process_description.metadata.tree_id or ""
node_by_id.setdefault(tree_id, []).append(process)

for tree_id, processes in node_by_id.items():
node_by_id[tree_id] = sorted(
processes,
key=lambda p: (
p.process_description.metadata.name,
p.uuid.uuid,
),
)

for tree_id in node_by_id.keys():
parent_id = tree_id.rsplit(".", 1)[0] if "." in tree_id else None
if not parent_id or parent_id not in node_by_id:
roots.append(tree_id)
else:
children.setdefault(parent_id, []).append(tree_id)

def sort_key(tree_id):
m = node_by_id[tree_id][0].process_description.metadata
return (m.name, tree_id)

def walk(tree_id):
ordered.extend(node_by_id[tree_id])
for child_id in sorted(children.get(tree_id, []), key=sort_key):
walk(child_id)

for root_id in sorted(roots, key=sort_key):
walk(root_id)

return ordered


def tabulate_process_instance_list(
pil: ProcessInstanceList, title: str, long: bool = False
):
t = Table(title=title)
t.add_column("session")
t.add_column("friendly name")
Expand All @@ -75,13 +130,7 @@ def tabulate_process_instance_list(pil, title, long=False):
if long:
t.add_column("executable")

sorted_pil = sorted(
pil.values,
key=attrgetter(
"process_description.metadata.session",
"process_description.metadata.tree_id",
),
)
sorted_pil = order_process_by_name(pil.values)
tree_str = make_tree(sorted_pil)
try:
for process, line in zip(sorted_pil, tree_str):
Expand Down