Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 276 additions & 0 deletions .github/scripts/build_publish_payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
#!/usr/bin/env python3
import argparse
import json
import pathlib
import re
import sys
from typing import Any


def normalize_dependencies(raw_deps: Any) -> list[dict[str, Any]]:
    """Coerce a manifest `dependencies` field into a uniform list of
    ``{"name": ..., "version": ...}`` dicts.

    Accepts ``None``/``""`` (no dependencies), a name->version mapping, or a
    list whose entries are bare name strings or dicts carrying at least a
    string ``name``. Anything else raises ``ValueError``.
    """
    if raw_deps is None or raw_deps == "":
        return []
    if isinstance(raw_deps, dict):
        return [{"name": dep_name, "version": dep_version} for dep_name, dep_version in raw_deps.items()]
    if not isinstance(raw_deps, list):
        raise ValueError(f"`dependencies` must be a map or list, got {type(raw_deps).__name__}")

    entries: list[dict[str, Any]] = []
    for entry in raw_deps:
        if isinstance(entry, str):
            entries.append({"name": entry, "version": ""})
            continue
        if isinstance(entry, dict) and isinstance(entry.get("name"), str):
            # A falsy version (None, "") normalizes to the empty string.
            entries.append({"name": entry["name"], "version": entry.get("version") or ""})
            continue
        raise ValueError(
            "dependency list entries must be strings or {name, version} objects"
        )
    return entries
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def derive_registry_function_name(function_id: str, metadata: dict[str, Any] | None) -> str:
metadata = metadata or {}
for key in ("registry_name", "name"):
value = metadata.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
if "::" in function_id:
return function_id.rsplit("::", 1)[1]
return function_id


def _extract_array(payload: dict[str, Any], key: str) -> list[dict[str, Any]]:
value = payload.get(key, [])
if value is None:
return []
if not isinstance(value, list):
raise ValueError(f"`{key}` must be an array")
return value


def _read_yaml(path: pathlib.Path) -> Any:
    """Parse *path* as YAML via ``yaml.safe_load``.

    ``yaml`` is imported lazily so the dependency is only required when a
    YAML file is actually read.
    """
    import yaml

    text = path.read_text(encoding="utf-8")
    return yaml.safe_load(text)


def _schema_or_empty(value: Any) -> dict[str, Any]:
if value is None:
return {}
if isinstance(value, dict):
return value
raise ValueError("function schema fields must be objects or null")


def _metadata_or_empty(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}


def _string_or_empty(value: Any) -> str:
return value if isinstance(value, str) else ""


def _slug(value: Any, fallback: str) -> str:
raw = value if isinstance(value, str) else fallback
slug = re.sub(r"[^a-z0-9]+", "-", raw.lower()).strip("-")
return slug or fallback


def _normalize_registry_function(function: dict[str, Any]) -> dict[str, Any]:
    """Project an engine function record onto the registry publish shape,
    defaulting missing/invalid fields to empty values (name passes through
    untouched, even if absent)."""
    normalized: dict[str, Any] = {"name": function.get("name")}
    normalized["description"] = _string_or_empty(function.get("description"))
    normalized["request_schema"] = _schema_or_empty(function.get("request_schema"))
    normalized["response_schema"] = _schema_or_empty(function.get("response_schema"))
    normalized["metadata"] = _metadata_or_empty(function.get("metadata"))
    return normalized


def _derive_trigger_name(trigger: dict[str, Any]) -> str:
    """Choose a slug-style name for a trigger.

    Preference order: a non-blank metadata override (``registry_name`` or
    ``name``), the configured ``api_path``, the tail of ``function_id``,
    and finally ``trigger_type`` or the raw ``name`` field.
    """
    metadata = _metadata_or_empty(trigger.get("metadata"))
    for override_key in ("registry_name", "name"):
        override = metadata.get(override_key)
        if isinstance(override, str) and override.strip():
            return _slug(override, "trigger")

    raw_config = trigger.get("config")
    config = raw_config if isinstance(raw_config, dict) else {}
    api_path = config.get("api_path")
    if isinstance(api_path, str) and api_path.strip():
        return _slug(api_path, "trigger")

    function_id = trigger.get("function_id")
    if isinstance(function_id, str) and function_id.strip():
        tail = function_id.rsplit("::", 1)[-1]
        return _slug(tail, "trigger")

    return _slug(trigger.get("trigger_type") or trigger.get("name"), "trigger")


