diff --git a/app/utils/docker_network.py b/app/utils/docker_network.py new file mode 100644 index 0000000..87b5a3d --- /dev/null +++ b/app/utils/docker_network.py @@ -0,0 +1,154 @@ +import logging +import aiodocker + +logger = logging.getLogger(__name__) + + +async def get_service_container_id( + docker_client: aiodocker.Docker, service_name: str +) -> str | None: + try: + containers = await docker_client.containers.list(all=True) + except Exception: + return None + + fallback_ids = [] + for container in containers: + if isinstance(container, dict): + labels = container.get("Labels") or {} + container_id = container.get("Id") + names = container.get("Names") or [] + else: + labels = getattr(container, "labels", {}) or {} + container_id = getattr(container, "id", None) + names = getattr(container, "names", []) or [] + + if labels.get("com.docker.compose.service") == service_name: + return container_id + if any(service_name in name for name in names): + return container_id + if container_id: + fallback_ids.append(container_id) + + for container_id in fallback_ids: + try: + container = await docker_client.containers.get(container_id) + info = await container.show() + except Exception: + continue + + labels = info.get("Config", {}).get("Labels", {}) or {} + name = (info.get("Name") or "").lstrip("/") + if labels.get("com.docker.compose.service") == service_name: + return container_id + if service_name in name: + return container_id + + return None + + +async def ensure_network( + docker_client: aiodocker.Docker, name: str, labels: dict[str, str] +) -> None: + try: + await docker_client.networks.get(name) + return + except aiodocker.DockerError as error: + if error.status != 404: + raise + + await docker_client.networks.create( + {"Name": name, "CheckDuplicate": True, "Labels": labels} + ) + + +async def connect_container_to_network( + docker_client: aiodocker.Docker, container_id: str | None, network_name: str | None +) -> None: + if not container_id or not network_name: + return + + try: + network = await docker_client.networks.get(network_name) + except aiodocker.DockerError as error: + if error.status != 404: + raise + return + + try: + await network.connect({"Container": container_id}) + except aiodocker.DockerError as error: + if error.status == 403 and "endpoint with name" in str(error).lower(): + return + if error.status != 409: + raise + + +async def disconnect_container_from_network( + docker_client: aiodocker.Docker, container_id: str | None, network_name: str | None +) -> None: + if not container_id or not network_name: + return + + try: + network = await docker_client.networks.get(network_name) + except aiodocker.DockerError as error: + if error.status != 404: + raise + return + + try: + await network.disconnect({"Container": container_id, "Force": True}) + except aiodocker.DockerError as error: + if error.status not in (404, 409): + logger.warning( + "Failed to detach %s from %s: %s", container_id, network_name, error + ) + + +async def network_has_deployments( + docker_client: aiodocker.Docker, network_name: str +) -> bool: + try: + network = await docker_client.networks.get(network_name) + except aiodocker.DockerError as error: + if error.status != 404: + raise + return False + + info = await network.show() + containers = info.get("Containers") or {} + for container_id in containers.keys(): + try: + container = await docker_client.containers.get(container_id) + container_info = await container.show() + except Exception: + continue + + labels = container_info.get("Config", {}).get("Labels", {}) or {} + if labels.get("devpush.deployment_id"): + return True + + return False + + +async def remove_network_if_empty( + docker_client: aiodocker.Docker, network_name: str +) -> bool: + try: + network = await docker_client.networks.get(network_name) + except aiodocker.DockerError as error: + if error.status != 404: + raise + return False + + info = await network.show() + containers = info.get("Containers") or {} + if containers: + return False + + try: + await network.delete() + return True + except aiodocker.DockerError: + return False diff --git a/app/workers/arq.py b/app/workers/arq.py index 8941c7a..c2b9d36 100644 --- a/app/workers/arq.py +++ b/app/workers/arq.py @@ -2,6 +2,7 @@ from arq.connections import RedisSettings from workers.tasks.deployment import ( start_deployment, + reconcile_edge_network, finalize_deployment, fail_deployment, cleanup_inactive_containers, @@ -18,9 +19,14 @@ settings = get_settings() +async def startup(ctx): + await reconcile_edge_network(ctx) + + class WorkerSettings: functions = [ start_deployment, + reconcile_edge_network, finalize_deployment, fail_deployment, delete_user, @@ -37,3 +43,4 @@ class WorkerSettings: max_tries = settings.job_max_tries health_check_interval = 65 # Greater than 60s to avoid health check timeout allow_abort_jobs = True + on_startup = startup diff --git a/app/workers/monitor.py b/app/workers/monitor.py index dbd33d0..ba8d41c 100644 --- a/app/workers/monitor.py +++ b/app/workers/monitor.py @@ -9,10 +9,18 @@ from db import AsyncSessionLocal from models import Deployment +from utils.docker_network import ( + connect_container_to_network, + disconnect_container_from_network, + get_service_container_id, + network_has_deployments, + remove_network_if_empty, +) logger = logging.getLogger(__name__) deployment_probe_state = {} # deployment_id -> {"container": container_obj, "probe_active": bool} +WORKSPACE_NETWORK_PREFIX = "devpush_workspace_" async def _http_probe(ip: str, port: int, timeout: float = 5) -> bool: @@ -25,17 +33,51 @@ async def _http_probe(ip: str, port: int, timeout: float = 5) -> bool: return False +async def _ensure_probe_on_network( + docker_client: aiodocker.Docker, probe_id: str | None, network_name: str | None +): + await connect_container_to_network(docker_client, probe_id, network_name) + + +async def _detach_probe_from_unused_networks( + docker_client: aiodocker.Docker, + probe_id: str | None, +): + if not probe_id: + return + + try: + container = await docker_client.containers.get(probe_id) + info = await container.show() + except Exception: + return + + networks = info.get("NetworkSettings", {}).get("Networks", {}) or {} + for network_name in networks.keys(): + if not network_name.startswith(WORKSPACE_NETWORK_PREFIX): + continue + if await network_has_deployments(docker_client, network_name): + continue + + await disconnect_container_from_network( + docker_client, probe_id, network_name + ) + if not await network_has_deployments(docker_client, network_name): + await remove_network_if_empty(docker_client, network_name) + + async def _check_status( deployment: Deployment, docker_client: aiodocker.Docker, redis_pool: ArqRedis, + probe_id: str | None, ): """Checks the status of a single deployment's container.""" if ( deployment.id in deployment_probe_state and deployment_probe_state[deployment.id]["probe_active"] ): - return + return None log_prefix = f"[DeployMonitor:{deployment.id}]" @@ -54,7 +96,7 @@ async def _check_status( ) logger.warning(f"{log_prefix} Deployment timed out; failure job enqueued.") await _cleanup_deployment(deployment.id) - return + return None except Exception: logger.error(f"{log_prefix} Error while evaluating timeout.", exc_info=True) @@ -91,13 +133,24 @@ async def _check_status( elif status == "running": networks = container_info.get("NetworkSettings", {}).get("Networks", {}) - container_ip = networks.get("devpush_runner", {}).get("IPAddress") + labels = container_info.get("Config", {}).get("Labels", {}) or {} + workspace_network = labels.get("devpush.workspace_network") + container_ip = None + if workspace_network: + await _ensure_probe_on_network( + docker_client, probe_id, workspace_network + ) + container_ip = networks.get(workspace_network, {}).get("IPAddress") + if not container_ip: + # LEGACY(network): fallback for deployments created before edge/workspace networks. + container_ip = networks.get("devpush_runner", {}).get("IPAddress") if container_ip and await _http_probe(container_ip, 8000): await redis_pool.enqueue_job("finalize_deployment", deployment.id) logger.info( f"{log_prefix} Deployment ready (finalization job enqueued)." ) await _cleanup_deployment(deployment.id) + return workspace_network except Exception as e: logger.error( @@ -105,9 +158,11 @@ async def _check_status( ) await redis_pool.enqueue_job("fail_deployment", deployment.id, str(e)) await _cleanup_deployment(deployment.id) + return None finally: if deployment.id in deployment_probe_state: deployment_probe_state[deployment.id]["probe_active"] = False + return None # Cleanup function @@ -152,11 +207,26 @@ async def monitor(): deployments_to_check = result.scalars().all() if deployments_to_check: + probe_id = await get_service_container_id( + docker_client, "worker-monitor" + ) tasks = [ - _check_status(deployment, docker_client, redis_pool) + _check_status( + deployment, docker_client, redis_pool, probe_id + ) for deployment in deployments_to_check ] await asyncio.gather(*tasks) + await _detach_probe_from_unused_networks( + docker_client, probe_id + ) + else: + probe_id = await get_service_container_id( + docker_client, "worker-monitor" + ) + await _detach_probe_from_unused_networks( + docker_client, probe_id + ) except exc.SQLAlchemyError as e: logger.error(f"Database error in monitor loop: {e}. Reconnecting.") diff --git a/app/workers/tasks/deployment.py b/app/workers/tasks/deployment.py index c22709c..097c2fd 100644 --- a/app/workers/tasks/deployment.py +++ b/app/workers/tasks/deployment.py @@ -9,6 +9,13 @@ from models import Alias, Deployment, Project from db import AsyncSessionLocal +from utils.docker_network import ( + connect_container_to_network, + disconnect_container_from_network, + ensure_network, + get_service_container_id, + remove_network_if_empty, +) from dependencies import ( get_redis_client, get_github_installation_service, @@ -19,6 +26,56 @@ logger = logging.getLogger(__name__) +EDGE_NETWORK_PREFIX = "devpush_edge_" +WORKSPACE_NETWORK_PREFIX = "devpush_workspace_" + + +def _edge_network_name(deployment_id: str) -> str: + return f"{EDGE_NETWORK_PREFIX}{deployment_id}" + + +def _workspace_network_name(team_id: str) -> str: + return f"{WORKSPACE_NETWORK_PREFIX}{team_id}" + + +def _get_network_names_from_labels(container_info: dict[str, Any]): + labels = container_info.get("Config", {}).get("Labels", {}) or {} + return labels.get("devpush.edge_network"), labels.get("devpush.workspace_network") + + +async def _ensure_traefik_on_network( + docker_client: aiodocker.Docker, network_name: str, log_prefix: str +): + traefik_id = await get_service_container_id(docker_client, "traefik") + if not traefik_id: + logger.warning(f"{log_prefix} Traefik container not found for network attach.") + return + + await connect_container_to_network(docker_client, traefik_id, network_name) + + +async def _disconnect_traefik_from_network( + docker_client: aiodocker.Docker, network_name: str, log_prefix: str +): + traefik_id = await get_service_container_id(docker_client, "traefik") + if not traefik_id: + logger.warning(f"{log_prefix} Traefik container not found for network detach.") + return + + await disconnect_container_from_network(docker_client, traefik_id, network_name) + +async def _cleanup_networks( + docker_client: aiodocker.Docker, + edge_network_name: str | None, + log_prefix: str, +): + if edge_network_name: + await _disconnect_traefik_from_network( + docker_client, edge_network_name, log_prefix + ) + if await remove_network_if_empty(docker_client, edge_network_name): + logger.info(f"{log_prefix} Removed network {edge_network_name}") + async def _log_to_container(container, message, error=False): """Logs a message to the container.""" @@ -149,18 +206,40 @@ async def start_deployment(ctx, deployment_id: str): # Setup container configuration container_name = f"runner-{deployment.id[:7]}" router = f"deployment-{deployment.id}" + edge_network_name = _edge_network_name(deployment.id) + workspace_network_name = _workspace_network_name( + deployment.project.team_id + ) + await ensure_network( + docker_client, + edge_network_name, + { + "devpush.network_role": "edge", + "devpush.deployment_id": deployment.id, + }, + ) + await ensure_network( + docker_client, + workspace_network_name, + { + "devpush.network_role": "workspace", + "devpush.workspace_id": deployment.project.team_id, + }, + ) labels = { "traefik.enable": "true", f"traefik.http.routers.{router}.rule": f"Host(`{deployment.slug}.{settings.deploy_domain}`)", f"traefik.http.routers.{router}.service": f"{router}@docker", f"traefik.http.routers.{router}.priority": "10", f"traefik.http.services.{router}.loadbalancer.server.port": "8000", - "traefik.docker.network": "devpush_runner", + "traefik.docker.network": edge_network_name, "devpush.deployment_id": deployment.id, "devpush.project_id": deployment.project_id, "devpush.environment_id": deployment.environment_id, "devpush.branch": deployment.branch, + "devpush.edge_network": edge_network_name, + "devpush.workspace_network": workspace_network_name, } if settings.url_scheme == "https": @@ -216,7 +295,12 @@ async def start_deployment(ctx, deployment_id: str): "Env": [f"{k}={v}" for k, v in env_vars_dict.items()], "WorkingDir": "/app", "Labels": labels, - "NetworkingConfig": {"EndpointsConfig": {"devpush_runner": {}}}, + "NetworkingConfig": { + "EndpointsConfig": { + edge_network_name: {}, + workspace_network_name: {}, + } + }, "HostConfig": { **( {"CpuQuota": int(cpus * 100000), "CpuPeriod": 100000} @@ -247,6 +331,10 @@ async def start_deployment(ctx, deployment_id: str): logger.info( f"{log_prefix} Container {container.id} started. Monitoring..." ) + job_queue: ArqRedis = ctx["redis"] + await job_queue.enqueue_job( + "reconcile_edge_network", deployment.id + ) except asyncio.CancelledError: # TODO: check if it works and refactor @@ -264,6 +352,18 @@ async def start_deployment(ctx, deployment_id: str): except Exception as e: logger.error(f"{log_prefix} Error cleaning up container: {e}") + try: + edge_network_name = _edge_network_name(deployment_id) + async with aiodocker.Docker( + url=settings.docker_host + ) as docker_client: + await _cleanup_networks(docker_client, edge_network_name, log_prefix) + except Exception: + logger.warning( + f"{log_prefix} Failed to cleanup networks after cancel.", + exc_info=True, + ) + try: async with AsyncSessionLocal() as db: deployment = await db.get(Deployment, deployment_id) @@ -280,6 +380,75 @@ async def start_deployment(ctx, deployment_id: str): logger.info(f"{log_prefix} Deployment startup failed.", exc_info=True) +async def reconcile_edge_network(ctx, deployment_id: str | None = None): + """Ensures Traefik is attached to edge networks for active deployments.""" + settings = get_settings() + log_prefix = "[EdgeReconcile]" + + async with aiodocker.Docker(url=settings.docker_host) as docker_client: + edge_networks: set[str] = set() + + if deployment_id: + async with AsyncSessionLocal() as db: + deployment = ( + await db.execute( + select(Deployment).where(Deployment.id == deployment_id) + ) + ).scalar_one_or_none() + if not deployment or not deployment.container_id: + deployment_id = None + else: + try: + container_obj = await docker_client.containers.get( + deployment.container_id + ) + info = await container_obj.show() + labels = info.get("Config", {}).get("Labels", {}) or {} + edge_network = labels.get("devpush.edge_network") + if edge_network: + edge_networks.add(edge_network) + else: + deployment_id = None + except Exception: + deployment_id = None + + if not edge_networks and not deployment_id: + try: + containers = await docker_client.containers.list(all=True) + except Exception: + logger.warning(f"{log_prefix} Failed to list containers.") + return + + for container in containers: + if isinstance(container, dict): + labels = container.get("Labels") or {} + container_id = container.get("Id") + else: + labels = getattr(container, "labels", {}) or {} + container_id = getattr(container, "id", None) + + edge_network = labels.get("devpush.edge_network") + if not edge_network and container_id: + try: + container_obj = await docker_client.containers.get(container_id) + info = await container_obj.show() + labels = info.get("Config", {}).get("Labels", {}) or {} + edge_network = labels.get("devpush.edge_network") + except Exception: + continue + + if edge_network: + edge_networks.add(edge_network) + + if not edge_networks: + return + + for edge_network in sorted(edge_networks): + await _ensure_traefik_on_network( + docker_client, edge_network, log_prefix + ) + + async def finalize_deployment(ctx, deployment_id: str): """Finalizes a deployment, setting up aliases and updating Traefik config.""" settings = get_settings() @@ -371,6 +540,9 @@ async def fail_deployment(ctx, deployment_id: str, reason: str = None): ) ).scalar_one() + edge_network_name = _edge_network_name(deployment.id) + networks_cleaned = False + if deployment.container_id and deployment.container_status not in ( "removed", "stopped", @@ -381,12 +553,24 @@ async def fail_deployment(ctx, deployment_id: str, reason: str = None): deployment.container_id ) try: + try: + container_info = await container.show() + edge_from_label, _ = _get_network_names_from_labels( + container_info + ) + edge_network_name = edge_from_label or edge_network_name + except Exception: + logger.warning( + f"{log_prefix} Failed to read container network labels." + ) await container.stop() except Exception: pass # Grace period to allow logs to be ingested await asyncio.sleep(settings.container_delete_grace_seconds) await container.delete(force=True) + await _cleanup_networks(docker_client, edge_network_name, log_prefix) + networks_cleaned = True deployment.container_status = "removed" logger.info( f"{log_prefix} Cleaned up failed container {deployment.container_id}" @@ -398,6 +582,8 @@ async def fail_deployment(ctx, deployment_id: str, reason: str = None): f"{log_prefix} Container {deployment.container_id} not found, already removed" ) deployment.container_status = "removed" + await _cleanup_networks(docker_client, edge_network_name, log_prefix) + networks_cleaned = True else: logger.error( f"{log_prefix} Docker error cleaning up container {deployment.container_id}: {error}", @@ -409,6 +595,16 @@ async def fail_deployment(ctx, deployment_id: str, reason: str = None): exc_info=True, ) + if not networks_cleaned: + try: + async with aiodocker.Docker(url=settings.docker_host) as docker_client: + await _cleanup_networks(docker_client, edge_network_name, log_prefix) + except Exception: + logger.warning( + f"{log_prefix} Failed to cleanup networks after failure.", + exc_info=True, + ) + deployment.status = "completed" deployment.conclusion = "failed" deployment.project.updated_at = datetime.now(timezone.utc).replace(tzinfo=None) @@ -517,6 +713,16 @@ async def cleanup_inactive_containers( container = await docker_client.containers.get( deployment.container_id ) + edge_network_name = None + try: + container_info = await container.show() + edge_network_name, _ = _get_network_names_from_labels( + container_info + ) + except Exception: + logger.warning( + f"[CleanupInactiveContainers:{project_id}] Failed to read container network labels." + ) # Stop container await container.stop() @@ -534,6 +740,11 @@ async def cleanup_inactive_containers( logger.info( f"[CleanupInactiveContainers:{project_id}] Removed container {deployment.container_id}" ) + await _cleanup_networks( + docker_client, + edge_network_name, + f"[CleanupInactiveContainers:{project_id}]", + ) except aiodocker.DockerError as error: if error.status == 404: diff --git a/scripts/network-reconcile.sh b/scripts/network-reconcile.sh new file mode 100755 index 0000000..afa2864 --- /dev/null +++ b/scripts/network-reconcile.sh @@ -0,0 +1,65 @@ +#!/usr/bin/env bash +set -Eeuo pipefail +IFS=$'\n\t' + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +source "$SCRIPT_DIR/lib.sh" + +init_script_logging "network-reconcile" + +usage(){ + cat <] [-h|--help] + +Enqueue a network reconciliation task for edge networks. + + --deployment-id Target a specific deployment (default: all) + -h, --help Show this help +USG + exit 0 +} + +# Parse CLI flags +deployment_id="" +while [[ $# -gt 0 ]]; do + case "$1" in + --deployment-id) + [[ $# -gt 1 ]] || { err "Missing value for --deployment-id"; usage; } + deployment_id="$2" + shift 2 + ;; + -h|--help) usage ;; + *) err "Unknown option: $1"; usage ;; + esac +done + +cd "$APP_DIR" || { err "App dir not found: $APP_DIR"; exit 1; } + +docker info >/dev/null 2>&1 || { err "Docker not accessible. Run with sudo or add your user to the docker group."; exit 1; } + +# Enqueue reconcile job +printf '\n' +set_compose_base +deploy_arg="" +if [[ -n "$deployment_id" ]]; then + deploy_arg=", \"$deployment_id\"" +fi +PY_CMD="import asyncio +from arq.connections import RedisSettings, create_pool +from config import get_settings + +async def main(): + settings = get_settings() + redis_settings = RedisSettings.from_dsn(settings.redis_url) + redis = await create_pool(redis_settings) + await redis.enqueue_job(\"reconcile_edge_network\"${deploy_arg}) + await redis.aclose() + +asyncio.run(main())" + +run_cmd "Enqueueing edge network reconcile..." \ + "${COMPOSE_BASE[@]}" exec -T app uv run python -c "$PY_CMD" + +# Success +printf '\n' +printf "${GRN}Network reconcile queued. ✔${NC}\n"