Skip to content
26 changes: 24 additions & 2 deletions packages/jumpstarter/jumpstarter/config/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
from jumpstarter.driver import Driver


class HookConfigV1Alpha1(BaseModel):
"""Configuration for lifecycle hooks."""

model_config = ConfigDict(populate_by_name=True)

pre_lease: str | None = Field(default=None, alias="preLease")
post_lease: str | None = Field(default=None, alias="postLease")
timeout: int = Field(default=300, description="Hook execution timeout in seconds")


class ExporterConfigV1Alpha1DriverInstanceProxy(BaseModel):
ref: str

Expand Down Expand Up @@ -85,6 +95,7 @@ class ExporterConfigV1Alpha1(BaseModel):
grpcOptions: dict[str, str | int] | None = Field(default_factory=dict)

export: dict[str, ExporterConfigV1Alpha1DriverInstance] = Field(default_factory=dict)
hooks: HookConfigV1Alpha1 = Field(default_factory=HookConfigV1Alpha1)

path: Path | None = Field(default=None)

Expand Down Expand Up @@ -119,7 +130,7 @@ def list(cls) -> ExporterConfigListV1Alpha1:

@classmethod
def dump_yaml(self, config: Self) -> str:
return yaml.safe_dump(config.model_dump(mode="json", exclude={"alias", "path"}), sort_keys=False)
return yaml.safe_dump(config.model_dump(mode="json", by_alias=True, exclude={"alias", "path"}), sort_keys=False)

@classmethod
def save(cls, config: Self, path: Optional[str] = None) -> Path:
Expand All @@ -130,7 +141,7 @@ def save(cls, config: Self, path: Optional[str] = None) -> Path:
else:
config.path = Path(path)
with config.path.open(mode="w") as f:
yaml.safe_dump(config.model_dump(mode="json", exclude={"alias", "path"}), f, sort_keys=False)
yaml.safe_dump(config.model_dump(mode="json", by_alias=True, exclude={"alias", "path"}), f, sort_keys=False)
return config.path

@classmethod
Expand Down Expand Up @@ -173,6 +184,16 @@ async def channel_factory():
)
return aio_secure_channel(self.endpoint, credentials, self.grpcOptions)

# Create hook executor if hooks are configured
hook_executor = None
if self.hooks.pre_lease or self.hooks.post_lease:
from jumpstarter.exporter.hooks import HookExecutor

hook_executor = HookExecutor(
config=self.hooks,
device_factory=ExporterConfigV1Alpha1DriverInstance(children=self.export).instantiate,
)