def _normalize_registry_trigger(trigger: dict[str, Any]) -> dict[str, Any]:
    """Project an engine trigger record onto the registry publish shape.

    Engine-side identifiers (``id``, ``trigger_type``, ``function_id``) and
    the trigger config are folded into the metadata copy without clobbering
    keys the engine already set.
    """
    raw_config = trigger.get("config")
    config = raw_config if isinstance(raw_config, dict) else {}
    # Copy so we never mutate the caller's metadata dict.
    metadata = dict(_metadata_or_empty(trigger.get("metadata")))
    for source_key, target_key in (
        ("id", "engine_id"),
        ("trigger_type", "trigger_type"),
        ("function_id", "function_id"),
    ):
        source_value = trigger.get(source_key)
        if source_value is not None:
            metadata.setdefault(target_key, source_value)
    if config:
        metadata.setdefault("config", config)

    return {
        "name": _derive_trigger_name(trigger),
        "description": _string_or_empty(trigger.get("description")),
        "invocation_schema": _schema_or_empty(trigger.get("invocation_schema")),
        "return_schema": _schema_or_empty(trigger.get("return_schema")),
        "metadata": metadata,
    }


def normalize_worker_interface(
    *,
    worker_name: str,
    workers_json: dict[str, Any],
    functions_json: dict[str, Any],
    triggers_json: dict[str, Any] | None = None,
) -> dict[str, list[dict[str, Any]]]:
    """Build the registry-facing interface (functions and triggers) for one worker.

    Looks up the single worker matching *worker_name* (by ``name`` or ``id``)
    in *workers_json*, joins its function ids against the detailed records in
    *functions_json*, and keeps only the triggers from *triggers_json* bound
    to one of those functions.

    Raises:
        ValueError: if the worker match is not exactly one, if the worker's
            ``functions`` field is not an array, or if any referenced
            function id has no detail record.
    """
    workers = _extract_array(workers_json, "workers")
    matches = [w for w in workers if w.get("name") == worker_name or w.get("id") == worker_name]
    if len(matches) != 1:
        raise ValueError(f"expected exactly one worker matching {worker_name!r}, found {len(matches)}")

    # Fail fast on malformed payloads: only an absent/None `functions` key
    # means "no functions". Falsy non-lists such as {} or "" must raise
    # instead of being coerced to [] (which would silently publish a worker
    # with an empty interface).
    raw_worker_function_ids = matches[0].get("functions")
    if raw_worker_function_ids is None:
        worker_function_ids: list[Any] = []
    elif not isinstance(raw_worker_function_ids, list):
        raise ValueError("worker `functions` must be an array")
    else:
        worker_function_ids = raw_worker_function_ids

    functions_by_id = {
        f.get("function_id"): f
        for f in _extract_array(functions_json, "functions")
        if f.get("function_id")
    }

    missing_function_ids = [fid for fid in worker_function_ids if fid not in functions_by_id]
    if missing_function_ids:
        raise ValueError(
            "missing function details for worker functions: "
            + ", ".join(str(fid) for fid in missing_function_ids)
        )

    functions = []
    for function_id in worker_function_ids:
        details = functions_by_id[function_id]
        metadata = details.get("metadata") or {}
        functions.append(
            {
                "name": derive_registry_function_name(function_id, metadata),
                "description": _string_or_empty(details.get("description")),
                # Engine records use request_format/response_format; the
                # registry payload uses *_schema keys.
                "request_schema": _schema_or_empty(details.get("request_format")),
                "response_schema": _schema_or_empty(details.get("response_format")),
                "metadata": _metadata_or_empty(metadata),
            }
        )

    worker_ids = set(worker_function_ids)
    triggers = []
    if triggers_json:
        for trigger in _extract_array(triggers_json, "triggers"):
            # Only publish triggers bound to one of this worker's functions.
            if trigger.get("function_id") not in worker_ids:
                continue
            triggers.append(_normalize_registry_trigger(trigger))

    return {"functions": functions, "triggers": triggers}


