Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions app/utils/docker_network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import logging
import aiodocker

logger = logging.getLogger(__name__)


async def get_service_container_id(
    docker_client: aiodocker.Docker, service_name: str
) -> str | None:
    """Resolve the container id belonging to a compose service.

    First scans the cheap container-list payload for a compose-service
    label or a name containing *service_name*; containers that do not
    match there are re-inspected individually, since the list endpoint
    may omit labels/names. Returns None when nothing matches or the
    Docker daemon cannot be reached.
    """
    try:
        containers = await docker_client.containers.list(all=True)
    except Exception:
        # Best-effort lookup: treat an unreachable daemon as "not found".
        return None

    unmatched_ids: list[str] = []
    for entry in containers:
        # aiodocker may hand back raw dicts or container objects.
        if isinstance(entry, dict):
            labels = entry.get("Labels") or {}
            cid = entry.get("Id")
            names = entry.get("Names") or []
        else:
            labels = getattr(entry, "labels", {}) or {}
            cid = getattr(entry, "id", None)
            names = getattr(entry, "names", []) or []

        if labels.get("com.docker.compose.service") == service_name:
            return cid
        if any(service_name in candidate for candidate in names):
            return cid
        if cid:
            unmatched_ids.append(cid)

    # Fallback: full inspection of each remaining container.
    for cid in unmatched_ids:
        try:
            container = await docker_client.containers.get(cid)
            details = await container.show()
        except Exception:
            continue

        labels = details.get("Config", {}).get("Labels", {}) or {}
        container_name = (details.get("Name") or "").lstrip("/")
        if labels.get("com.docker.compose.service") == service_name:
            return cid
        if service_name in container_name:
            return cid

    return None


async def ensure_network(
    docker_client: aiodocker.Docker, name: str, labels: dict[str, str]
) -> None:
    """Create the Docker network *name* with *labels* if it is missing.

    A 404 from the lookup means the network does not exist yet and
    triggers creation; any other Docker error propagates to the caller.
    """
    missing = False
    try:
        await docker_client.networks.get(name)
    except aiodocker.DockerError as error:
        if error.status != 404:
            raise
        missing = True

    if not missing:
        # Network already present; nothing to do.
        return

    await docker_client.networks.create(
        {"Name": name, "CheckDuplicate": True, "Labels": labels}
    )


async def connect_container_to_network(
    docker_client: aiodocker.Docker, container_id: str | None, network_name: str | None
) -> None:
    """Attach *container_id* to *network_name*, tolerating benign races.

    No-ops when either argument is missing, when the network is gone
    (404), or when the container is already attached (409, or 403 with
    an "endpoint with name" message). Other Docker errors propagate.
    """
    if not (container_id and network_name):
        return

    try:
        network = await docker_client.networks.get(network_name)
    except aiodocker.DockerError as error:
        if error.status == 404:
            # Network disappeared; nothing to attach to.
            return
        raise

    try:
        await network.connect({"Container": container_id})
    except aiodocker.DockerError as error:
        already_attached = (
            error.status == 403 and "endpoint with name" in str(error).lower()
        )
        if already_attached or error.status == 409:
            return
        raise


async def disconnect_container_from_network(
    docker_client: aiodocker.Docker, container_id: str | None, network_name: str | None
) -> None:
    """Force-detach *container_id* from *network_name*, best effort.

    No-ops when either argument is missing or the network is gone (404).
    Detach failures are never raised: already-gone endpoints (404/409)
    are ignored silently, anything else is logged as a warning.
    """
    if not (container_id and network_name):
        return

    try:
        network = await docker_client.networks.get(network_name)
    except aiodocker.DockerError as error:
        if error.status == 404:
            return
        raise

    try:
        await network.disconnect({"Container": container_id, "Force": True})
    except aiodocker.DockerError as error:
        if error.status in (404, 409):
            # Endpoint or network already gone; that is the desired state.
            return
        logger.warning(
            "Failed to detach %s from %s: %s", container_id, network_name, error
        )


async def network_has_deployments(
    docker_client: aiodocker.Docker, network_name: str
) -> bool:
    """Return True if any container on the network is a devpush deployment.

    A deployment is recognised by a non-empty "devpush.deployment_id"
    label. A missing network (404) counts as having no deployments;
    containers that cannot be inspected are skipped.
    """
    try:
        network = await docker_client.networks.get(network_name)
    except aiodocker.DockerError as error:
        if error.status == 404:
            return False
        raise

    details = await network.show()
    attached = details.get("Containers") or {}
    for cid in attached:
        try:
            container = await docker_client.containers.get(cid)
            container_details = await container.show()
        except Exception:
            # Container vanished mid-scan; ignore it.
            continue

        container_labels = container_details.get("Config", {}).get("Labels", {}) or {}
        if container_labels.get("devpush.deployment_id"):
            return True

    return False


async def remove_network_if_empty(
    docker_client: aiodocker.Docker, network_name: str
) -> bool:
    """Delete *network_name* when no containers are attached.

    Returns True only when this call actually deleted the network.
    A missing network (404), a non-empty network, or a failed delete
    (e.g. losing a race with a concurrent attach) all return False.
    """
    try:
        network = await docker_client.networks.get(network_name)
    except aiodocker.DockerError as error:
        if error.status == 404:
            return False
        raise

    details = await network.show()
    if details.get("Containers"):
        # Still in use; leave it alone.
        return False

    try:
        await network.delete()
    except aiodocker.DockerError:
        # Delete can fail if something attached between show() and here.
        return False
    return True
