Skip to content
2 changes: 2 additions & 0 deletions opentakserver/blueprints/ots_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from opentakserver.blueprints.ots_api.group_api import group_api
from opentakserver.blueprints.ots_api.eud_stats_api import eud_stats_blueprint
from opentakserver.blueprints.ots_api.plugin_api import plugin_blueprint
from opentakserver.blueprints.ots_api.health_api import health_api

ots_api = Blueprint("ots_api", __name__)
ots_api.register_blueprint(api_blueprint)
Expand All @@ -33,3 +34,4 @@
ots_api.register_blueprint(eud_stats_blueprint)
ots_api.register_blueprint(plugin_blueprint)
ots_api.register_blueprint(token_api_blueprint)
ots_api.register_blueprint(health_api)
55 changes: 55 additions & 0 deletions opentakserver/blueprints/ots_api/health_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from flask import Blueprint, jsonify, request
from flask_security import auth_required

from opentakserver.health import cot_parser, eud_handler

# Blueprint for health endpoints
health_api = Blueprint("health_api", __name__)


@health_api.route("/api/health/ots")
@auth_required()
def health_ots():
"""Placeholder health check for OTS."""
return jsonify({"status": "ok"})


@health_api.route("/api/health/cot")
@auth_required()
def health_cot():
"""Health check for the CoT parser service."""
service_state = cot_parser.query_systemd()
log_lines = cot_parser.tail_ots_log_for_cot_parser_entries()
log_errors = cot_parser.find_errors(log_lines)
rabbit_ok = cot_parser.rabbitmq_check()

status = cot_parser.compute_status(service_state, log_errors, rabbit_ok)
status["timestamp"] = cot_parser.current_timestamp()

strict = request.args.get("strict", "false").lower() == "true"
code = 200
if strict and status["overall"] != "healthy":
code = 503

return jsonify(status), code


@health_api.route("/api/health/eud")
@auth_required()
def health_eud():
"""Health check for the EUD handler service."""
service_state = eud_handler.query_systemd()
log_lines = eud_handler.tail_ots_log_for_eud_handler_entries()
log_errors = eud_handler.find_errors(log_lines)
rabbit_ok = eud_handler.rabbitmq_check()

status = eud_handler.compute_status(service_state, log_errors, rabbit_ok)
status["timestamp"] = eud_handler.current_timestamp()

strict = request.args.get("strict", "false").lower() == "true"
code = 200
if strict and status["overall"] != "healthy":
code = 503

return jsonify(status), code

2 changes: 2 additions & 0 deletions opentakserver/health/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"""Health utilities for OpenTAKServer."""

122 changes: 122 additions & 0 deletions opentakserver/health/cot_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import os
import re
import socket
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List

COT_PARSER_SERVICE = os.getenv("COT_PARSER_SERVICE", "cot_parser.service")
OTS_DATA_FOLDER = os.getenv("OTS_DATA_FOLDER", os.path.join(Path.home(), "ots"))
COT_PARSER_LOG = os.getenv(
"COT_PARSER_LOG",
os.path.join(OTS_DATA_FOLDER, "logs", "opentakserver.log"),
)
RABBIT_HOST = os.getenv("RABBIT_HOST", "localhost")
RABBIT_PORT = int(os.getenv("RABBIT_PORT", "5672"))
ERROR_PATTERN = os.getenv("COT_PARSER_ERROR_REGEX", r"(ERROR|Exception|Traceback)")
ERROR_REGEX = re.compile(ERROR_PATTERN, re.IGNORECASE)
LOG_TAG = "cot_parser"


def query_systemd(service: str = COT_PARSER_SERVICE) -> str:
"""
Returns one of: active, inactive, failed, activating, deactivating, reloading, unknown
"""
# First try: is-active (simplest, stable output)
try:
completed = subprocess.run(
["systemctl", "is-active", service],
check=False,
capture_output=True,
text=True,
)
state = completed.stdout.strip()
if state: # active/inactive/failed/...
return state
except Exception:
pass

# Fallback: show ActiveState
try:
completed = subprocess.run(
["systemctl", "show", service, "--property=ActiveState", "--value"],
check=True,
capture_output=True,
text=True,
)
return completed.stdout.strip()
except Exception as exc:
return f"error: {exc}"


