diff --git a/.github/scripts/build_publish_payload.py b/.github/scripts/build_publish_payload.py new file mode 100644 index 00000000..8aad641c --- /dev/null +++ b/.github/scripts/build_publish_payload.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +import argparse +import json +import pathlib +import re +import sys +from typing import Any + + +def normalize_dependencies(raw_deps: Any) -> list[dict[str, Any]]: + if raw_deps in (None, ""): + return [] + if isinstance(raw_deps, dict): + return [{"name": name, "version": version} for name, version in raw_deps.items()] + if isinstance(raw_deps, list): + normalized: list[dict[str, Any]] = [] + for dep in raw_deps: + if isinstance(dep, str): + normalized.append({"name": dep, "version": ""}) + elif isinstance(dep, dict) and isinstance(dep.get("name"), str): + normalized.append({"name": dep["name"], "version": dep.get("version") or ""}) + else: + raise ValueError( + "dependency list entries must be strings or {name, version} objects" + ) + return normalized + raise ValueError(f"`dependencies` must be a map or list, got {type(raw_deps).__name__}") + + +def derive_registry_function_name(function_id: str, metadata: dict[str, Any] | None) -> str: + metadata = metadata or {} + for key in ("registry_name", "name"): + value = metadata.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + if "::" in function_id: + return function_id.rsplit("::", 1)[1] + return function_id + + +def _extract_array(payload: dict[str, Any], key: str) -> list[dict[str, Any]]: + value = payload.get(key, []) + if value is None: + return [] + if not isinstance(value, list): + raise ValueError(f"`{key}` must be an array") + return value + + +def _read_yaml(path: pathlib.Path) -> Any: + import yaml + + return yaml.safe_load(path.read_text(encoding="utf-8")) + + +def _schema_or_empty(value: Any) -> dict[str, Any]: + if value is None: + return {} + if isinstance(value, dict): + return value + raise ValueError("function schema fields 
must be objects or null") + + +def _metadata_or_empty(value: Any) -> dict[str, Any]: + return value if isinstance(value, dict) else {} + + +def _string_or_empty(value: Any) -> str: + return value if isinstance(value, str) else "" + + +def _slug(value: Any, fallback: str) -> str: + raw = value if isinstance(value, str) else fallback + slug = re.sub(r"[^a-z0-9]+", "-", raw.lower()).strip("-") + return slug or fallback + + +def _normalize_registry_function(function: dict[str, Any]) -> dict[str, Any]: + return { + "name": function.get("name"), + "description": _string_or_empty(function.get("description")), + "request_schema": _schema_or_empty(function.get("request_schema")), + "response_schema": _schema_or_empty(function.get("response_schema")), + "metadata": _metadata_or_empty(function.get("metadata")), + } + + +def _derive_trigger_name(trigger: dict[str, Any]) -> str: + metadata = _metadata_or_empty(trigger.get("metadata")) + for key in ("registry_name", "name"): + value = metadata.get(key) + if isinstance(value, str) and value.strip(): + return _slug(value, "trigger") + + config = trigger.get("config") if isinstance(trigger.get("config"), dict) else {} + api_path = config.get("api_path") + if isinstance(api_path, str) and api_path.strip(): + return _slug(api_path, "trigger") + + function_id = trigger.get("function_id") + if isinstance(function_id, str) and function_id.strip(): + return _slug(function_id.rsplit("::", 1)[-1], "trigger") + + return _slug(trigger.get("trigger_type") or trigger.get("name"), "trigger") + + +def _normalize_registry_trigger(trigger: dict[str, Any]) -> dict[str, Any]: + config = trigger.get("config") if isinstance(trigger.get("config"), dict) else {} + metadata = _metadata_or_empty(trigger.get("metadata")).copy() + for source_key, metadata_key in ( + ("id", "engine_id"), + ("trigger_type", "trigger_type"), + ("function_id", "function_id"), + ): + if trigger.get(source_key) is not None: + metadata.setdefault(metadata_key, 
trigger.get(source_key)) + if config: + metadata.setdefault("config", config) + + return { + "name": _derive_trigger_name(trigger), + "description": _string_or_empty(trigger.get("description")), + "invocation_schema": _schema_or_empty(trigger.get("invocation_schema")), + "return_schema": _schema_or_empty(trigger.get("return_schema")), + "metadata": metadata, + } + + +def normalize_worker_interface( + *, + worker_name: str, + workers_json: dict[str, Any], + functions_json: dict[str, Any], + triggers_json: dict[str, Any] | None = None, +) -> dict[str, list[dict[str, Any]]]: + workers = _extract_array(workers_json, "workers") + matches = [w for w in workers if w.get("name") == worker_name or w.get("id") == worker_name] + if len(matches) != 1: + raise ValueError(f"expected exactly one worker matching {worker_name!r}, found {len(matches)}") + + worker_function_ids = matches[0].get("functions") or [] + if not isinstance(worker_function_ids, list): + raise ValueError("worker `functions` must be an array") + + functions_by_id = { + f.get("function_id"): f + for f in _extract_array(functions_json, "functions") + if f.get("function_id") + } + + missing_function_ids = [fid for fid in worker_function_ids if fid not in functions_by_id] + if missing_function_ids: + raise ValueError( + "missing function details for worker functions: " + + ", ".join(str(fid) for fid in missing_function_ids) + ) + + functions = [] + for function_id in worker_function_ids: + details = functions_by_id[function_id] + metadata = details.get("metadata") or {} + functions.append( + { + "name": derive_registry_function_name(function_id, metadata), + "description": _string_or_empty(details.get("description")), + "request_schema": _schema_or_empty(details.get("request_format")), + "response_schema": _schema_or_empty(details.get("response_format")), + "metadata": _metadata_or_empty(metadata), + } + ) + + worker_ids = set(worker_function_ids) + triggers = [] + if triggers_json: + for trigger in 
_extract_array(triggers_json, "triggers"): + if trigger.get("function_id") not in worker_ids: + continue + triggers.append(_normalize_registry_trigger(trigger)) + + return {"functions": functions, "triggers": triggers} + + +def build_payload( + *, + repo_root: pathlib.Path, + worker: str, + version: str, + registry_tag: str, + deploy: str, + repo_url: str, + interface: dict[str, Any], + binaries: dict[str, Any], + image_tag: str, +) -> dict[str, Any]: + root = repo_root / worker + meta = _read_yaml(root / "iii.worker.yaml") or {} + + readme_path = root / "README.md" + readme = readme_path.read_text(encoding="utf-8") if readme_path.exists() else "" + + config_path = root / "config.yaml" + config = _read_yaml(config_path) if config_path.exists() else {} + if config is None: + config = {} + + payload: dict[str, Any] = { + "worker_name": worker, + "version": version, + "tag": registry_tag or "latest", + "type": deploy, + "readme": readme, + "repo": repo_url, + "description": meta.get("description", ""), + "dependencies": normalize_dependencies(meta.get("dependencies")), + "config": config, + "functions": [ + _normalize_registry_function(function) + for function in interface.get("functions") or [] + ], + "triggers": [ + _normalize_registry_trigger(trigger) + for trigger in interface.get("triggers") or [] + ], + } + + if deploy == "binary": + if not binaries: + raise ValueError("deploy=binary requires non-empty binaries") + payload["binaries"] = binaries + elif deploy == "image": + if not image_tag: + raise ValueError("deploy=image requires image_tag") + payload["image_tag"] = image_tag + else: + raise ValueError(f"unsupported deploy={deploy}") + + return payload + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--worker", required=True) + parser.add_argument("--version", required=True) + parser.add_argument("--registry-tag", default="latest") + parser.add_argument("--deploy", required=True, choices=["binary", "image"]) + 
# === .github/scripts/collect_worker_interface.py ===
#!/usr/bin/env python3
"""Collect a worker's functions and triggers from a running III engine."""
import argparse
import json
import pathlib
import subprocess
import sys
import time