def build_payload(
    *,
    repo_root: pathlib.Path,
    worker: str,
    version: str,
    registry_tag: str,
    deploy: str,
    repo_url: str,
    interface: dict[str, Any],
    binaries: dict[str, Any],
    image_tag: str,
) -> dict[str, Any]:
    """Assemble the registry publish payload for *worker*.

    Reads the worker manifest (``iii.worker.yaml``), optional ``README.md``
    and optional ``config.yaml`` from ``repo_root / worker``, normalizes the
    collected *interface*, and attaches the deploy-specific field
    (``binaries`` for deploy="binary", ``image_tag`` for deploy="image").

    Raises:
        ValueError: for an unsupported *deploy* value or a missing
            deploy-specific input.
    """
    worker_dir = repo_root / worker
    manifest = _read_yaml(worker_dir / "iii.worker.yaml") or {}

    readme_file = worker_dir / "README.md"
    readme_text = ""
    if readme_file.exists():
        readme_text = readme_file.read_text(encoding="utf-8")

    config: Any = {}
    config_file = worker_dir / "config.yaml"
    if config_file.exists():
        loaded = _read_yaml(config_file)
        if loaded is not None:
            config = loaded

    payload: dict[str, Any] = {
        "worker_name": worker,
        "version": version,
        "tag": registry_tag or "latest",
        "type": deploy,
        "readme": readme_text,
        "repo": repo_url,
        "description": manifest.get("description", ""),
        "dependencies": normalize_dependencies(manifest.get("dependencies")),
        "config": config,
        "functions": [
            _normalize_registry_function(fn) for fn in interface.get("functions") or []
        ],
        "triggers": [
            _normalize_registry_trigger(tr) for tr in interface.get("triggers") or []
        ],
    }

    if deploy == "binary":
        if not binaries:
            raise ValueError("deploy=binary requires non-empty binaries")
        payload["binaries"] = binaries
    elif deploy == "image":
        if not image_tag:
            raise ValueError("deploy=image requires image_tag")
        payload["image_tag"] = image_tag
    else:
        raise ValueError(f"unsupported deploy={deploy}")

    return payload