7 changes: 7 additions & 0 deletions app/workers/arq.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from arq.connections import RedisSettings
from workers.tasks.deployment import (
start_deployment,
reconcile_edge_network,
finalize_deployment,
fail_deployment,
cleanup_inactive_containers,
Expand All @@ -18,9 +19,14 @@
settings = get_settings()


async def startup(ctx):
    """Arq on_startup hook: reconcile edge networking before jobs run."""
    await reconcile_edge_network(ctx)


class WorkerSettings:
functions = [
start_deployment,
reconcile_edge_network,
finalize_deployment,
fail_deployment,
delete_user,
Expand All @@ -37,3 +43,4 @@ class WorkerSettings:
max_tries = settings.job_max_tries
health_check_interval = 65 # Greater than 60s to avoid health check timeout
allow_abort_jobs = True
on_startup = startup
78 changes: 74 additions & 4 deletions app/workers/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@

from db import AsyncSessionLocal
from models import Deployment
from utils.docker_network import (
connect_container_to_network,
disconnect_container_from_network,
get_service_container_id,
network_has_deployments,
remove_network_if_empty,
)

logger = logging.getLogger(__name__)

deployment_probe_state = {} # deployment_id -> {"container": container_obj, "probe_active": bool}
WORKSPACE_NETWORK_PREFIX = "devpush_workspace_"


async def _http_probe(ip: str, port: int, timeout: float = 5) -> bool:
Expand All @@ -25,17 +33,51 @@ async def _http_probe(ip: str, port: int, timeout: float = 5) -> bool:
return False


async def _ensure_probe_on_network(
    docker_client: aiodocker.Docker, probe_id: str | None, network_name: str | None
):
    """Attach the monitor's probe container to *network_name*.

    Thin wrapper over connect_container_to_network, which no-ops when
    either id is missing and tolerates already-attached states.
    """
    await connect_container_to_network(docker_client, probe_id, network_name)


async def _detach_probe_from_unused_networks(
    docker_client: aiodocker.Docker,
    probe_id: str | None,
):
    """Detach the probe from workspace networks with no live deployments.

    After detaching, the network itself is removed once it is empty so
    stale per-workspace networks do not accumulate on the host.
    """
    if not probe_id:
        return

    try:
        probe = await docker_client.containers.get(probe_id)
        details = await probe.show()
    except Exception:
        # Probe container vanished or Docker is unreachable; nothing to do.
        return

    attached = details.get("NetworkSettings", {}).get("Networks", {}) or {}
    workspace_networks = [
        name for name in attached if name.startswith(WORKSPACE_NETWORK_PREFIX)
    ]
    for name in workspace_networks:
        if await network_has_deployments(docker_client, name):
            continue

        await disconnect_container_from_network(docker_client, probe_id, name)
        # Re-check before removing: a deployment may have appeared meanwhile.
        if not await network_has_deployments(docker_client, name):
            await remove_network_if_empty(docker_client, name)


async def _check_status(
deployment: Deployment,
docker_client: aiodocker.Docker,
redis_pool: ArqRedis,
probe_id: str | None,
):
"""Checks the status of a single deployment's container."""
if (
deployment.id in deployment_probe_state
and deployment_probe_state[deployment.id]["probe_active"]
):
return
return None

log_prefix = f"[DeployMonitor:{deployment.id}]"

Expand All @@ -54,7 +96,7 @@ async def _check_status(
)
logger.warning(f"{log_prefix} Deployment timed out; failure job enqueued.")
await _cleanup_deployment(deployment.id)
return
return None
except Exception:
logger.error(f"{log_prefix} Error while evaluating timeout.", exc_info=True)

Expand Down Expand Up @@ -91,23 +133,36 @@ async def _check_status(

elif status == "running":
networks = container_info.get("NetworkSettings", {}).get("Networks", {})
container_ip = networks.get("devpush_runner", {}).get("IPAddress")
labels = container_info.get("Config", {}).get("Labels", {}) or {}
workspace_network = labels.get("devpush.workspace_network")
container_ip = None
if workspace_network:
await _ensure_probe_on_network(
docker_client, probe_id, workspace_network
)
container_ip = networks.get(workspace_network, {}).get("IPAddress")
if not container_ip:
# LEGACY(network): fallback for deployments created before edge/workspace networks.
container_ip = networks.get("devpush_runner", {}).get("IPAddress")
if container_ip and await _http_probe(container_ip, 8000):
await redis_pool.enqueue_job("finalize_deployment", deployment.id)
logger.info(
f"{log_prefix} Deployment ready (finalization job enqueued)."
)
await _cleanup_deployment(deployment.id)
return workspace_network

except Exception as e:
logger.error(
f"{log_prefix} Unexpected error while checking status.", exc_info=True
)
await redis_pool.enqueue_job("fail_deployment", deployment.id, str(e))
await _cleanup_deployment(deployment.id)
return None
finally:
if deployment.id in deployment_probe_state:
deployment_probe_state[deployment.id]["probe_active"] = False
return None


# Cleanup function
Expand Down Expand Up @@ -152,11 +207,26 @@ async def monitor():
deployments_to_check = result.scalars().all()

if deployments_to_check:
probe_id = await get_service_container_id(
docker_client, "worker-monitor"
)
tasks = [
_check_status(deployment, docker_client, redis_pool)
_check_status(
deployment, docker_client, redis_pool, probe_id
)
for deployment in deployments_to_check
]
await asyncio.gather(*tasks)
await _detach_probe_from_unused_networks(
docker_client, probe_id
)
else:
probe_id = await get_service_container_id(
docker_client, "worker-monitor"
)
await _detach_probe_from_unused_networks(
docker_client, probe_id
)

except exc.SQLAlchemyError as e:
logger.error(f"Database error in monitor loop: {e}. Reconnecting.")
Expand Down
Loading