from build_publish_payload import normalize_worker_interface


def count_worker_matches(workers_json: dict[str, object], worker_name: str) -> int:
    """Count entries in `workers` whose name or id equals *worker_name*."""
    workers = workers_json.get("workers", [])
    if not isinstance(workers, list):
        return 0
    hits = 0
    for entry in workers:
        if not isinstance(entry, dict):
            continue
        if entry.get("name") == worker_name or entry.get("id") == worker_name:
            hits += 1
    return hits


def run_iii(function_id: str, payload: dict[str, object]) -> dict[str, object]:
    """Invoke an engine function via the `iii trigger` CLI, parsing its JSON stdout."""
    command = [
        "iii",
        "trigger",
        "--function-id",
        function_id,
        "--payload",
        json.dumps(payload),
    ]
    result = subprocess.run(
        command,
        check=True,
        text=True,
        capture_output=True,
        timeout=30,
    )
    return json.loads(result.stdout)


def wait_for_worker(worker_name: str, wait_seconds: int) -> dict[str, object]:
    """Poll the engine until exactly one matching worker appears or the deadline passes.

    Always returns the last listing snapshot; the caller decides whether a
    missing/ambiguous worker is fatal.
    """
    deadline = time.monotonic() + wait_seconds
    snapshot = run_iii("engine::workers::list", {})

    while count_worker_matches(snapshot, worker_name) != 1 and time.monotonic() < deadline:
        time.sleep(2)
        snapshot = run_iii("engine::workers::list", {})

    return snapshot