exporter = None
entered = False
try:
Expand All @@ -181,6 +202,7 @@ async def channel_factory():
device_factory=ExporterConfigV1Alpha1DriverInstance(children=self.export).instantiate,
tls=self.tls,
grpc_options=self.grpcOptions,
hook_executor=hook_executor,
)
# Initialize the exporter (registration, etc.)
await exporter.__aenter__()
Expand Down
52 changes: 52 additions & 0 deletions packages/jumpstarter/jumpstarter/config/exporter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,55 @@ def test_exporter_config(monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
ExporterConfigV1Alpha1.save(config)

assert config == ExporterConfigV1Alpha1.load("test")


def test_exporter_config_with_hooks(monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
monkeypatch.setattr(ExporterConfigV1Alpha1, "BASE_PATH", tmp_path)

path = tmp_path / "test-hooks.yaml"

text = """apiVersion: jumpstarter.dev/v1alpha1
kind: ExporterConfig
metadata:
namespace: default
name: test-hooks
endpoint: "jumpstarter.my-lab.com:1443"
token: "test-token"
hooks:
preLease: |
echo "Pre-lease hook for $LEASE_NAME"
j power on
postLease: |
echo "Post-lease hook for $LEASE_NAME"
j power off
timeout: 600
export:
power:
type: "jumpstarter_driver_power.driver.PduPower"
"""
path.write_text(
text,
encoding="utf-8",
)

config = ExporterConfigV1Alpha1.load("test-hooks")

assert config.hooks.pre_lease == 'echo "Pre-lease hook for $LEASE_NAME"\nj power on\n'
assert config.hooks.post_lease == 'echo "Post-lease hook for $LEASE_NAME"\nj power off\n'
assert config.hooks.timeout == 600

# Test that it round-trips correctly
path.unlink()
ExporterConfigV1Alpha1.save(config)
reloaded_config = ExporterConfigV1Alpha1.load("test-hooks")

assert reloaded_config.hooks.pre_lease == config.hooks.pre_lease
assert reloaded_config.hooks.post_lease == config.hooks.post_lease
assert reloaded_config.hooks.timeout == config.hooks.timeout

# Test that the YAML uses camelCase
yaml_output = ExporterConfigV1Alpha1.dump_yaml(config)
assert "preLease:" in yaml_output
assert "postLease:" in yaml_output
assert "pre_lease:" not in yaml_output
assert "post_lease:" not in yaml_output
79 changes: 76 additions & 3 deletions packages/jumpstarter/jumpstarter/exporter/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from anyio import (
AsyncContextManagerMixin,
CancelScope,
Event,
connect_unix,
create_memory_object_stream,
create_task_group,
Expand All @@ -25,6 +26,7 @@
from jumpstarter.common.streams import connect_router_stream
from jumpstarter.config.tls import TLSConfigV1Alpha1
from jumpstarter.driver import Driver
from jumpstarter.exporter.hooks import HookContext, HookExecutor
from jumpstarter.exporter.session import Session

logger = logging.getLogger(__name__)
Expand All @@ -37,10 +39,13 @@
lease_name: str = field(init=False, default="")
tls: TLSConfigV1Alpha1 = field(default_factory=TLSConfigV1Alpha1)
grpc_options: dict[str, str] = field(default_factory=dict)
hook_executor: HookExecutor | None = field(default=None)
registered: bool = field(init=False, default=False)
_stop_requested: bool = field(init=False, default=False)
_started: bool = field(init=False, default=False)
_tg: TaskGroup | None = field(init=False, default=None)
_current_client_name: str = field(init=False, default="")
_pre_lease_ready: Event | None = field(init=False, default=None)

def stop(self, wait_for_lease_exit=False):
"""Signal the exporter to stop.
Expand All @@ -49,9 +54,7 @@
wait_for_lease_exit (bool): If True, wait for the current lease to exit before stopping.
"""

# Stop immediately if not started yet or if immediate stop is requested
if (not self._started or not wait_for_lease_exit) and self._tg is not None:
logger.info("Stopping exporter immediately")
self._tg.cancel_scope.cancel()
elif not self._stop_requested:
self._stop_requested = True
Expand Down Expand Up @@ -145,6 +148,12 @@

tg.start_soon(listen)

# Wait for pre-lease hook to complete before processing connections
if self._pre_lease_ready is not None:
logger.info("Waiting for pre-lease hook to complete before accepting connections")
await self._pre_lease_ready.wait()
logger.info("Pre-lease hook completed, now accepting connections")

async with self.session() as path:
async for request in listen_rx:
logger.info("Handling new connection request on lease %s", lease_name)
Expand Down Expand Up @@ -187,19 +196,83 @@
tg.start_soon(status)
async for status in status_rx:
if self.lease_name != "" and self.lease_name != status.lease_name:
# Post-lease hook for the previous lease
if self.hook_executor and self._current_client_name:
hook_context = HookContext(
lease_name=self.lease_name,
client_name=self._current_client_name,
)
# Shield the post-lease hook from cancellation and await it
with CancelScope(shield=True):
await self.hook_executor.execute_post_lease_hook(hook_context)

self.lease_name = status.lease_name
logger.info("Lease status changed, killing existing connections")
# Reset event for next lease
self._pre_lease_ready = None
self.stop()
break

# Check for lease state transitions
previous_leased = hasattr(self, "_previous_leased") and self._previous_leased
current_leased = status.leased

self.lease_name = status.lease_name
if not self._started and self.lease_name != "":
self._started = True
# Create event for pre-lease synchronization
self._pre_lease_ready = Event()
tg.start_soon(self.handle, self.lease_name, tg)
if status.leased:

if current_leased:
logger.info("Currently leased by %s under %s", status.client_name, status.lease_name)
self._current_client_name = status.client_name

# Pre-lease hook when transitioning from unleased to leased
if not previous_leased:
if self.hook_executor:
hook_context = HookContext(
lease_name=status.lease_name,
client_name=status.client_name,
)

# Start pre-lease hook asynchronously
async def run_pre_lease_hook():
try:
await self.hook_executor.execute_pre_lease_hook(hook_context)

Check failure on line 242 in packages/jumpstarter/jumpstarter/exporter/exporter.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (B023)

packages/jumpstarter/jumpstarter/exporter/exporter.py:242:85: B023 Function definition does not bind loop variable `hook_context`
logger.info("Pre-lease hook completed successfully")
except Exception as e:
logger.error("Pre-lease hook failed: %s", e)
finally:
# Always set the event to unblock connections
if self._pre_lease_ready:
self._pre_lease_ready.set()

tg.start_soon(run_pre_lease_hook)
else:
# No hook configured, set event immediately
if self._pre_lease_ready:
self._pre_lease_ready.set()
else:
logger.info("Currently not leased")

# Post-lease hook when transitioning from leased to unleased
if previous_leased and self.hook_executor and self._current_client_name:
hook_context = HookContext(
lease_name=self.lease_name,
client_name=self._current_client_name,
)
# Shield the post-lease hook from cancellation and await it
with CancelScope(shield=True):
await self.hook_executor.execute_post_lease_hook(hook_context)

self._current_client_name = ""
# Reset event for next lease
self._pre_lease_ready = None

if self._stop_requested:
self.stop()
break

self._previous_leased = current_leased
self._tg = None
Loading
Loading