def tail_ots_log_for_cot_parser_entries(
path: str = COT_PARSER_LOG, lines: int = 100, tag: str = LOG_TAG
) -> List[str]:
"""Return the last ``lines`` from the OTS log produced by ``cot_parser``."""
try:
with open(path, "rb") as fh:
fh.seek(0, os.SEEK_END)
size = fh.tell()
block = 1024
data = bytearray()
while size > 0 and data.count(b"\n") <= lines:
step = min(block, size)
size -= step
fh.seek(size)
data = fh.read(step) + data
log_lines = data.decode(errors="ignore").splitlines()
return [line for line in log_lines if tag in line][-lines:]
except OSError as exc: # pragma: no cover - exercised via tests
return [f"error: {exc}"]


def find_errors(lines: Iterable[str]) -> List[str]:
"""Filter log lines that match ``ERROR_REGEX``."""
return [line for line in lines if ERROR_REGEX.search(line)]


def rabbitmq_check(host: str = RABBIT_HOST, port: int = RABBIT_PORT, timeout: float = 1.0) -> bool:
"""Attempt a TCP connection to RabbitMQ and return whether it succeeded."""
try:
with socket.create_connection((host, port), timeout=timeout):
return True
except OSError: # pragma: no cover - exercised via tests
return False


def compute_status(service_state: str, log_errors: List[str], rabbitmq_ok: bool) -> dict:
"""Compute component and overall health status."""
components = {
"service": service_state,
"logs": "errors" if log_errors else "error-free",
"rabbitmq": "up" if rabbitmq_ok else "down",
}
problems: List[str] = []
if service_state != "active":
problems.append("cot_parser service inactive")
if log_errors:
problems.append("errors detected in log")
if not rabbitmq_ok:
problems.append("rabbitmq unreachable")

# Determine operational status
overall = "non-operational"
if rabbitmq_ok and service_state == "active":
overall = "operational-"
if log_errors:
overall += "errors"
else:
overall += "healthy"

return {
"status": overall,
"components": components,
"problems": problems,
}


def current_timestamp() -> str:
"""Return an ISO 8601 UTC timestamp."""
return datetime.now(timezone.utc).isoformat()

119 changes: 119 additions & 0 deletions opentakserver/health/eud_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import os
import re
import socket
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List

EUD_HANDLER_SERVICE = os.getenv("EUD_HANDLER_SERVICE", "eud_handler.service")
OTS_DATA_FOLDER = os.getenv("OTS_DATA_FOLDER", os.path.join(Path.home(), "ots"))
EUD_HANDLER_LOG = os.getenv(
"EUD_HANDLER_LOG",
os.path.join(OTS_DATA_FOLDER, "logs", "opentakserver.log"),
)
RABBIT_HOST = os.getenv("RABBIT_HOST", "localhost")
RABBIT_PORT = int(os.getenv("RABBIT_PORT", "5672"))
ERROR_PATTERN = os.getenv("EUD_HANDLER_ERROR_REGEX", r"(ERROR|Exception|Traceback)")
ERROR_REGEX = re.compile(ERROR_PATTERN, re.IGNORECASE)
LOG_TAG = "eud_handler"


def query_systemd(service: str = EUD_HANDLER_SERVICE) -> str:
"""Returns one of: active, inactive, failed, activating, deactivating, reloading, unknown"""
# First try: is-active (simplest, stable output)
try:
completed = subprocess.run(
["systemctl", "is-active", service],
check=False,
capture_output=True,
text=True,
)
state = completed.stdout.strip()
if state: # active/inactive/failed/...
return state
except Exception:
pass

# Fallback: show ActiveState
try:
completed = subprocess.run(
["systemctl", "show", service, "--property=ActiveState", "--value"],
check=True,
capture_output=True,
text=True,
)
return completed.stdout.strip()
except Exception as exc:
return f"error: {exc}"


def tail_ots_log_for_eud_handler_entries(
path: str = EUD_HANDLER_LOG, lines: int = 100, tag: str = LOG_TAG
) -> List[str]:
"""Return the last ``lines`` from the OTS log produced by ``eud_handler``."""
try:
with open(path, "rb") as fh:
fh.seek(0, os.SEEK_END)
size = fh.tell()
block = 1024
data = bytearray()
while size > 0 and data.count(b"\n") <= lines:
step = min(block, size)
size -= step
fh.seek(size)
data = fh.read(step) + data
log_lines = data.decode(errors="ignore").splitlines()
return [line for line in log_lines if tag in line][-lines:]
except OSError as exc: # pragma: no cover - exercised via tests
return [f"error: {exc}"]