def collect_triggers() -> dict[str, object] | None:
    """Best-effort fetch of the engine trigger listing; None on any failure."""
    try:
        return run_iii("engine::triggers::list", {"include_internal": True})
    except (
        subprocess.CalledProcessError,
        subprocess.TimeoutExpired,
        json.JSONDecodeError,
    ) as exc:
        print(
            f"::warning::could not collect triggers; publishing triggers=[]: {exc}",
            file=sys.stderr,
        )
        return None


def main() -> int:
    """CLI entry point: write the normalized interface JSON to --out."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--worker", required=True)
    parser.add_argument("--out", default="worker-interface.json")
    parser.add_argument("--wait-seconds", type=int, default=0)
    args = parser.parse_args()

    workers_listing = wait_for_worker(args.worker, args.wait_seconds)
    functions_listing = run_iii("engine::functions::list", {"include_internal": True})
    triggers_listing = collect_triggers()

    interface = normalize_worker_interface(
        worker_name=args.worker,
        workers_json=workers_listing,
        functions_json=functions_listing,
        triggers_json=triggers_listing,
    )
    pathlib.Path(args.out).write_text(json.dumps(interface, indent=2) + "\n", encoding="utf-8")
    print(json.dumps(interface, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())


# === .github/scripts/resolve_binary_artifacts.py (continues) ===
#!/usr/bin/env python3
import argparse
import json
import pathlib
import sys
import urllib.request
from collections.abc import Callable


# Targets published by taiki-e/upload-rust-binary-action for this repo.
DEFAULT_TARGETS = [
    "x86_64-apple-darwin",
    "aarch64-apple-darwin",
    "x86_64-pc-windows-msvc",
    "i686-pc-windows-msvc",
    "aarch64-pc-windows-msvc",
    "x86_64-unknown-linux-gnu",
    "x86_64-unknown-linux-musl",
    "aarch64-unknown-linux-gnu",
    "armv7-unknown-linux-gnueabihf",
]

_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def parse_sha256_text(text: str) -> str:
    """Extract and validate the digest from a `<sha256>  <filename>` checksum file.

    Raises ValueError when the body is empty or the first token is not a
    64-character hex digest (previously an empty body raised an opaque
    IndexError from split()[0]).
    """
    tokens = text.strip().split()
    if not tokens:
        raise ValueError("checksum file is empty")
    digest = tokens[0]
    if len(digest) != 64 or any(c not in _HEX_DIGITS for c in digest):
        raise ValueError(f"malformed sha256 digest: {digest!r}")
    return digest


def read_checksum_url(url: str) -> str:
    """Download a checksum file and return its validated sha256 digest."""
    with urllib.request.urlopen(url, timeout=20) as response:
        return parse_sha256_text(response.read().decode("utf-8"))


def build_binary_artifact_map(
    *,
    repo_url: str,
    tag: str,
    bin_name: str,
    targets: list[str],
    read_checksum: Callable[[str], str],
) -> dict[str, dict[str, str]]:
    """Resolve release asset URLs + checksums per target.

    Targets whose checksum cannot be fetched are skipped with a CI warning
    (best-effort: not every target is built for every release).  Raises
    RuntimeError only when no target resolves at all.
    """
    base = f"{repo_url}/releases/download/{tag}"
    binaries = {}
    for target in targets:
        # Windows releases are zipped; everything else is a tarball.
        ext = "zip" if "windows" in target else "tar.gz"
        asset_url = f"{base}/{bin_name}-{target}.{ext}"
        # The checksum asset is `<bin>-<target>.sha256` (no archive ext).
        sha_url = f"{base}/{bin_name}-{target}.sha256"
        try:
            binaries[target] = {
                "url": asset_url,
                "sha256": read_checksum(sha_url),
            }
        except Exception as exc:  # deliberate best-effort per target
            print(f"::warning::missing checksum for {target}: {exc}", file=sys.stderr)
    if not binaries:
        raise RuntimeError("no binary artefacts could be resolved")
    return binaries


def main() -> int:
    """CLI entry point: write the resolved binary map to --out."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--repo-url", required=True)
    parser.add_argument("--tag", required=True)
    parser.add_argument("--bin", required=True)
    parser.add_argument("--out", default="binaries.json")
    args = parser.parse_args()

    binaries = build_binary_artifact_map(
        repo_url=args.repo_url,
        tag=args.tag,
        bin_name=args.bin,
        targets=DEFAULT_TARGETS,
        read_checksum=read_checksum_url,
    )
    pathlib.Path(args.out).write_text(json.dumps(binaries, indent=2) + "\n", encoding="utf-8")
    print(json.dumps(binaries, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())
b/.github/workflows/_publish-registry.yml index 40979183..20c4dc81 100644 --- a/.github/workflows/_publish-registry.yml +++ b/.github/workflows/_publish-registry.yml @@ -52,118 +52,169 @@ jobs: - name: Install pyyaml run: pip install --quiet pyyaml - - name: Build payload - id: payload + - name: Install iii CLI + env: + VERSION: '0.11.5' + run: | + set -euo pipefail + curl -fsSL https://install.iii.dev/iii/main/install.sh -o /tmp/install-iii.sh + sh /tmp/install-iii.sh + { + echo "$HOME/.local/bin" + echo "$HOME/.iii/bin" + } >> "$GITHUB_PATH" + export PATH="$HOME/.local/bin:$HOME/.iii/bin:$PATH" + iii --version + + - name: Start III engine + run: | + set -euo pipefail + printf 'workers: []\n' > config.yaml + iii --config config.yaml --no-update-check > iii-engine.log 2>&1 & + echo "$!" > iii-engine.pid + + for _ in {1..60}; do + if ! kill -0 "$(cat iii-engine.pid)" 2>/dev/null; then + echo "::error::iii engine exited before becoming ready" + cat iii-engine.log + exit 1 + fi + + if iii trigger --function-id='engine::workers::list' --payload='{}' >/tmp/iii-workers.json 2>/tmp/iii-trigger.err; then + cat /tmp/iii-workers.json + exit 0 + fi + + sleep 1 + done + + echo "::error::iii engine did not become ready" + cat /tmp/iii-trigger.err || true + cat iii-engine.log || true + exit 1 + + - name: Start local worker for interface collection env: WORKER: ${{ inputs.worker }} - VERSION: ${{ inputs.version }} - DEPLOY: ${{ inputs.deploy }} - REGISTRY_TAG: ${{ inputs.registry_tag }} - TAG: ${{ inputs.tag }} BIN: ${{ inputs.bin }} - IMAGE_TAG: ${{ inputs.image_tag }} - REPO_URL: ${{ format('https://github.com/{0}', github.repository) }} run: | - python3 - <<'PY' - import json, os, pathlib, sys, urllib.request, yaml + set -euo pipefail + mode=$(python3 - <<'PY' + import os + import yaml worker = os.environ["WORKER"] - version = os.environ["VERSION"] - deploy = os.environ["DEPLOY"] - reg_tag = os.environ["REGISTRY_TAG"] or "latest" - tag = os.environ["TAG"] - repo_url = 
os.environ["REPO_URL"] - bin_name = os.environ.get("BIN") or worker - image_tag = os.environ.get("IMAGE_TAG", "") - - root = pathlib.Path(worker) - meta = yaml.safe_load((root / "iii.worker.yaml").read_text()) or {} - - # Normalize `dependencies` into the array-of-objects wire shape the - # registry's /publish endpoint expects. Authors may write either: - # dependencies: - # math-worker: "^0.1.0" # map form (recommended) - # or: - # dependencies: - # - name: math-worker - # version: "^0.1.0" # explicit wire form - raw_deps = meta.get("dependencies") or [] - if isinstance(raw_deps, dict): - deps = [{"name": k, "version": v} for k, v in raw_deps.items()] - elif isinstance(raw_deps, list): - deps = raw_deps - else: - print( - f"::error::`dependencies` must be a map or list, got {type(raw_deps).__name__}" - ) - sys.exit(1) - - readme_path = root / "README.md" - readme = readme_path.read_text() if readme_path.exists() else "" - - config_path = root / "config.yaml" - config = {} - if config_path.exists(): - config = yaml.safe_load(config_path.read_text()) or {} - - payload = { - "worker_name": worker, - "version": version, - "tag": reg_tag, - "type": deploy, - "readme": readme, - "repo": repo_url, - "description": meta.get("description", ""), - "dependencies": deps, - "config": config, - } - - if deploy == "binary": - targets = [ - "x86_64-apple-darwin", - "aarch64-apple-darwin", - "x86_64-pc-windows-msvc", - "i686-pc-windows-msvc", - "aarch64-pc-windows-msvc", - "x86_64-unknown-linux-gnu", - "x86_64-unknown-linux-musl", - "aarch64-unknown-linux-gnu", - "armv7-unknown-linux-gnueabihf", - ] - base = f"{repo_url}/releases/download/{tag}" - binaries = {} - for t in targets: - ext = "zip" if "windows" in t else "tar.gz" - asset = f"{bin_name}-{t}.{ext}" - asset_url = f"{base}/{asset}" - # taiki-e/upload-rust-binary-action publishes the checksum - # as `-.sha256` (without the archive extension). 
- sha_url = f"{base}/{bin_name}-{t}.sha256" - try: - with urllib.request.urlopen(sha_url, timeout=20) as r: - sha_text = r.read().decode("utf-8").strip() - sha = sha_text.split()[0] - binaries[t] = {"url": asset_url, "sha256": sha} - except Exception as e: - print(f"::warning::missing checksum for {t}: {e}") - if not binaries: - print("::error::no binary artefacts could be resolved") - sys.exit(1) - payload["binaries"] = binaries - elif deploy == "image": - if not image_tag: - print("::error::deploy=image but no image_tag provided") - sys.exit(1) - payload["image_tag"] = image_tag + manifest_path = f"{worker}/iii.worker.yaml" + manifest = yaml.safe_load(open(manifest_path, encoding="utf-8").read()) or {} + scripts = manifest.get("scripts") or {} + runtime = manifest.get("runtime") or {} + has_scripts_start = bool(str(scripts.get("start") or "").strip()) + has_runtime = bool(runtime.get("kind") or runtime.get("language")) + language = str(manifest.get("language") or "").strip().lower() + + if has_scripts_start or has_runtime: + print("iii-add") + elif language == "rust": + print("cargo-run") else: - print(f"::error::unsupported deploy={deploy}") - sys.exit(1) + print("unsupported") + PY + ) + + case "$mode" in + iii-add) + iii worker add "./$WORKER" --force --reset-config --wait + ;; + cargo-run) + if [[ ! -f "$WORKER/Cargo.toml" ]]; then + echo "::error::$WORKER is a Rust worker but $WORKER/Cargo.toml was not found" + exit 1 + fi + + worker_log="worker-$WORKER.log" + pushd "$WORKER" >/dev/null + cargo_args=(run) + if [[ -n "${BIN:-}" ]]; then + cargo_args+=(--bin "$BIN") + fi + cargo_args+=(--) + if [[ "$WORKER" == "mcp" ]]; then + cargo_args+=(--no-stdio) + fi - out = pathlib.Path("payload.json") - out.write_text(json.dumps(payload, indent=2)) - print(json.dumps({k: payload[k] for k in payload if k != "readme"}, indent=2)) + echo "Starting local worker with: (cd $WORKER && ${cargo_args[*]})" + "${cargo_args[@]}" > "../$worker_log" 2>&1 & + echo "$!" 
> ../worker.pid + popd >/dev/null + + sleep 2 + if ! kill -0 "$(cat worker.pid)" 2>/dev/null; then + echo "::error::$WORKER exited before interface collection" + cat "$worker_log" || true + exit 1 + fi + ;; + *) + echo "::error::$WORKER iii.worker.yaml has no local runtime/scripts.start and is not a Rust worker; cannot collect interface" + exit 1 + ;; + esac + + - name: Collect worker interface + env: + WORKER: ${{ inputs.worker }} + run: | + args=("--worker" "$WORKER" "--out" "worker-interface.json" "--wait-seconds" "120") + python3 .github/scripts/collect_worker_interface.py "${args[@]}" + + - name: Assert worker interface was collected + run: | + python3 - <<'PY' + import json + data = json.load(open("worker-interface.json")) + if not data.get("functions"): + raise SystemExit("no worker functions were collected") PY + - name: Resolve binary artifacts + if: inputs.deploy == 'binary' + env: + TAG: ${{ inputs.tag }} + BIN: ${{ inputs.bin }} + WORKER: ${{ inputs.worker }} + REPO_URL: ${{ format('https://github.com/{0}', github.repository) }} + run: | + python3 .github/scripts/resolve_binary_artifacts.py \ + --repo-url "$REPO_URL" \ + --tag "$TAG" \ + --bin "${BIN:-$WORKER}" \ + --out binaries.json + + - name: Create empty binary artifacts + if: inputs.deploy != 'binary' + run: | + printf '{}\n' > binaries.json + + - name: Build payload + env: + WORKER: ${{ inputs.worker }} + VERSION: ${{ inputs.version }} + DEPLOY: ${{ inputs.deploy }} + REGISTRY_TAG: ${{ inputs.registry_tag }} + IMAGE_TAG: ${{ inputs.image_tag }} + REPO_URL: ${{ format('https://github.com/{0}', github.repository) }} + run: | + python3 .github/scripts/build_publish_payload.py \ + --worker "$WORKER" \ + --version "$VERSION" \ + --registry-tag "$REGISTRY_TAG" \ + --deploy "$DEPLOY" \ + --repo-url "$REPO_URL" \ + --interface-json worker-interface.json \ + --binaries-json binaries.json \ + --image-tag "$IMAGE_TAG" \ + --out payload.json + - name: POST /publish env: API_URL: ${{ inputs.api_url }} @@ 
-184,3 +235,29 @@ jobs: echo "::error::publish failed with HTTP $http" exit 1 fi + + - name: Dump III logs + if: failure() + run: | + echo "::group::iii engine log" + tail -n 200 iii-engine.log || true + echo "::endgroup::" + if [[ -f worker-${{ inputs.worker }}.log ]]; then + echo "::group::worker log" + tail -n 200 worker-${{ inputs.worker }}.log || true + echo "::endgroup::" + fi + + - name: Stop local worker + if: always() + run: | + if [[ -f worker.pid ]]; then + kill "$(cat worker.pid)" 2>/dev/null || true + fi + + - name: Stop III engine + if: always() + run: | + if [[ -f iii-engine.pid ]]; then + kill "$(cat iii-engine.pid)" 2>/dev/null || true + fi