Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions agent/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
venv
root_mount
root

__pycache__
.pytest_cache
dist
.coverage
.coverage.*
coverage.xml
test-results.xml

vendor/lock_file.bak
112 changes: 46 additions & 66 deletions agent/skyhook-agent/src/skyhook_agent/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,9 @@
# LICENSE END
#
















import sys
import os
import stat
import subprocess
import base64
import asyncio
Expand Down Expand Up @@ -129,21 +115,23 @@ async def tee(cmd: List[str], stdout_sink_path: str, stderr_sink_path: str, writ
def get_host_path_for_steps(copy_dir: str):
return f"{copy_dir}/skyhook_dir"

def get_skyhook_directory(root_mount: str) -> str:
return f"{root_mount}/etc/skyhook"
def get_skyhook_directory() -> str:
return f"/etc/skyhook"

def get_flag_dir() -> str:
return f"{get_skyhook_directory()}/flags"

def get_flag_dir(root_mount: str) -> str:
return f"{get_skyhook_directory(root_mount)}/flags"
def get_history_dir() -> str:
return f"{get_skyhook_directory()}/history"

def get_history_dir(root_mount: str) -> str:
return f"{get_skyhook_directory(root_mount)}/history"
def get_log_dir() -> str:
return f"/var/log/skyhook"

def get_log_file(root_mount: str, step_path: str, copy_dir: str, config_data: dict, timestamp: str=None) -> str:
def get_log_file(step_path: str, copy_dir: str, config_data: dict, timestamp: str=None) -> str:
if timestamp is None:
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M%S")
package_name, package_current_version = _get_package_information(config_data)
log_file = f"{root_mount}/var/log/skyhook/{package_name}/{package_current_version}/{step_path.replace(get_host_path_for_steps(copy_dir), '')}-{timestamp}.log"
log_file = f"{get_log_dir()}/{package_name}/{package_current_version}/{step_path.replace(get_host_path_for_steps(copy_dir), '')}-{timestamp}.log"
os.makedirs(os.path.dirname(log_file), exist_ok=True)
return log_file

Expand All @@ -160,9 +148,9 @@ def cleanup_old_logs(log_file_glob: str) -> None:


def make_flag_path(
step: Step|UpgradeStep, root_mount: str, config_data: dict
step: Step|UpgradeStep, config_data: dict
) -> str:
flag_dir = get_flag_dir(root_mount)
flag_dir = get_flag_dir()
package_name, package_current_version = _get_package_information(config_data)
marker = base64.b64encode(bytes(f"{step.arguments}_{step.returncodes}", "utf-8")).decode("utf-8")
return f"{flag_dir}/{package_name}/{package_current_version}/{step.path}_{marker}"
Expand All @@ -174,14 +162,14 @@ def set_flag(flag_file: str, msg: str = "") -> None:
f.write(msg)


def _run(cmds: list[str], log_path: str, on_host: bool, root_mount: str, write_cmds=False, **kwargs) -> int:
def _run(cmds: list[str], log_path: str, write_cmds=False, **kwargs) -> int:
"""
Synchronous wrapper around the tee command to have logs written to disk
"""
# "tee" the stdout and stderr to a file to log the step results
result = asyncio.run(
tee(
(["chroot", root_mount] if on_host else []) + cmds,
cmds,
log_path,
f"{log_path}.err",
write_cmds=write_cmds,
Expand All @@ -193,12 +181,11 @@ def _run(cmds: list[str], log_path: str, on_host: bool, root_mount: str, write_c

def run_step(
step: Step|UpgradeStep,
root_mount: str,
copy_dir: str,
config_data: dict
) -> bool:
"""
Run the given Step via chroot on the given root_mount path if Step.on_host is True.
Run the given Step.
Any arguments for the step that start with "env:" will be sourced from their environment variable.
Any environment variables that do not exist will fail the run.
The following environment variables are also set into the steps execution environment:
Expand All @@ -207,7 +194,6 @@ def run_step(

Args:
step(Step): Object of class Step.
root_mount(str): Path to the mount dir
copy_dir(str): Directory path containing all the step scripts.
config_data(dict): The config data. Must contain package_name and package_version
Returns: bool, True if the step failed (for example, its return code is not in step.returncodes)
Expand All @@ -233,21 +219,18 @@ def run_step(
print(msg)
return True

subprocess.run(
(f"chroot {root_mount} " if step.on_host else "") + f"chmod +x {step_path}",
shell=True,
)
# chmod +x the step
os.chmod(step_path, os.stat(step_path).st_mode | stat.S_IXGRP | stat.S_IXUSR | stat.S_IXOTH)

time.sleep(1)
log_file = get_log_file(root_mount, step_path, copy_dir, config_data)
log_file = get_log_file(step_path, copy_dir, config_data)
return_code = _run(
[step_path, *step.arguments],
log_file,
on_host=step.on_host,
root_mount=root_mount,
# Make sure to include the original environment here, or else things like path resolution don't work
env=dict(**os.environ, **step.env, **{"STEP_ROOT": get_host_path_for_steps(copy_dir), "SKYHOOK_DIR": copy_dir}),)

cleanup_old_logs(get_log_file(root_mount, step_path, copy_dir, config_data, "*"))
cleanup_old_logs(get_log_file(step_path, copy_dir, config_data, "*"))
if return_code not in step.returncodes:
print(f"FAILED: {step.path} {' '.join(step.arguments)} {return_code}")
return True
Expand Down Expand Up @@ -282,12 +265,11 @@ def check_flag_file(
return True
return False

def get_or_update_history(root_mount: str, config_data: dict, write: bool = False, step: Step|UpgradeStep = None, mode: Mode = None) -> None:
def get_or_update_history(config_data: dict, write: bool = False, step: Step|UpgradeStep = None, mode: Mode = None) -> None:
"""
Manages the history file for tracking version changes, and auditing purposes.

Args:
root_mount (str): The root directory.
write (bool): If True, updates the history file. If False, reads from the history file. Defaults to False.
step (Step | UpgradeStep): The current step being processed. Required when reading the history.
mode (Mode): The mode the controller is running in. Required when writing to the history.
Expand All @@ -302,7 +284,7 @@ def get_or_update_history(root_mount: str, config_data: dict, write: bool = Fals
"""
package_name, package_current_version = _get_package_information(config_data)
# Create history dir if it doesn't already exist
history_dir = get_history_dir(root_mount)
history_dir = get_history_dir()
os.makedirs(history_dir, exist_ok=True)

history_file = f"{history_dir}/{package_name}.json"
Expand Down Expand Up @@ -347,13 +329,13 @@ def get_or_update_history(root_mount: str, config_data: dict, write: bool = Fals
if step and isinstance(step, UpgradeStep):
step.arguments.extend([history_data["current-version"], package_current_version])

def summarize_check_results(results: list[bool], step_data: dict[Mode, list[Step|UpgradeStep]], step_selector: Mode, root_mount: str) -> bool:
def summarize_check_results(results: list[bool], step_data: dict[Mode, list[Step|UpgradeStep]], step_selector: Mode, ) -> bool:
"""
Returning True means there is at least one failure
"""
flag_dir = get_flag_dir(root_mount)
if not os.path.exists(flag_dir) or len(results) == 0:
print("It does not look like you have successfully applied any steps yet.")
flag_dir = get_flag_dir()
if len(results) != len(step_data[step_selector]):
print("It does not look like you have successfully run all check steps yet.")
return True

# Any failure fails the whole thing
Expand Down Expand Up @@ -385,7 +367,7 @@ def make_config_data_from_resource_id() -> dict:
}
return config_data

def do_interrupt(interrupt_data: str, root_mount: str, copy_dir: str, on_host: bool) -> bool:
def do_interrupt(interrupt_data: str, root_mount: str, copy_dir: str) -> bool:
"""
Run an interrupt if there hasn't been an interrupt already for the skyhook ID.
"""
Expand All @@ -395,9 +377,9 @@ def do_interrupt(interrupt_data: str, root_mount: str, copy_dir: str, on_host: b
config_data = make_config_data_from_resource_id()

interrupt = interrupts.inflate(interrupt_data)

os.chroot(root_mount)
# Check if the interrupt has already been run for this particular skyhook resource
interrupt_dir = f"{get_skyhook_directory(root_mount)}/interrupts/flags/{SKYHOOK_RESOURCE_ID}"
interrupt_dir = f"{get_skyhook_directory()}/interrupts/flags/{SKYHOOK_RESOURCE_ID}"
os.makedirs(interrupt_dir, exist_ok=True)
for i, cmd in enumerate(interrupt.interrupt_cmd):
interrupt_id = f"{interrupt._type()}_{i}"
Expand All @@ -412,9 +394,7 @@ def do_interrupt(interrupt_data: str, root_mount: str, copy_dir: str, on_host: b

return_code = _run(
cmd,
get_log_file(root_mount, f"interrupts/{interrupt_id}", copy_dir, config_data),
root_mount=root_mount,
on_host=on_host,
get_log_file(f"interrupts/{interrupt_id}", copy_dir, config_data),
write_cmds=True
)

Expand All @@ -428,9 +408,9 @@ def do_interrupt(interrupt_data: str, root_mount: str, copy_dir: str, on_host: b
return False

## Remove all step flags after uninstall
def remove_flags(step_data: dict[Mode, list[Step|UpgradeStep]], root_mount: str, config_data: dict) -> None:
def remove_flags(step_data: dict[Mode, list[Step|UpgradeStep]], config_data: dict) -> None:
for step in [step for steps in step_data.values() for step in steps]:
flag_file = make_flag_path(step, root_mount, config_data)
flag_file = make_flag_path(step, config_data)
if os.path.exists(flag_file): # Check if the file exists before trying to remove it
os.remove(flag_file)

Expand All @@ -440,7 +420,7 @@ def main(mode: Mode, root_mount: str, copy_dir: str, interrupt_data: None|str, a
return False

if mode == Mode.INTERRUPT:
return do_interrupt(interrupt_data, root_mount, copy_dir, True)
return do_interrupt(interrupt_data, root_mount, copy_dir)

_, SKYHOOK_DATA_DIR = _get_env_config()

Expand Down Expand Up @@ -474,9 +454,9 @@ def agent_main(mode: Mode, root_mount: str, copy_dir: str, config_data: dict, in

# Pull out step_data so it matches with existing code
step_data = config_data["modes"]
os.chroot(root_mount)
# Make a flag to mark Skyhook has started
set_flag(f"{get_flag_dir(root_mount)}/START")
set_flag(f"{get_flag_dir()}/START")
results = []

# If no steps are configured for this mode but it is being run anyway, output a warning that this is a no-op
Expand All @@ -487,19 +467,19 @@ def agent_main(mode: Mode, root_mount: str, copy_dir: str, config_data: dict, in
# Make the flag file without the host path argument (the first one). This is because in the operator world
# the host path is going to change every time the Skyhook Custom Resource changes, so it would
# look like a step hasn't been run when in fact it had.
flag_file = make_flag_path(step, root_mount, config_data)
flag_file = make_flag_path(step, config_data)

# If upgrading get the from and to versions from the history file
# so it can be passed to the upgrade steps via args or environment vars
if mode == Mode.UPGRADE or mode == Mode.UPGRADE_CHECK:
get_or_update_history(root_mount, config_data, step=step)
get_or_update_history(config_data, step=step)

if not str(mode).endswith("-check"):
if check_flag_file(step, flag_file, always_run_step, mode):
continue
print(f"{mode} {step.path} {step.arguments} {step.returncodes} {step.idempotence} {step.on_host}")

failed = run_step(step, root_mount, copy_dir, config_data)
failed = run_step(step, copy_dir, config_data)
if failed:
return True

Expand All @@ -509,20 +489,20 @@ def agent_main(mode: Mode, root_mount: str, copy_dir: str, config_data: dict, in
)
else:
print(f"{mode} {step.path} {step.arguments} {step.returncodes} {step.idempotence} {step.on_host}")
results.append(run_step(step, root_mount, copy_dir, config_data))
results.append(run_step(step, copy_dir, config_data))


if mode in CHECK_TO_APPLY and len(step_data.get(mode, [])) > 0:
if summarize_check_results(results, step_data, mode, root_mount):
if summarize_check_results(results, step_data, mode):
return True

## If APPLY_CHECK, UPGRADE_CHECK, or UNINSTALL_CHECK finished successfully, update the installed version history
if mode in [Mode.APPLY_CHECK, Mode.UPGRADE_CHECK, Mode.UNINSTALL_CHECK]:
get_or_update_history(root_mount, config_data, write=True, mode=mode)
get_or_update_history(config_data, write=True, mode=mode)

## We also want to remove the flags if the package was uninstalled
if mode == Mode.UNINSTALL_CHECK:
remove_flags(step_data, root_mount, config_data)
remove_flags(step_data, config_data)

return False

Expand Down Expand Up @@ -577,10 +557,10 @@ def cli(sys_argv: list[str]=sys.argv):
print(str.center("Directory CONFIGURATION", 20, "-"))
# print flag dir and log dir
config_data = make_config_data_from_resource_id()
print(f"flag_dir: {get_flag_dir(root_mount)}/{config_data['package_name']}/{config_data['package_version']}")
log_dir = '/'.join(get_log_file(root_mount, 'step',copy_dir, config_data, timestamp='timestamp').split('/')[:-1])
print(f"flag_dir: {get_flag_dir()}/{config_data['package_name']}/{config_data['package_version']}")
log_dir = '/'.join(get_log_file('step',copy_dir, config_data, timestamp='timestamp').split('/')[:-1])
print(f"log_dir: {log_dir}")
print(f"history_file: {get_history_dir(root_mount)}/{config_data['package_name']}.json")
print(f"history_file: {get_history_dir()}/{config_data['package_name']}.json")
print("-" * 20)

return main(mode, root_mount, copy_dir, interrupt_data, always_run_step)
Expand Down
Loading