def find_errors(lines: Iterable[str]) -> List[str]:
"""Filter log lines that match ``ERROR_REGEX``."""
return [line for line in lines if ERROR_REGEX.search(line)]


def rabbitmq_check(host: str = RABBIT_HOST, port: int = RABBIT_PORT, timeout: float = 1.0) -> bool:
"""Attempt a TCP connection to RabbitMQ and return whether it succeeded."""
try:
with socket.create_connection((host, port), timeout=timeout):
return True
except OSError: # pragma: no cover - exercised via tests
return False


def compute_status(service_state: str, log_errors: List[str], rabbitmq_ok: bool) -> dict:
"""Compute component and overall health status."""
components = {
"service": service_state,
"logs": "errors" if log_errors else "error-free",
"rabbitmq": "up" if rabbitmq_ok else "down",
}
problems: List[str] = []
if service_state != "active":
problems.append("eud_handler service inactive")
if log_errors:
problems.append("errors detected in log")
if not rabbitmq_ok:
problems.append("rabbitmq unreachable")

# Determine operational status
overall = "non-operational"
if rabbitmq_ok and service_state == "active":
overall = "operational-"
if log_errors:
overall += "errors"
else:
overall += "healthy"

return {
"status": overall,
"components": components,
"problems": problems,
}


def current_timestamp() -> str:
"""Return an ISO 8601 UTC timestamp."""
return datetime.now(timezone.utc).isoformat()
75 changes: 75 additions & 0 deletions tests/test_health_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from unittest.mock import patch


def test_health_endpoints(auth):
for endpoint in ("ots", "eud"):
response = auth.get(f"/api/health/{endpoint}")
assert response.status_code == 200


def test_health_cot_healthy(auth):
with patch("opentakserver.health.cot_parser.query_systemd", return_value="active"), \
patch(
"opentakserver.health.cot_parser.tail_ots_log_for_cot_parser_entries",
return_value=["all good"],
), \
patch("opentakserver.health.cot_parser.find_errors", return_value=[]), \
patch("opentakserver.health.cot_parser.rabbitmq_check", return_value=True):
response = auth.get("/api/health/cot")
assert response.status_code == 200
data = response.json
assert data["overall"] == "healthy"
assert data["problems"] == []
assert "timestamp" in data


def test_health_cot_unhealthy_strict(auth):
with patch("opentakserver.health.cot_parser.query_systemd", return_value="inactive"), \
patch(
"opentakserver.health.cot_parser.tail_ots_log_for_cot_parser_entries",
return_value=["error"],
), \
patch("opentakserver.health.cot_parser.find_errors", return_value=["error"]), \
patch("opentakserver.health.cot_parser.rabbitmq_check", return_value=False):
response = auth.get("/api/health/cot?strict=true")
assert response.status_code == 503
data = response.json
assert data["overall"] == "unhealthy"
assert data["problems"]


def test_health_eud_healthy(auth):
with patch("opentakserver.health.eud_handler.query_systemd", return_value="active"), \
patch(
"opentakserver.health.eud_handler.tail_ots_log_for_eud_handler_entries",
return_value=["all good"],
), \
patch("opentakserver.health.eud_handler.find_errors", return_value=[]), \
patch("opentakserver.health.eud_handler.rabbitmq_check", return_value=True):
response = auth.get("/api/health/eud")
assert response.status_code == 200
data = response.json
assert data["overall"] == "healthy"
assert data["problems"] == []
assert "timestamp" in data


def test_health_eud_unhealthy_strict(auth):
with patch("opentakserver.health.eud_handler.query_systemd", return_value="inactive"), \
patch(
"opentakserver.health.eud_handler.tail_ots_log_for_eud_handler_entries",
return_value=["error"],
), \
patch("opentakserver.health.eud_handler.find_errors", return_value=["error"]), \
patch("opentakserver.health.eud_handler.rabbitmq_check", return_value=False):
response = auth.get("/api/health/eud?strict=true")
assert response.status_code == 503
data = response.json
assert data["overall"] == "unhealthy"
assert data["problems"]


def test_health_requires_auth(client):
for endpoint in ("ots", "cot", "eud"):
response = client.get(f"/api/health/{endpoint}")
assert response.status_code in (401, 302)