Skip to content
9 changes: 6 additions & 3 deletions packages/jumpstarter-cli/jumpstarter_cli/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def get():
@opt_output_all
@opt_comma_separated(
"with",
{"leases", "online"},
help_text="Include fields: leases, online (comma-separated or repeated)"
{"leases", "online", "status"},
help_text="Include fields: leases, online, status (comma-separated or repeated)",
)
@handle_exceptions_with_reauthentication(relogin_client)
def get_exporters(config, selector: str | None, output: OutputType, with_options: list[str]):
Expand All @@ -32,7 +32,10 @@ def get_exporters(config, selector: str | None, output: OutputType, with_options

include_leases = "leases" in with_options
include_online = "online" in with_options
exporters = config.list_exporters(filter=selector, include_leases=include_leases, include_online=include_online)
include_status = "status" in with_options
exporters = config.list_exporters(
filter=selector, include_leases=include_leases, include_online=include_online, include_status=include_status
)

model_print(exporters, output)

Expand Down

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def __init__(self, channel):
request_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.UnregisterRequest.SerializeToString,
response_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.UnregisterResponse.FromString,
_registered_method=True)
self.ReportStatus = channel.unary_unary(
'/jumpstarter.v1.ControllerService/ReportStatus',
request_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ReportStatusRequest.SerializeToString,
response_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ReportStatusResponse.FromString,
_registered_method=True)
self.Listen = channel.unary_stream(
'/jumpstarter.v1.ControllerService/Listen',
request_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ListenRequest.SerializeToString,
Expand Down Expand Up @@ -89,6 +94,14 @@ def Unregister(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def ReportStatus(self, request, context):
"""Exporter status report
Allows exporters to report their own status to the controller
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def Listen(self, request, context):
"""Exporter listening
Returns stream tokens for accepting incoming client connections
Expand Down Expand Up @@ -163,6 +176,11 @@ def add_ControllerServiceServicer_to_server(servicer, server):
request_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.UnregisterRequest.FromString,
response_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.UnregisterResponse.SerializeToString,
),
'ReportStatus': grpc.unary_unary_rpc_method_handler(
servicer.ReportStatus,
request_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ReportStatusRequest.FromString,
response_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ReportStatusResponse.SerializeToString,
),
'Listen': grpc.unary_stream_rpc_method_handler(
servicer.Listen,
request_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ListenRequest.FromString,
Expand Down Expand Up @@ -269,6 +287,33 @@ def Unregister(request,
metadata,
_registered_method=True)

@staticmethod
def ReportStatus(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/jumpstarter.v1.ControllerService/ReportStatus',
jumpstarter_dot_v1_dot_jumpstarter__pb2.ReportStatusRequest.SerializeToString,
jumpstarter_dot_v1_dot_jumpstarter__pb2.ReportStatusResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def Listen(request,
target,
Expand Down Expand Up @@ -522,6 +567,11 @@ def __init__(self, channel):
request_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ResetRequest.SerializeToString,
response_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ResetResponse.FromString,
_registered_method=True)
self.GetStatus = channel.unary_unary(
'/jumpstarter.v1.ExporterService/GetStatus',
request_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.GetStatusRequest.SerializeToString,
response_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.GetStatusResponse.FromString,
_registered_method=True)


class ExporterServiceServicer(object):
Expand Down Expand Up @@ -560,6 +610,12 @@ def Reset(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetStatus(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_ExporterServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
Expand Down Expand Up @@ -588,6 +644,11 @@ def add_ExporterServiceServicer_to_server(servicer, server):
request_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ResetRequest.FromString,
response_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.ResetResponse.SerializeToString,
),
'GetStatus': grpc.unary_unary_rpc_method_handler(
servicer.GetStatus,
request_deserializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.GetStatusRequest.FromString,
response_serializer=jumpstarter_dot_v1_dot_jumpstarter__pb2.GetStatusResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'jumpstarter.v1.ExporterService', rpc_method_handlers)
Expand Down Expand Up @@ -735,3 +796,30 @@ def Reset(request,
timeout,
metadata,
_registered_method=True)

@staticmethod
def GetStatus(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/jumpstarter.v1.ExporterService/GetStatus',
jumpstarter_dot_v1_dot_jumpstarter__pb2.GetStatusRequest.SerializeToString,
jumpstarter_dot_v1_dot_jumpstarter__pb2.GetStatusResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
30 changes: 29 additions & 1 deletion packages/jumpstarter/jumpstarter/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from grpc.aio import AioRpcError
from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc, router_pb2_grpc

from jumpstarter.common import Metadata
from jumpstarter.common import ExporterStatus, Metadata
from jumpstarter.common.exceptions import JumpstarterException
from jumpstarter.common.resources import ResourceMetadata
from jumpstarter.common.serde import decode_value, encode_value
Expand Down Expand Up @@ -47,6 +47,12 @@ class DriverInvalidArgument(DriverError, ValueError):
"""


class ExporterNotReady(DriverError):
"""
Raised when the exporter is not ready to accept driver calls
"""


@dataclass(kw_only=True)
class AsyncDriverClient(
Metadata,
Expand Down Expand Up @@ -76,9 +82,28 @@ def __post_init__(self):
handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
self.logger.addHandler(handler)

async def check_exporter_status(self):
"""Check if the exporter is ready to accept driver calls"""
try:
response = await self.stub.GetStatus(jumpstarter_pb2.GetStatusRequest())
status = ExporterStatus.from_proto(response.status)

if status != ExporterStatus.LEASE_READY:
raise ExporterNotReady(f"Exporter status is {status}: {response.status_message}")

except AioRpcError as e:
# If GetStatus is not implemented, assume ready for backward compatibility
if e.code() == StatusCode.UNIMPLEMENTED:
self.logger.debug("GetStatus not implemented, assuming exporter is ready")
return
raise DriverError(f"Failed to check exporter status: {e.details()}") from e

async def call_async(self, method, *args):
"""Make DriverCall by method name and arguments"""

# Check exporter status before making the call
await self.check_exporter_status()

request = jumpstarter_pb2.DriverCallRequest(
uuid=str(self.uuid),
method=method,
Expand All @@ -105,6 +130,9 @@ async def call_async(self, method, *args):
async def streamingcall_async(self, method, *args):
"""Make StreamingDriverCall by method name and arguments"""

# Check exporter status before making the call
await self.check_exporter_status()

request = jumpstarter_pb2.StreamingDriverCallRequest(
uuid=str(self.uuid),
method=method,
Expand Down
39 changes: 26 additions & 13 deletions packages/jumpstarter/jumpstarter/client/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
from jumpstarter_protocol import client_pb2, client_pb2_grpc, jumpstarter_pb2_grpc, kubernetes_pb2, router_pb2_grpc
from pydantic import BaseModel, ConfigDict, Field, field_serializer

from jumpstarter.common import ExporterStatus
from jumpstarter.common.grpc import translate_grpc_exceptions


@dataclass
class WithOptions:
show_online: bool = False
show_leases: bool = False
show_status: bool = False


def add_display_columns(table, options: WithOptions = None):
Expand All @@ -28,6 +30,8 @@ def add_display_columns(table, options: WithOptions = None):
table.add_column("NAME")
if options.show_online:
table.add_column("ONLINE")
if options.show_status:
table.add_column("STATUS")
table.add_column("LABELS")
if options.show_leases:
table.add_column("LEASED BY")
Expand All @@ -42,6 +46,9 @@ def add_exporter_row(table, exporter, options: WithOptions = None, lease_info: t
row_data.append(exporter.name)
if options.show_online:
row_data.append("yes" if exporter.online else "no")
if options.show_status:
status_str = str(exporter.status) if exporter.status else "UNKNOWN"
row_data.append(status_str)
row_data.append(",".join(("{}={}".format(k, v) for k, v in sorted(exporter.labels.items()))))
if options.show_leases:
if lease_info:
Expand Down Expand Up @@ -81,12 +88,16 @@ class Exporter(BaseModel):
name: str
labels: dict[str, str]
online: bool = False
status: ExporterStatus | None = None
lease: Lease | None = None

@classmethod
def from_protobuf(cls, data: client_pb2.Exporter) -> Exporter:
namespace, name = parse_exporter_identifier(data.name)
return cls(namespace=namespace, name=name, labels=data.labels, online=data.online)
status = None
if hasattr(data, "status") and data.status:
status = ExporterStatus.from_proto(data.status)
return cls(namespace=namespace, name=name, labels=data.labels, online=data.online, status=status)

@classmethod
def rich_add_columns(cls, table, options: WithOptions = None):
Expand Down Expand Up @@ -197,6 +208,7 @@ class ExporterList(BaseModel):
next_page_token: str | None = Field(exclude=True)
include_online: bool = Field(default=False, exclude=True)
include_leases: bool = Field(default=False, exclude=True)
include_status: bool = Field(default=False, exclude=True)

@classmethod
def from_protobuf(cls, data: client_pb2.ListExportersResponse) -> ExporterList:
Expand All @@ -206,11 +218,15 @@ def from_protobuf(cls, data: client_pb2.ListExportersResponse) -> ExporterList:
)

def rich_add_columns(self, table):
options = WithOptions(show_online=self.include_online, show_leases=self.include_leases)
options = WithOptions(
show_online=self.include_online, show_leases=self.include_leases, show_status=self.include_status
)
Exporter.rich_add_columns(table, options)

def rich_add_rows(self, table):
options = WithOptions(show_online=self.include_online, show_leases=self.include_leases)
options = WithOptions(
show_online=self.include_online, show_leases=self.include_leases, show_status=self.include_status
)
for exporter in self.exporters:
exporter.rich_add_rows(table, options)

Expand All @@ -227,12 +243,10 @@ def model_dump_json(self, **kwargs):
exclude_fields.add("lease")
if not self.include_online:
exclude_fields.add("online")
if not self.include_status:
exclude_fields.add("status")

data = {
"exporters": [
exporter.model_dump(mode="json", exclude=exclude_fields) for exporter in self.exporters
]
}
data = {"exporters": [exporter.model_dump(mode="json", exclude=exclude_fields) for exporter in self.exporters]}
return json.dumps(data, **json_kwargs)

def model_dump(self, **kwargs):
Expand All @@ -241,12 +255,11 @@ def model_dump(self, **kwargs):
exclude_fields.add("lease")
if not self.include_online:
exclude_fields.add("online")
if not self.include_status:
exclude_fields.add("status")

return {"exporters": [exporter.model_dump(mode="json", exclude=exclude_fields) for exporter in self.exporters]}

return {
"exporters": [
exporter.model_dump(mode="json", exclude=exclude_fields) for exporter in self.exporters
]
}

class LeaseList(BaseModel):
leases: list[Lease]
Expand Down
10 changes: 9 additions & 1 deletion packages/jumpstarter/jumpstarter/common/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
from .enums import ExporterStatus, LogSource
from .metadata import Metadata
from .tempfile import TemporarySocket, TemporaryTcpListener, TemporaryUnixListener

__all__ = ["Metadata", "TemporarySocket", "TemporaryUnixListener", "TemporaryTcpListener"]
__all__ = [
"ExporterStatus",
"LogSource",
"Metadata",
"TemporarySocket",
"TemporaryUnixListener",
"TemporaryTcpListener",
]
Loading
Loading