Skip to content

Commit d4864c7

Browse files
authored
feat: enhance endpoint server with improved binding and shutdown handling (#1042)
1 parent f762517 commit d4864c7

File tree

11 files changed

+121
-37
lines changed

11 files changed

+121
-37
lines changed

core/src/ten_runtime/app/BUILD.gn

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import("//build/ten_runtime/options.gni")
1010
ten_runtime_glob("app") {
1111
file_list = all_native_files
1212
deps = [
13+
"endpoint_system",
1314
"msg_interface",
1415
"ten_env",
1516
]
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#
2+
# Copyright © 2025 Agora
3+
# This file is part of TEN Framework, an open source project.
4+
# Licensed under the Apache License, Version 2.0, with certain conditions.
5+
# Refer to the "LICENSE" file in the root directory for more information.
6+
#
7+
import("//build/ten_runtime/glob.gni")
8+
9+
ten_runtime_glob("endpoint_system") {
10+
file_list = all_native_files
11+
12+
if (ten_enable_ten_rust && ten_enable_ten_rust_apis) {
13+
public_deps = [ "//core/src/ten_rust:ten_rust_binding" ]
14+
}
15+
}

core/src/ten_rust/BUILD.gn

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ if (ten_enable_ten_rust) {
8686
output_file = "include_internal/ten_rust/ten_rust.h"
8787
binding_files = [
8888
"src/bindings.rs",
89-
"src/endpoint/mod.rs",
89+
"src/endpoint_system/mod.rs",
9090
]
9191

9292
deps = [ ":ten_rust_static_lib" ]

core/src/ten_rust/src/endpoint/mod.rs renamed to core/src/ten_rust/src/endpoint_system/mod.rs

Lines changed: 90 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@ use crate::constants::{
2222
ENDPOINT_SERVER_BIND_MAX_RETRIES, ENDPOINT_SERVER_BIND_RETRY_INTERVAL_SECS,
2323
};
2424

25-
pub struct TelemetrySystem {
25+
/// The system for the endpoint server.
26+
pub struct EndpointSystem {
27+
/// The Prometheus registry.
2628
registry: Registry,
2729

28-
actix_thread: Option<thread::JoinHandle<()>>,
30+
/// The server thread handle.
31+
server_thread_handle: Option<thread::JoinHandle<()>>,
2932

30-
/// Used to send a shutdown signal to the actix system where the server is
31-
/// located.
32-
actix_shutdown_tx: Option<oneshot::Sender<()>>,
33+
/// Used to send a shutdown signal to the server thread.
34+
server_thread_shutdown_tx: Option<oneshot::Sender<()>>,
3335
}
3436

3537
/// Configure API endpoints.
@@ -63,13 +65,17 @@ fn create_endpoint_server_with_retry(
6365
let registry_clone = registry.clone();
6466

6567
// Create a new HTTP server with the configured routes.
66-
let result = HttpServer::new(move || {
68+
let server = HttpServer::new(move || {
6769
App::new()
6870
.configure(|cfg| configure_routes(cfg, registry_clone.clone()))
6971
})
7072
// Make actix not linger on the socket.
7173
.shutdown_timeout(0)
72-
.bind(&endpoint_str);
74+
// Set a larger backlog for better performance during high load.
75+
.backlog(1024);
76+
77+
// Try to bind to the specified endpoint.
78+
let result = server.bind(&endpoint_str);
7379

7480
match result {
7581
Ok(server) => break server,
@@ -83,6 +89,13 @@ fn create_endpoint_server_with_retry(
8389
"Error binding to address: {} after {} attempts: {:?}",
8490
endpoint_str, attempts, e
8591
);
92+
93+
// Provide a helpful message for common issues.
94+
eprintln!(
95+
"Check if another process is using this port or if \
96+
you have permission to bind to this address."
97+
);
98+
8699
return None;
87100
}
88101

@@ -194,7 +207,7 @@ fn create_server_thread(
194207
#[no_mangle]
195208
pub unsafe extern "C" fn ten_endpoint_system_create(
196209
endpoint: *const c_char,
197-
) -> *mut TelemetrySystem {
210+
) -> *mut EndpointSystem {
198211
// Safely convert C string to Rust string.
199212
let endpoint_str = match CStr::from_ptr(endpoint).to_str() {
200213
Ok(s) if !s.trim().is_empty() => s.to_string(),
@@ -221,10 +234,10 @@ pub unsafe extern "C" fn ten_endpoint_system_create(
221234
let (server_thread_handle, shutdown_tx) = create_server_thread(server);
222235

223236
// Create and return the EndpointSystem.
224-
let system = TelemetrySystem {
237+
let system = EndpointSystem {
225238
registry,
226-
actix_thread: Some(server_thread_handle),
227-
actix_shutdown_tx: Some(shutdown_tx),
239+
server_thread_handle: Some(server_thread_handle),
240+
server_thread_shutdown_tx: Some(shutdown_tx),
228241
};
229242

230243
// Convert to raw pointer for C API.
@@ -233,6 +246,11 @@ pub unsafe extern "C" fn ten_endpoint_system_create(
233246

234247
/// Shut down the endpoint system, stop the server, and clean up all resources.
235248
///
249+
/// This function implements a graceful shutdown with proper resource cleanup:
250+
/// 1. Sends a shutdown signal to the server
251+
/// 2. Waits for the server thread to complete with a timeout
252+
/// 3. Ensures all resources are properly released
253+
///
236254
/// # Safety
237255
///
238256
/// This function assumes that `system_ptr` is either null or a valid pointer to
@@ -241,7 +259,7 @@ pub unsafe extern "C" fn ten_endpoint_system_create(
241259
/// will lead to undefined behavior.
242260
#[no_mangle]
243261
pub unsafe extern "C" fn ten_endpoint_system_shutdown(
244-
system_ptr: *mut TelemetrySystem,
262+
system_ptr: *mut EndpointSystem,
245263
) {
246264
debug_assert!(!system_ptr.is_null(), "System pointer is null");
247265
// Early return for null pointers.
@@ -256,29 +274,79 @@ pub unsafe extern "C" fn ten_endpoint_system_shutdown(
256274
let system = Box::from_raw(system_ptr);
257275

258276
// Notify the actix system to shut down through the `oneshot` channel.
259-
if let Some(shutdown_tx) = system.actix_shutdown_tx {
277+
if let Some(shutdown_tx) = system.server_thread_shutdown_tx {
260278
eprintln!("Shutting down endpoint server...");
261279
if let Err(e) = shutdown_tx.send(()) {
262280
eprintln!("Failed to send shutdown signal: {:?}", e);
263-
264-
panic!("Failed to send shutdown signal");
281+
// Don't panic, just continue with cleanup.
282+
eprintln!(
283+
"Continuing with cleanup despite shutdown signal failure"
284+
);
265285
}
266286
} else {
267287
eprintln!("No shutdown channel available for the endpoint server");
268288
}
269289

270-
// Wait for the server thread to complete.
271-
if let Some(server_thread_handle) = system.actix_thread {
290+
// Wait for the server thread to complete with a timeout.
291+
if let Some(server_thread_handle) = system.server_thread_handle {
272292
eprintln!("Waiting for endpoint server to shut down...");
273-
match server_thread_handle.join() {
274-
Ok(_) => eprintln!("Endpoint server thread joined successfully"),
275-
Err(e) => {
276-
eprintln!("Error joining endpoint server thread: {:?}", e)
293+
294+
// Define a timeout for the join operation.
295+
const SHUTDOWN_TIMEOUT_SECS: u64 = 10;
296+
297+
// We use std::thread::scope to ensure the spawned thread is joined.
298+
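// This prevents thread leaks even if an error occurs. Note that the scope itself blocks until the spawned thread finishes, so the timeout below only controls when the warning is logged, not how long shutdown can take.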
299+
std::thread::scope(|s| {
300+
// Create a timeout channel for coordination.
301+
let (tx, rx) = std::sync::mpsc::channel();
302+
303+
// Spawn a scoped thread to join the server thread.
304+
s.spawn(move || {
305+
let join_result = server_thread_handle.join();
306+
307+
// Send result, ignore errors if receiver dropped.
308+
let _ = tx.send(join_result);
309+
});
310+
311+
// Wait with timeout.
312+
match rx.recv_timeout(std::time::Duration::from_secs(
313+
SHUTDOWN_TIMEOUT_SECS,
314+
)) {
315+
Ok(join_result) => match join_result {
316+
Ok(_) => {
317+
eprintln!("Endpoint server thread joined successfully")
318+
}
319+
Err(e) => eprintln!(
320+
"Error joining endpoint server thread: {:?}",
321+
e
322+
),
323+
},
324+
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
325+
eprintln!(
326+
"WARNING: Endpoint server thread did not shut down \
327+
within timeout ({}s)",
328+
SHUTDOWN_TIMEOUT_SECS
329+
);
330+
eprintln!(
331+
"The thread may still be running, potentially leaking \
332+
resources"
333+
);
334+
}
335+
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
336+
eprintln!(
337+
"ERROR: Channel disconnected while waiting for server \
338+
thread to join"
339+
);
340+
}
277341
}
278-
}
342+
343+
// The scoped thread is automatically joined when we exit this
344+
// scope.
345+
});
279346
} else {
280347
eprintln!("No thread handle available for the endpoint server");
281348
}
282349

283350
// The system will be automatically dropped here, cleaning up all resources.
351+
eprintln!("Endpoint server resources cleaned up");
284352
}

core/src/ten_rust/src/endpoint/telemetry/counter.rs renamed to core/src/ten_rust/src/endpoint_system/telemetry/counter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::os::raw::c_char;
99

1010
use anyhow::Result;
1111

12-
use super::{MetricHandle, TelemetrySystem};
12+
use super::{EndpointSystem, MetricHandle};
1313

1414
unsafe fn convert_label_values(
1515
values_ptr: *const *const c_char,
@@ -38,7 +38,7 @@ unsafe fn convert_label_values(
3838
}
3939

4040
pub fn create_metric_counter(
41-
system: &mut TelemetrySystem,
41+
system: &mut EndpointSystem,
4242
name_str: &str,
4343
help_str: &str,
4444
) -> Result<MetricHandle> {
@@ -57,7 +57,7 @@ pub fn create_metric_counter(
5757
}
5858

5959
pub fn create_metric_counter_with_labels(
60-
system: &mut TelemetrySystem,
60+
system: &mut EndpointSystem,
6161
name_str: &str,
6262
help_str: &str,
6363
label_names: &[&str],

core/src/ten_rust/src/endpoint/telemetry/gauge.rs renamed to core/src/ten_rust/src/endpoint_system/telemetry/gauge.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ use std::os::raw::c_char;
88

99
use anyhow::Result;
1010

11-
use crate::endpoint::telemetry::convert_label_values;
11+
use crate::endpoint_system::telemetry::convert_label_values;
1212

13-
use super::{MetricHandle, TelemetrySystem};
13+
use super::{EndpointSystem, MetricHandle};
1414

1515
pub fn create_metric_gauge(
16-
system: &mut TelemetrySystem,
16+
system: &mut EndpointSystem,
1717
name_str: &str,
1818
help_str: &str,
1919
) -> Result<MetricHandle> {
@@ -31,7 +31,7 @@ pub fn create_metric_gauge(
3131
}
3232

3333
pub fn create_metric_gauge_with_labels(
34-
system: &mut TelemetrySystem,
34+
system: &mut EndpointSystem,
3535
name_str: &str,
3636
help_str: &str,
3737
label_names: &[&str],

core/src/ten_rust/src/endpoint/telemetry/histogram.rs renamed to core/src/ten_rust/src/endpoint_system/telemetry/histogram.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ use std::os::raw::c_char;
88

99
use anyhow::Result;
1010

11-
use crate::endpoint::telemetry::convert_label_values;
11+
use crate::endpoint_system::telemetry::convert_label_values;
1212

13-
use super::{MetricHandle, TelemetrySystem};
13+
use super::{EndpointSystem, MetricHandle};
1414

1515
pub fn create_metric_histogram(
16-
system: &mut TelemetrySystem,
16+
system: &mut EndpointSystem,
1717
name_str: &str,
1818
help_str: &str,
1919
) -> Result<MetricHandle> {
@@ -33,7 +33,7 @@ pub fn create_metric_histogram(
3333
}
3434

3535
pub fn create_metric_histogram_with_labels(
36-
system: &mut TelemetrySystem,
36+
system: &mut EndpointSystem,
3737
name_str: &str,
3838
help_str: &str,
3939
label_names: &[&str],

core/src/ten_rust/src/endpoint/telemetry/mod.rs renamed to core/src/ten_rust/src/endpoint_system/telemetry/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use prometheus::{Encoder, Registry, TextEncoder};
2323

2424
use crate::constants::METRICS;
2525

26-
use super::TelemetrySystem;
26+
use super::EndpointSystem;
2727

2828
pub enum MetricHandle {
2929
Counter(prometheus::Counter),
@@ -116,7 +116,7 @@ unsafe fn convert_label_values(
116116
#[no_mangle]
117117
#[allow(clippy::not_unsafe_ptr_arg_deref)]
118118
pub unsafe extern "C" fn ten_metric_create(
119-
system_ptr: *mut TelemetrySystem,
119+
system_ptr: *mut EndpointSystem,
120120
metric_type: u32, // 0=Counter, 1=Gauge, 2=Histogram
121121
name: *const c_char,
122122
help: *const c_char,

0 commit comments

Comments
 (0)