Changes from all commits (19 commits)
ca68b58  fix multi machine monitor issues (wanglei19991004, Oct 21, 2025)
a4a2104  Merge branch 'FlagOpen:main' into elastic (wanglei19991004, Oct 22, 2025)
03d7a7b  Merge branch 'main' into elastic (wanglei19991004, Oct 29, 2025)
aaa8a1b  Merge branch 'main' into elastic (zhaoyinglia, Nov 19, 2025)
9085ca5  remove overwrite arg and else (wanglei19991004, Nov 19, 2025)
8c1a1eb  fix enable_monitoring default config (wanglei19991004, Nov 25, 2025)
24c0c5e  fix no_shared_fs get_remote_mtime and add using warning (wanglei19991004, Nov 26, 2025)
a8c0a89  fix excessive intermediate log generation (wanglei19991004, Nov 27, 2025)
dacc499  remove debug function get_status_summary (wanglei19991004, Nov 27, 2025)
61b5f19  remove redundant enable_log_collection and enable_diagnostic parameter (wanglei19991004, Nov 27, 2025)
1227c18  fix rendezvous mis-diagnosis and mismatch between simulate functions … (wanglei19991004, Nov 27, 2025)
45babc7  remove prompt output under disable monitor, redundant enable_monitori… (wanglei19991004, Nov 27, 2025)
452887f  remove redundant parse parameter in monitor_service (wanglei19991004, Nov 27, 2025)
09de9a1  remove redundant imports (wanglei19991004, Nov 27, 2025)
eca2edb  remove duplicate monitor_service calls (wanglei19991004, Nov 27, 2025)
f88842d  fix run error:remove enable_monitoring (wanglei19991004, Nov 27, 2025)
bf47326  fix simulated_fault format (wanglei19991004, Nov 27, 2025)
6e353c3  remove get_remote_file_exists function (wanglei19991004, Nov 27, 2025)
615c2fc  Merge branch 'flagos-ai:main' into elastic (wanglei19991004, Nov 27, 2025)
10 changes: 6 additions & 4 deletions flagscale/runner/elastic/diagnostic.py
@@ -15,7 +15,6 @@
     "cuda out of memory": "OutOfMemoryError: CUDA out of memory error occurred.",
     # Connection and network errors
     "rendezvousconnectionerror": "RendezvousConnectionError: Connection to rendezvous backend failed.",
-    "rendezvous": "RendezvousError: Rendezvous coordination failed between nodes.",
     "connection refused": "ConnectionError: Network connection refused.",
     "connection timeout": "ConnectionTimeout: Network connection timeout.",
     # Import and code errors
@@ -95,9 +94,12 @@ def generate_diagnostic_report(config, host, node_rank, log_file, return_content
     """
     global _diagnostic_offsets
 
-    # Generate the path of the diagnostic file
-    log_dir = os.path.dirname(log_file)
-    diagnostic_file = os.path.join(log_dir, f"host_{node_rank}_{host}_diagnostic.txt")
+    # Always use the monitor subdirectory for diagnostic files (unified for single/multi-node)
+    base_log_dir = config.train.system.logging.log_dir
+    monitor_dir = os.path.join(base_log_dir, "monitor")
+    os.makedirs(monitor_dir, exist_ok=True)
+
+    diagnostic_file = os.path.join(monitor_dir, f"host_{node_rank}_{host}_diagnostic.txt")
     host_key = f"{host}_{node_rank}"
 
     try:
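Note on the removed "rendezvous" key (commit 1227c18, "fix rendezvous mis-diagnosis"): since "rendezvousconnectionerror" contains the substring "rendezvous", the broader pattern could fire on ordinary rendezvous coordination lines and shadow the more specific diagnosis. For readers unfamiliar with this pattern-table style, here is a minimal sketch of how such a table is typically applied to freshly appended log content. The helper name `match_known_errors` and the offset bookkeeping are illustrative assumptions, not FlagScale's actual `generate_diagnostic_report` implementation.

```python
# Illustrative sketch only: applying a lowercase substring -> message table
# to the bytes appended to a log since the last scan.
ERROR_PATTERNS = {
    "cuda out of memory": "OutOfMemoryError: CUDA out of memory error occurred.",
    "rendezvousconnectionerror": "RendezvousConnectionError: Connection to rendezvous backend failed.",
    "connection refused": "ConnectionError: Network connection refused.",
}

_offsets: dict[str, int] = {}  # per host_{node_rank}, mirrors _diagnostic_offsets

def match_known_errors(log_file: str, host_key: str) -> list[str]:
    """Scan only the newly appended portion of the log and report known errors."""
    with open(log_file, "rb") as f:
        f.seek(_offsets.get(host_key, 0))          # resume where the last scan ended
        new_text = f.read().decode("utf-8", errors="replace").lower()
        _offsets[host_key] = f.tell()              # advance the per-host offset
    return [msg for pattern, msg in ERROR_PATTERNS.items() if pattern in new_text]
```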
51 changes: 10 additions & 41 deletions flagscale/runner/elastic/log_collector.py
@@ -1,49 +1,20 @@
 import glob
 import os
 import shlex
-import subprocess
-
-from datetime import datetime
 
-from flagscale.runner.utils import logger, run_local_command
+from flagscale.runner.utils import get_remote_file_size, logger, run_local_command
 
 _log_offsets = {}
 
 
-def get_remote_file_size(host, filepath):
-    """
-    Retrieve the size of a file on a remote host (in bytes).
-
-    Parameters:
-        host (str): The address of the remote host (e.g., 'user@hostname').
-        filepath (str): The path to the file on the remote host.
-
-    Returns:
-        int: The size of the file in bytes if successful; returns -1 if an error occurs.
-
-    Exception Handling:
-        subprocess.CalledProcessError: Caught when the SSH command fails.
-    """
-    try:
-        result = subprocess.run(
-            ["ssh", host, f"stat -c%s {filepath}"],
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            text=True,
-            check=True,
-        )
-        return int(result.stdout.strip())
-    except subprocess.CalledProcessError:
-        return -1
-
-
-def get_file_size(host, filepath):
+def get_file_size(host, filepath, port=22):
     """
     Retrieve the size of a file, either locally or on a remote host (in bytes).
 
     Parameters:
         host (str): The address of the host (e.g., 'localhost' or 'user@hostname').
         filepath (str): The path to the file, either local or on the remote host.
+        port (int): SSH port number for remote hosts (default: 22).
 
     Returns:
         int: The size of the file in bytes if successful; returns -1 if the file does not exist
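The deleted `get_remote_file_size` now lives in `flagscale.runner.utils` (see the new import above) and is called with a `port` argument. Its exact body is not shown in this diff; a sketch under the assumption that it mirrors the deleted implementation, extended with an `ssh -p` flag:

```python
# Sketch of the relocated helper, inferred from the deleted code above and
# the new call sites get_remote_file_size(host, filepath, port).
import subprocess

def get_remote_file_size(host: str, filepath: str, port: int = 22) -> int:
    """Return the size of a remote file in bytes, or -1 on any SSH failure."""
    try:
        result = subprocess.run(
            ["ssh", "-p", str(port), host, f"stat -c%s {filepath}"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
        return int(result.stdout.strip())
    except (subprocess.CalledProcessError, ValueError):
        return -1
```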
@@ -57,7 +28,7 @@ def get_file_size(host, filepath):
             return os.path.getsize(filepath)
         return -1
     else:
-        return get_remote_file_size(host, filepath)
+        return get_remote_file_size(host, filepath, port)
 
 
 def find_actual_log_file(log_dir, node_rank, host, no_shared_fs=False):
@@ -115,34 +86,32 @@ def collect_logs(config, host, node_rank, destination_dir, dryrun=False):
     no_shared_fs = config.experiment.runner.get("no_shared_fs", False)
     log_dir = logging_config.log_dir
     src_log_file = find_actual_log_file(log_dir, node_rank, host, no_shared_fs)
-    dest_log_file = os.path.join(
-        destination_dir,
-        f"host_{node_rank}_{host}_temp_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log",
-    )
+    dest_log_file = os.path.join(destination_dir, f"host_{node_rank}_{host}_current.log")
 
     os.makedirs(destination_dir, exist_ok=True)
 
     log_key = f"{host}_{node_rank}"
     offset = _log_offsets.get(log_key, 0)
 
+    ssh_port = config.experiment.runner.get("ssh_port", 22)
+
     try:
         if host != "localhost":
-            ssh_port = config.experiment.runner.get("ssh_port", 22)
-            command = f"ssh -p {ssh_port} {host} 'tail -c +{offset + 1} {shlex.quote(src_log_file)}' > {shlex.quote(dest_log_file)}"
+            command = f"ssh -p {ssh_port} {host} 'tail -c +{offset + 1} {shlex.quote(src_log_file)}' >> {shlex.quote(dest_log_file)}"
             run_local_command(command, dryrun)
             logger.debug(
                 f"Collected incremental log from {host} (node {node_rank}) to {dest_log_file}"
             )
         else:
             command = (
-                f"tail -c +{offset + 1} {shlex.quote(src_log_file)} > {shlex.quote(dest_log_file)}"
+                f"tail -c +{offset + 1} {shlex.quote(src_log_file)} >> {shlex.quote(dest_log_file)}"
             )
             run_local_command(command, dryrun)
             logger.debug(f"Collected incremental local log to {dest_log_file}")
 
         # Check if the source file exists and update the offset
         if os.path.exists(src_log_file):
-            current_src_size = get_file_size(host, src_log_file)
+            current_src_size = get_file_size(host, src_log_file, ssh_port)
             if current_src_size > 0:
                 if current_src_size > offset:  # There is new content in the source file
                     _log_offsets[log_key] = current_src_size
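Why this hunk fixes commit a8c0a89 ("fix excessive intermediate log generation"): the old code wrote a fresh timestamped `_temp_YYYYmmdd_HHMMSS.log` on every poll and truncated it with `>`; the new code appends (`>>`) each increment to a single stable `_current.log`, with `_log_offsets` recording how far the source has already been copied. A minimal local-only sketch of that contract, with illustrative names outside of what the diff shows:

```python
# Incremental collection: read from the last byte offset, append to one
# stable destination file, then advance the offset.
import os

_log_offsets: dict[str, int] = {}

def collect_incremental(src_log_file: str, dest_log_file: str, log_key: str) -> None:
    offset = _log_offsets.get(log_key, 0)
    size = os.path.getsize(src_log_file) if os.path.exists(src_log_file) else -1
    if size <= offset:
        return  # nothing new since the last poll
    with open(src_log_file, "rb") as src, open(dest_log_file, "ab") as dest:
        src.seek(offset)         # equivalent of `tail -c +{offset + 1}`
        dest.write(src.read())   # append, never truncate: one file, no temp copies
    _log_offsets[log_key] = size  # the next poll starts where this one ended
```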
12 changes: 6 additions & 6 deletions flagscale/runner/elastic/monitor_launcher.py
@@ -10,8 +10,6 @@
 import sys
 import time
 
-from pathlib import Path
-
 from omegaconf import OmegaConf
 
 from flagscale.runner.elastic.monitor_service import MonitorService
@@ -54,6 +52,8 @@ def main():
     parser.add_argument(
         "--pid-file", required=True, help="The path of the PID file for the training process"
     )
+    parser.add_argument("--host", required=True, help="Hostname or IP of this node")
+    parser.add_argument("--node-rank", type=int, required=True, help="Node rank of this node")
[Review thread]
Reviewer: These two arguments are both required ... that means the launcher can be used only for single-node scenario. Am I understanding this correctly?

PR author: These two arguments are used to generate a separate diagnostic file for each host + node pair. The current implementation actually supports multi-host and multi-node scenarios, not just a single node.
     parser.add_argument(
         "--no-shared-fs", action="store_true", help="Whether it is in non-shared file system mode"
     )
@@ -90,12 +90,12 @@

     # Create dummy runners and monitoring services
     runner = MonitorRunner(config, args.pid_file)
-    monitor = MonitorService(config, runner, interval=args.interval)
+    monitor = MonitorService(
+        config, runner, interval=args.interval, host=args.host, node_rank=args.node_rank
+    )
 
     # Start the monitoring service
-    monitor.start_monitoring(
-        enable_log_collection=args.enable_log_collection, enable_diagnostic=args.enable_diagnostic
-    )
+    monitor.start_monitoring()
 
     logger.info(f"The monitoring service has been started. Interval: {args.interval} seconds")
     logger.info(f"PID file: {args.pid_file}")
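To make the review exchange above concrete, here is a hypothetical per-node launch loop: one monitor_launcher.py process per (host, node_rank) pair, each writing its own host_{node_rank}_{host}_diagnostic.txt. Only the flags visible in this diff (--pid-file, --host, --node-rank) appear; the host list and PID-file paths are invented, and any further required arguments (such as the config path and the --interval implied by args.interval) are deliberately elided.

```python
# Hypothetical multi-node launch, not a documented FlagScale entry point.
import subprocess

nodes = [("10.0.0.1", 0), ("10.0.0.2", 1)]  # illustrative (host, node_rank) pairs

procs = [
    subprocess.Popen(
        [
            "python", "flagscale/runner/elastic/monitor_launcher.py",
            "--pid-file", f"/tmp/train_node{rank}.pid",  # hypothetical path
            "--host", host,
            "--node-rank", str(rank),
            # remaining required flags (config, --interval, ...) omitted here
        ]
    )
    for host, rank in nodes
]
```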