Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 74 additions & 31 deletions app/api/src/endpoints/health_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <string_view>
#include <unistd.h>
#include <utility>
#include <vector>

namespace {

Expand Down Expand Up @@ -104,44 +105,92 @@ struct OsrmProbeResult {
}

/// Returns the shared HTTP client used to talk to OSRM.
///
/// The client is built once, on first call, for the URL produced by
/// ResolveNormalizedUrlEnvOrDefault("OSRM_URL", kDefaultOsrmUrl); every later
/// call returns a copy of the same handle.
[[nodiscard]] drogon::HttpClientPtr GetOsrmHttpClient() {
  // Exactly one definition of `client` may exist in this scope. The handle is
  // heap-allocated and never deleted, so no destructor runs for it at exit.
  // NOTE(review): presumably deliberate, to avoid static-destruction-order
  // problems during shutdown -- confirm.
  static const auto* client = new drogon::HttpClientPtr{drogon::HttpClient::newHttpClient(
      deliveryoptimizer::api::ResolveNormalizedUrlEnvOrDefault("OSRM_URL", kDefaultOsrmUrl))};
  return *client;
}

// Callback that receives the outcome of an OSRM probe once one is available.
using OsrmProbeWaiter = std::function<void(const OsrmProbeResult&)>;

// Shared state used to cache the latest OSRM probe result and to coalesce
// callers that arrive while a probe is still running.
struct OsrmProbeCache {
// Held while the fields below are read or written (see AddOsrmProbeWaiter and
// CacheOsrmProbeAndTakeWaiters).
std::mutex mutex;
// Most recent probe result; empty until a result has been stored.
std::optional<OsrmProbeResult> cached_result;
// steady_clock time at which cached_result was stored.
std::chrono::steady_clock::time_point cached_at;
// True from the moment a caller is told to start a probe until its result is cached.
bool probe_in_flight{false};
// Callbacks queued while no fresh result was available; they are handed out
// together with the next stored result.
std::vector<OsrmProbeWaiter> waiters;
};

// Result of registering a waiter: tells the caller what it still has to do.
struct OsrmProbeDispatch {
// Set when a fresh cached result was available; the waiter was not queued in that case.
std::optional<OsrmProbeResult> cached_result;
// True when the caller is the one that must start the probe.
bool should_start_probe{false};
};

/// Returns the single OsrmProbeCache instance shared by all callers.
///
/// The instance is created on first call. Callers must lock `mutex` on the
/// returned object before touching its other fields.
OsrmProbeCache& GetOsrmProbeCache() {
  // Exactly one definition of `cache` may exist in this scope. The object is
  // heap-allocated and never deleted, so it is not destroyed at process exit.
  // NOTE(review): presumably so late-running probe callbacks can still use it
  // during shutdown -- confirm.
  static auto* cache = new OsrmProbeCache;
  return *cache;
}

std::optional<OsrmProbeResult> GetCachedOsrmProbe() {
auto& cache = GetOsrmProbeCache();

std::lock_guard<std::mutex> lock(cache.mutex);
/// Returns the cached probe result if one exists and is still within the TTL.
///
/// @param cache  Cache to inspect. This function takes no lock itself; callers
///               are expected to hold `cache.mutex`.
/// @param now    Timestamp to measure the entry's age against.
/// @return The cached result, or std::nullopt when nothing is cached or the
///         entry is older than kOsrmProbeCacheTtl.
[[nodiscard]] std::optional<OsrmProbeResult>
ReadFreshCachedOsrmProbe(const OsrmProbeCache& cache,
                         const std::chrono::steady_clock::time_point now) {
  if (!cache.cached_result.has_value()) {
    return std::nullopt;
  }

  // Age is measured against the caller-supplied `now`; re-reading the clock
  // here would redeclare the parameter and ignore the value passed in.
  if (now - cache.cached_at > kOsrmProbeCacheTtl) {
    return std::nullopt;
  }

  return cache.cached_result;
}

void CacheOsrmProbe(const OsrmProbeResult& result) {
/// Registers `waiter` for the next OSRM probe result unless a fresh cached
/// result already exists.
///
/// Decided while holding the cache mutex:
///  - Fresh cached result: it is returned in `cached_result`, `waiter` is NOT
///    queued, and the caller must deliver the result itself.
///  - Otherwise `waiter` is queued. `should_start_probe` is true only for the
///    caller that flips `probe_in_flight`, so one caller per probe starts it.
[[nodiscard]] OsrmProbeDispatch AddOsrmProbeWaiter(OsrmProbeWaiter waiter) {
  OsrmProbeCache& probe_cache = GetOsrmProbeCache();
  std::lock_guard<std::mutex> guard(probe_cache.mutex);

  auto fresh_result = ReadFreshCachedOsrmProbe(probe_cache, std::chrono::steady_clock::now());
  if (fresh_result.has_value()) {
    return OsrmProbeDispatch{.cached_result = std::move(fresh_result)};
  }

  probe_cache.waiters.push_back(std::move(waiter));

  // Only the caller that finds no probe running is asked to start one.
  const bool must_start_probe = !probe_cache.probe_in_flight;
  probe_cache.probe_in_flight = true;
  return OsrmProbeDispatch{.cached_result = std::nullopt,
                           .should_start_probe = must_start_probe};
}

[[nodiscard]] std::vector<OsrmProbeWaiter>
CacheOsrmProbeAndTakeWaiters(const OsrmProbeResult& result) {
auto& cache = GetOsrmProbeCache();

std::vector<OsrmProbeWaiter> waiters;
std::lock_guard<std::mutex> lock(cache.mutex);
cache.cached_result = result;
cache.cached_at = std::chrono::steady_clock::now();
cache.probe_in_flight = false;
waiters.swap(cache.waiters);
return waiters;
}

/// Sends one asynchronous GET for kOsrmProbePath through the shared OSRM
/// client, with a timeout of kOsrmProbeTimeoutSeconds.
///
/// When the request completes, the outcome is evaluated and cached, and every
/// waiter queued for it is called with that outcome.
void StartOsrmProbe() {
  auto http_client = GetOsrmHttpClient();

  auto probe_request = drogon::HttpRequest::newHttpRequest();
  probe_request->setPath(std::string{kOsrmProbePath});
  probe_request->setMethod(drogon::Get);

  // The callee expression `http_client->sendRequest` is evaluated before the
  // arguments (C++17 sequencing), so moving the handle into the capture below
  // does not invalidate the call.
  http_client->sendRequest(
      probe_request,
      [http_client = std::move(http_client)](const drogon::ReqResult result,
                                             const drogon::HttpResponsePtr& response) mutable {
        // The capture exists only to keep a handle to the client inside the callback.
        (void)http_client;

        const OsrmProbeResult outcome = EvaluateOsrmProbe(result, response);
        auto pending_waiters = CacheOsrmProbeAndTakeWaiters(outcome);
        for (auto& notify : pending_waiters) {
          notify(outcome);
        }
      },
      kOsrmProbeTimeoutSeconds);
}

} // namespace
Expand All @@ -156,29 +205,23 @@ void RegisterHealthEndpoint(drogon::HttpAppFramework& app,
const drogon::HttpRequestPtr& /*request*/,
std::function<void(const drogon::HttpResponsePtr&)>&& callback) {
const bool vroom_ready = IsVroomBinaryReady();
if (const auto cached_probe = GetCachedOsrmProbe()) {
std::move(callback)(
BuildHealthResponse(vroom_ready, *cached_probe, observability, extension));
auto response_callback =
std::make_shared<std::function<void(const drogon::HttpResponsePtr&)>>(
std::move(callback));
auto respond = [vroom_ready, observability = observability, extension = extension,
response_callback](const OsrmProbeResult& osrm_probe) {
(*response_callback)(
BuildHealthResponse(vroom_ready, osrm_probe, observability, extension));
};
const OsrmProbeDispatch dispatch = AddOsrmProbeWaiter(respond);
if (dispatch.cached_result.has_value()) {
respond(*dispatch.cached_result);
return;
}

auto osrm_client = GetOsrmHttpClient();
auto osrm_probe_request = drogon::HttpRequest::newHttpRequest();
osrm_probe_request->setMethod(drogon::Get);
osrm_probe_request->setPath(std::string{kOsrmProbePath});

osrm_client->sendRequest(
osrm_probe_request,
[osrm_client = std::move(osrm_client), vroom_ready, observability = observability,
extension = extension, callback = std::move(callback)](
const drogon::ReqResult result, const drogon::HttpResponsePtr& response) mutable {
(void)osrm_client;
const OsrmProbeResult osrm_probe = EvaluateOsrmProbe(result, response);
CacheOsrmProbe(osrm_probe);
std::move(callback)(
BuildHealthResponse(vroom_ready, osrm_probe, observability, extension));
},
kOsrmProbeTimeoutSeconds);
if (dispatch.should_start_probe) {
StartOsrmProbe();
}
});
}

Expand Down
19 changes: 19 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,25 @@ deliveryoptimizer_set_local_http_test_port(
26000
)

# Regression test for /health probe coalescing on a cold cache.
# Runs the bash driver with the API binary, the Python interpreter and curl as
# its three positional arguments.
add_test(
NAME DeliveryOptimizerApiContractTest.HealthCoalescesColdOsrmProbe
COMMAND "${BASH_PROGRAM}"
"${CMAKE_CURRENT_SOURCE_DIR}/integration/http_server/health_coalesces_cold_osrm_probe_test.sh"
"$<TARGET_FILE:deliveryoptimizer_api>" "${PYTHON3_PROGRAM}" "${CURL_PROGRAM}"
)
# Fail the test if it runs longer than 30 seconds; label it with the local API regression labels.
set_tests_properties(
DeliveryOptimizerApiContractTest.HealthCoalescesColdOsrmProbe
PROPERTIES
TIMEOUT 30
LABELS "${DELIVERYOPTIMIZER_LOCAL_API_REGRESSION_LABELS}"
)
# 36500 and 53500 are the values passed to the project port helper for the API
# server and the OSRM stub respectively.
# NOTE(review): presumably base/default ports that the helper may remap -- confirm against its definition.
deliveryoptimizer_resolve_local_http_test_port(health_coalesce_test_port 36500)
deliveryoptimizer_resolve_local_http_test_port(health_coalesce_stub_port 53500)
# The resolved ports reach the script through DELIVERYOPTIMIZER_TEST_PORT and
# DELIVERYOPTIMIZER_OSRM_STUB_PORT.
deliveryoptimizer_set_local_http_test_environment(
DeliveryOptimizerApiContractTest.HealthCoalescesColdOsrmProbe
"DELIVERYOPTIMIZER_TEST_PORT=${health_coalesce_test_port};DELIVERYOPTIMIZER_OSRM_STUB_PORT=${health_coalesce_stub_port}"
)

deliveryoptimizer_add_local_http_test(
DeliveryOptimizerApiContractTest.DeliveriesOptimizeMissingServiceDefaultsTo300
deliveries_optimize_missing_service_defaults_to_300_test.sh
Expand Down
137 changes: 137 additions & 0 deletions tests/integration/http_server/health_coalesces_cold_osrm_probe_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#!/usr/bin/env bash
# Integration test: sends several concurrent /health requests to a freshly
# started server and checks that they share a single upstream OSRM probe.
set -euo pipefail

script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=tests/integration/http_server/http_server_helpers.sh
source "${script_dir}/http_server_helpers.sh"

# Arguments: <server-binary> <python-binary> [curl-binary]; curl defaults to "curl".
if [[ $# -lt 2 ]]; then
echo "usage: $0 <server-binary> <python-binary> [curl-binary]" >&2
exit 2
fi

python_bin="$2"
curl_bin="${3:-curl}"
# Stub port: DELIVERYOPTIMIZER_OSRM_STUB_PORT when set, otherwise 53000 plus
# this shell's PID modulo 10000.
stub_default_port="$((53000 + ($$ % 10000)))"
stub_port="${DELIVERYOPTIMIZER_OSRM_STUB_PORT:-${stub_default_port}}"

# NOTE(review): work_dir is not assigned in this script; presumably
# http_server_init (from the sourced helpers) sets it -- confirm.
http_server_init 36500 "$1" "${curl_bin}"
ready_file="${work_dir}/stub-ready.txt"
probe_count_file="${work_dir}/probe-count.txt"
stub_log_file="${work_dir}/stub.log"
# Remove leftovers so stale files cannot satisfy the readiness or probe-count checks.
rm -f "${ready_file}" "${probe_count_file}"

# Exit handler: stops the OSRM stub if one was started, then runs the shared
# server cleanup from the helpers. Failures while stopping the stub are ignored
# so the server cleanup always runs.
http_server_cleanup_with_stub() {
  if [[ "${stub_pid:-}" != "" ]]; then
    kill "${stub_pid}" &>/dev/null || true
    # Reap the stub so it does not linger as a zombie of this shell.
    wait "${stub_pid}" &>/dev/null || true
  fi
  http_server_cleanup
}
trap http_server_cleanup_with_stub EXIT

# Start a stand-in OSRM server in the background. Its output goes to
# ${stub_log_file}; the heredoc is quoted, so the shell expands nothing inside
# it and the Python code receives its settings through the environment only.
env STUB_PORT="${stub_port}" PROBE_COUNT_FILE="${probe_count_file}" READY_FILE="${ready_file}" \
  "${python_bin}" - >"${stub_log_file}" 2>&1 <<'PY' &
import os
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

port = int(os.environ["STUB_PORT"])
probe_count_file = os.environ["PROBE_COUNT_FILE"]
ready_file = os.environ["READY_FILE"]
lock = threading.Lock()
probe_count = 0


class Handler(BaseHTTPRequestHandler):
    def do_GET(self):
        global probe_count
        if self.path.startswith("/nearest/v1/driving/"):
            # Hold the probe open long enough for the other concurrent
            # /health requests to arrive while it is still in flight.
            time.sleep(0.4)
            with lock:
                probe_count += 1
                # Persist the running total so the shell script can read it.
                with open(probe_count_file, "w", encoding="utf-8") as count_file:
                    count_file.write(str(probe_count))

        payload = b'{"code":"Ok"}'
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        # Silence the default per-request logging.
        return


server = ThreadingHTTPServer(("127.0.0.1", port), Handler)
# The ready file is written only after the listening socket exists.
with open(ready_file, "w", encoding="utf-8") as ready:
    ready.write("ready")
server.serve_forever()
PY
stub_pid=$!

# Poll for the stub's ready file: up to 50 checks, 0.1 s apart. Stop early if
# the stub process has already exited.
stub_ready=false
for _ in {1..50}; do
  if [[ -f "${ready_file}" ]]; then
    stub_ready=true
    break
  fi
  kill -0 "${stub_pid}" &>/dev/null || break
  sleep 0.1
done

# Without a ready stub the test cannot run; show the stub's output to explain why.
[[ "${stub_ready}" == "true" ]] || {
  printf '%s\n' "OSRM stub failed to start on port ${stub_port}" >&2
  cat "${stub_log_file}" >&2 || true
  exit 1
}

# Launch the API server pointed at the stub, with a VROOM binary path that does
# not exist (the /health checks later expect 503 because VROOM is missing).
http_server_start VROOM_BIN="/tmp/does-not-exist-vroom" OSRM_URL="http://127.0.0.1:${stub_port}"

# Poll until the server answers: up to 50 attempts, 0.1 s apart. A 403 from the
# tile endpoint is taken as the "server is up" signal.
# NOTE(review): presumably this endpoint is used instead of /health so the
# readiness poll does not itself trigger and cache an OSRM probe -- confirm.
server_ready=false
for _ in $(seq 1 50); do
  http_code="$("${curl_bin}" -sS -o /dev/null -w "%{http_code}" \
    "$(http_server_url /api/v1/osrm/tile/v1/driving/0/0/0.mvt)" || true)"
  if [[ "${http_code}" == "403" ]]; then
    server_ready=true
    break
  fi
  sleep 0.1
done

# Only shell commands may appear in this body: under `set -e` any stray text
# would be run as a command and abort the script before the diagnostics print.
if [[ "${server_ready}" != "true" ]]; then
  echo "server failed to start on port ${port}" >&2
  cat "${log_file}" >&2 || true
  exit 1
fi

# Fire five /health requests in parallel. Each one writes its body to
# health-N.json and its HTTP status to health-N.code.
pids=()
for index in {1..5}; do
  "${curl_bin}" -sS \
    -w "%{http_code}" \
    -o "${work_dir}/health-${index}.json" \
    "$(http_server_url /health)" >"${work_dir}/health-${index}.code" &
  pids+=("$!")
done

# Wait for every request; a failed curl aborts the script through `set -e`.
for pid in "${pids[@]}"; do
  wait "${pid}"
done

# Every response must be 503, because the configured VROOM binary is missing.
for index in {1..5}; do
  http_code="$(<"${work_dir}/health-${index}.code")"
  if [[ "${http_code}" == "503" ]]; then
    continue
  fi
  echo "expected concurrent /health request ${index} to return 503 with missing VROOM, got ${http_code}" >&2
  cat "${work_dir}/health-${index}.json" >&2 || true
  exit 1
done

# The stub rewrites the count file on every probe it serves. The file was
# removed during setup, so a missing file means no probe reached the stub.
# Report that as 0 rather than letting `set -e` abort on a bare `cat` error,
# which would skip the diagnostic output.
probe_count="$(cat "${probe_count_file}" 2>/dev/null || echo 0)"
if [[ "${probe_count}" != "1" ]]; then
  echo "expected concurrent cold /health requests to share one OSRM probe, got ${probe_count}" >&2
  cat "${stub_log_file}" >&2 || true
  exit 1
fi
Loading