def main() -> int:
    """CLI entry point: load the collected interface (and optional binaries
    manifest), build the publish payload, write it to ``--out``, and echo it
    (minus the readme) to stdout."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--worker", required=True)
    parser.add_argument("--version", required=True)
    parser.add_argument("--registry-tag", default="latest")
    parser.add_argument("--deploy", required=True, choices=["binary", "image"])
    parser.add_argument("--repo-url", required=True)
    parser.add_argument("--interface-json", required=True)
    parser.add_argument("--binaries-json", default="")
    parser.add_argument("--image-tag", default="")
    parser.add_argument("--repo-root", default=".")
    parser.add_argument("--out", default="payload.json")
    args = parser.parse_args()

    interface_text = pathlib.Path(args.interface_json).read_text(encoding="utf-8")
    interface = json.loads(interface_text)
    if args.binaries_json:
        binaries = json.loads(pathlib.Path(args.binaries_json).read_text(encoding="utf-8"))
    else:
        binaries = {}

    payload = build_payload(
        repo_root=pathlib.Path(args.repo_root),
        worker=args.worker,
        version=args.version,
        registry_tag=args.registry_tag,
        deploy=args.deploy,
        repo_url=args.repo_url,
        interface=interface,
        binaries=binaries,
        image_tag=args.image_tag,
    )
    pathlib.Path(args.out).write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
    # Echo everything except the (potentially large) readme for log readability.
    summary = {key: value for key, value in payload.items() if key != "readme"}
    print(json.dumps(summary, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())
91 changes: 91 additions & 0 deletions .github/scripts/collect_worker_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env python3
import argparse
import json
import pathlib
import subprocess
import sys
import time

from build_publish_payload import normalize_worker_interface


def count_worker_matches(workers_json: dict[str, object], worker_name: str) -> int:
    """Count workers whose ``name`` or ``id`` equals *worker_name*.

    A missing or non-list ``workers`` field and non-dict entries count as
    zero matches rather than raising.
    """
    workers = workers_json.get("workers", [])
    if not isinstance(workers, list):
        return 0
    total = 0
    for candidate in workers:
        if not isinstance(candidate, dict):
            continue
        if candidate.get("name") == worker_name or candidate.get("id") == worker_name:
            total += 1
    return total


def run_iii(function_id: str, payload: dict[str, object]) -> dict[str, object]:
    """Invoke ``iii trigger`` for *function_id* with a JSON *payload* and
    parse the command's stdout as JSON.

    Raises ``subprocess.CalledProcessError`` on non-zero exit,
    ``subprocess.TimeoutExpired`` after 30 seconds, or
    ``json.JSONDecodeError`` on unparsable output.
    """
    command = [
        "iii",
        "trigger",
        "--function-id",
        function_id,
        "--payload",
        json.dumps(payload),
    ]
    completed = subprocess.run(
        command,
        check=True,
        text=True,
        capture_output=True,
        timeout=30,
    )
    return json.loads(completed.stdout)


def wait_for_worker(worker_name: str, wait_seconds: int) -> dict[str, object]:
    """Poll the engine worker list (every 2s, up to *wait_seconds*) until
    exactly one worker matches *worker_name*.

    Always returns the most recent listing — even on timeout — so the
    caller can apply its own strict validation.
    """
    deadline = time.monotonic() + wait_seconds
    while True:
        workers_json = run_iii("engine::workers::list", {})
        if count_worker_matches(workers_json, worker_name) == 1:
            return workers_json
        if time.monotonic() >= deadline:
            return workers_json
        time.sleep(2)


def collect_triggers() -> dict[str, object] | None:
    """Fetch the engine trigger list; on any expected failure, emit a
    warning and return None so publishing can proceed with triggers=[]."""
    recoverable = (
        subprocess.CalledProcessError,
        subprocess.TimeoutExpired,
        json.JSONDecodeError,
    )
    try:
        return run_iii("engine::triggers::list", {"include_internal": True})
    except recoverable as exc:
        # Best-effort: trigger collection is optional, so degrade rather
        # than failing the whole publish.
        # NOTE(review): GitHub Actions workflow commands like ::warning::
        # are typically scanned from stdout — confirm the runner picks this
        # annotation up from stderr.
        print(
            f"::warning::could not collect triggers; publishing triggers=[]: {exc}",
            file=sys.stderr,
        )
        return None


def main() -> int:
    """CLI entry point: wait for the worker to register, collect its
    functions and (best-effort) triggers, normalize the interface, and
    write it to ``--out`` as JSON (also echoed to stdout)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--worker", required=True)
    parser.add_argument("--out", default="worker-interface.json")
    parser.add_argument("--wait-seconds", type=int, default=0)
    args = parser.parse_args()

    workers_json = wait_for_worker(args.worker, args.wait_seconds)
    functions_json = run_iii("engine::functions::list", {"include_internal": True})
    triggers_json = collect_triggers()

    interface = normalize_worker_interface(
        worker_name=args.worker,
        workers_json=workers_json,
        functions_json=functions_json,
        triggers_json=triggers_json,
    )
    serialized = json.dumps(interface, indent=2)
    pathlib.Path(args.out).write_text(serialized + "\n", encoding="utf-8")
    print(serialized)
    return 0


if __name__ == "__main__":
    sys.exit(main())
Loading
Loading