diff --git a/python/fedml/computing/scheduler/comm_utils/job_utils.py b/python/fedml/computing/scheduler/comm_utils/job_utils.py index 0e6b4e1d8c..1689eb5f32 100644 --- a/python/fedml/computing/scheduler/comm_utils/job_utils.py +++ b/python/fedml/computing/scheduler/comm_utils/job_utils.py @@ -7,6 +7,7 @@ import fedml from docker import errors, DockerClient import stat +import zipfile from fedml.computing.scheduler.comm_utils import sys_utils from fedml.computing.scheduler.comm_utils.constants import SchedulerConstants @@ -709,3 +710,14 @@ def get_job_type_from_run_id(run_id: str) -> str: except Exception as e: logging.debug(f"Failed to get job obj with Exception {e}. Traceback: {traceback.format_exc()}") return job_type + + @staticmethod + def unzip_file(zip_file, unzip_file_path) -> str: + if zipfile.is_zipfile(zip_file): + with zipfile.ZipFile(zip_file, "r") as zipf: + zipf.extractall(unzip_file_path) + unzipped_file_name = zipf.namelist()[0] + else: + raise Exception("Invalid zip file {}".format(zip_file)) + + return unzipped_file_name diff --git a/python/fedml/computing/scheduler/comm_utils/mqtt_topics.py b/python/fedml/computing/scheduler/comm_utils/mqtt_topics.py new file mode 100644 index 0000000000..11f388270e --- /dev/null +++ b/python/fedml/computing/scheduler/comm_utils/mqtt_topics.py @@ -0,0 +1,284 @@ +from typing import Union + + +class MqttTopics: + # The MQTT message topic format is as follows: // + + __last_will_message = "flclient_agent/last_will_msg" + + # ============== Server -> Client ============== + + # Train Topics + __server_client_start_train = "flserver_agent/{client_id}/start_train" + __server_client_stop_train = "flserver_agent/{client_id}/stop_train" + + # Device Monitoring Topics + __server_client_request_device_info = "server/client/request_device_info/{client_id}" + __client_client_agent_status = "fl_client/flclient_agent_{client_id}/status" + __server_server_agent_status = "fl_server/flserver_agent_{server_id}/status" + + # ============== Client -> Server ============== + + # Metrics and Logs Topics + __client_server_metrics = "fedml_slave/fedml_master/metrics/{run_id}" + __client_server_logs = "fedml_slave/fedml_master/logs/{run_id}" + __client_server_response_device_info = "client/server/response_device_info/{server_id}" + + # ============== MLOps -> Client ============== + + # Authentication Topics + __mlops_client_logout = "mlops/client/logout/{client_id}" + + # Device Monitoring and Library Update Topics + __mlops_client_report_device_status = "mlops/report_device_status" + __mlops_client_ota = "mlops/flclient_agent_{client_id}/ota" + + # Deployment Topics + __deploy_mlops_slave_request_device_info = "deploy/mlops/slave_agent/request_device_info/{slave_id}" + __deploy_mlops_client_request_device_info = "deploy/mlops/client_agent/request_device_info/{client_id}" + + # ============== MLOps -> Server ============== + __mlops_server_start_train = "mlops/flserver_agent_{server_id}/start_train" + __mlops_server_stop_train = "mlops/flserver_agent_{server_id}/stop_train" + __mlops_server_ota = "mlops/flserver_agent_{server_id}/ota" + + # Deployment Topics + __mlops_master_request_device_info = "deploy/mlops/master_agent/request_device_info/{master_id}" + __deploy_mlops_master_request_device_info = "deploy/mlops/master_agent/request_device_info/{master_id}" + + # ============== Client -> MLOps ============== + + # Monitoring Topics + __client_mlops_status = "fl_client/mlops/status" + __client_mlops_active = "flclient_agent/active" + + # Run Topics + 
__run_client_mlops_status = "fl_run/fl_client/mlops/status" + __run_server_mlops_status = "fl_run/fl_server/mlops/status" + __client_mlops_job_cost = "ml_client/mlops/job_computing_cost" + + # ============== Server -> MLOps ============== + + # Train Topics + __server_mlops_training_progress = "fl_server/mlops/training_progress_and_eval" + # TODO (alaydshah): Fix the typo (roundx -> rounds) + __server_mlops_training_rounds = "fl_server/mlops/training_roundx" + + # Federate Topics + __server_mlops_client_model = "fl_server/mlops/client_model" + __server_mlops_aggregated_model = "fl_server/mlops/global_aggregated_model" + __server_mlops_training_model_net = "fl_server/mlops/training_model_net" + + # Deploy Topics + __server_mlops_deploy_progress = "fl_server/mlops/deploy_progress_and_eval" + __model_serving_mlops_llm_input_output_record = "model_serving/mlops/llm_input_output_record" + __deploy_master_mlops_response_device_info = "deploy/master_agent/mlops/response_device_info" + + # Monitoring Topics + __server_mlops_active = "flserver_agent/active" + + # ============== Device -> MLOps ============== + + # Launch Topics + __launch_mlops_artifacts = "launch_device/mlops/artifacts" + __launch_mlops_release_gpu_ids = "launch_device/mlops/release_gpu_ids" + __launch_mlops_sync_deploy_ids = "launch_device/mlops/sync_deploy_ids" + + # Deployment Topics + __deploy_mlops_status = "model_ops/model_device/return_deployment_status" + __compute_mlops_endpoint = "compute/mlops/endpoint" + + # Device & System Performance Topics + __client_mlops_system_performance = "fl_client/mlops/system_performance" + __client_mlops_gpu_device_info = "ml_client/mlops/gpu_device_info" + + # ============== Diagnosis ============== + __test_mqtt_connection = "fedml/{mqtt_client_id}/test_mqtt_msg" + + # TODO (alaydshah): Make sure these aren't used anywhere, and clean them up + # ============== Deprecated ============== + + __server_run_exception = "flserver_agent/{run_id}/client_exit_train_with_exception" + __server_mlops_status = "fl_server/mlops/status" + __client_mlops_training_metrics = "fl_client/mlops/training_metrics" + __mlops_runtime_logs_run = "mlops/runtime_logs/{run_id}" + + @classmethod + def server_client_start_train(cls, client_id: Union[int, str]): + return cls.__server_client_start_train.format(client_id=client_id) + + @classmethod + def server_client_stop_train(cls, client_id: Union[int, str]): + return cls.__server_client_stop_train.format(client_id=client_id) + + @classmethod + def server_client_request_device_info(cls, client_id: Union[int, str]): + return cls.__server_client_request_device_info.format(client_id=client_id) + + @classmethod + def client_client_agent_status(cls, client_id: Union[int, str]): + return cls.__client_client_agent_status.format(client_id=client_id) + + @classmethod + def server_server_agent_status(cls, server_id: Union[int, str]): + return cls.__server_server_agent_status.format(server_id=server_id) + + @classmethod + def mlops_client_report_device_status(cls): + return cls.__mlops_client_report_device_status + + @classmethod + def mlops_client_ota(cls, client_id: Union[int, str]): + return cls.__mlops_client_ota.format(client_id=client_id) + + @classmethod + def deploy_mlops_slave_request_device_info(cls, slave_id: Union[int, str]): + return cls.__deploy_mlops_slave_request_device_info.format(slave_id=slave_id) + + @classmethod + def mlops_master_request_device_info(cls, master_id: Union[int, str]): + return cls.__mlops_master_request_device_info.format(master_id=master_id) 
+ + @classmethod + def mlops_client_logout(cls, client_id: Union[int, str]): + return cls.__mlops_client_logout.format(client_id=client_id) + + @classmethod + def deploy_mlops_client_request_device_info(cls, client_id: Union[int, str]): + return cls.__deploy_mlops_client_request_device_info.format(client_id=client_id) + + @classmethod + def last_will_message(cls): + return cls.__last_will_message + + @classmethod + def client_mlops_status(cls): + return cls.__client_mlops_status + + @classmethod + def run_client_mlops_status(cls): + return cls.__run_client_mlops_status + + @classmethod + def run_server_mlops_status(cls): + return cls.__run_server_mlops_status + + @classmethod + def server_run_exception(cls, run_id: Union[int, str]): + return cls.__server_run_exception.format(run_id=run_id) + + @classmethod + def server_mlops_status(cls): + return cls.__server_mlops_status + + @classmethod + def client_mlops_training_metrics(cls): + return cls.__client_mlops_training_metrics + + @classmethod + def server_mlops_training_progress(cls): + return cls.__server_mlops_training_progress + + @classmethod + def server_mlops_deploy_progress(cls): + return cls.__server_mlops_deploy_progress + + @classmethod + def mlops_server_ota(cls, server_id: Union[int, str]): + return cls.__mlops_server_ota.format(server_id=server_id) + + @classmethod + def client_server_metrics(cls, run_id: Union[int, str]): + return cls.__client_server_metrics.format(run_id=run_id) + + @classmethod + def client_server_logs(cls, run_id: Union[int, str]): + return cls.__client_server_logs.format(run_id=run_id) + + @classmethod + def server_mlops_training_rounds(cls): + return cls.__server_mlops_training_rounds + + @classmethod + def server_mlops_client_model(cls): + return cls.__server_mlops_client_model + + @classmethod + def server_mlops_aggregated_model(cls): + return cls.__server_mlops_aggregated_model + + @classmethod + def server_mlops_training_model_net(cls): + return cls.__server_mlops_training_model_net + + @classmethod + def model_serving_mlops_llm_input_output_record(cls): + return cls.__model_serving_mlops_llm_input_output_record + + @classmethod + def client_mlops_job_cost(cls): + return cls.__client_mlops_job_cost + + @classmethod + def mlops_runtime_logs_run(cls, run_id: Union[int, str]): + return cls.__mlops_runtime_logs_run.format(run_id=run_id) + + @classmethod + def launch_mlops_artifacts(cls): + return cls.__launch_mlops_artifacts + + @classmethod + def deploy_mlops_status(cls): + return cls.__deploy_mlops_status + + @classmethod + def client_mlops_system_performance(cls): + return cls.__client_mlops_system_performance + + @classmethod + def client_mlops_gpu_device_info(cls): + return cls.__client_mlops_gpu_device_info + + @classmethod + def compute_mlops_endpoint(cls): + return cls.__compute_mlops_endpoint + + @classmethod + def launch_mlops_release_gpu_ids(cls): + return cls.__launch_mlops_release_gpu_ids + + @classmethod + def launch_mlops_sync_deploy_ids(cls): + return cls.__launch_mlops_sync_deploy_ids + + @classmethod + def mlops_server_start_train(cls, server_id: Union[int, str]): + return cls.__mlops_server_start_train.format(server_id=server_id) + + @classmethod + def mlops_server_stop_train(cls, server_id: Union[int, str]): + return cls.__mlops_server_stop_train.format(server_id=server_id) + + @classmethod + def client_server_response_device_info(cls, server_id: Union[int, str]): + return cls.__client_server_response_device_info.format(server_id=server_id) + + @classmethod + def 
deploy_mlops_master_request_device_info(cls, master_id: Union[int, str]): + return cls.__deploy_mlops_master_request_device_info.format(master_id=master_id) + + @classmethod + def client_mlops_active(cls): + return cls.__client_mlops_active + + @classmethod + def server_mlops_active(cls): + return cls.__server_mlops_active + + @classmethod + def deploy_master_mlops_response_device_info(cls): + return cls.__deploy_master_mlops_response_device_info + + @classmethod + def test_mqtt_connection(cls, mqtt_client_id: Union[int, str]): + return cls.__test_mqtt_connection.format(mqtt_client_id=mqtt_client_id) diff --git a/python/fedml/computing/scheduler/comm_utils/test_topics_refactoring.py b/python/fedml/computing/scheduler/comm_utils/test_topics_refactoring.py new file mode 100644 index 0000000000..0ee66d3719 --- /dev/null +++ b/python/fedml/computing/scheduler/comm_utils/test_topics_refactoring.py @@ -0,0 +1,150 @@ +from fedml.computing.scheduler.comm_utils.mqtt_topics import MqttTopics + + +def test_mqtt_topics(): + mqtt_client_id = 0 + edge_id = 1 + server_id = 2 + end_point_id = 10 + run_id = 100 + model_device_client_edge_id_list = [1, 2, 3] + + topic_start_train = "flserver_agent/" + str(edge_id) + "/start_train" + assert MqttTopics.server_client_start_train(client_id=edge_id) == topic_start_train + + topic_stop_train = "flserver_agent/" + str(edge_id) + "/stop_train" + assert MqttTopics.server_client_stop_train(client_id=edge_id) == topic_stop_train + + topic_client_status = "fl_client/flclient_agent_" + str(edge_id) + "/status" + assert MqttTopics.client_client_agent_status(client_id=edge_id) == topic_client_status + + topic_report_status = "mlops/report_device_status" + assert MqttTopics.mlops_client_report_device_status() == topic_report_status + + topic_ota_msg = "mlops/flclient_agent_" + str(edge_id) + "/ota" + assert MqttTopics.mlops_client_ota(client_id=edge_id) == topic_ota_msg + + topic_request_device_info = "server/client/request_device_info/" + str(edge_id) + assert MqttTopics.server_client_request_device_info(client_id=edge_id) == topic_request_device_info + + topic_request_edge_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{edge_id}" + assert MqttTopics.deploy_mlops_slave_request_device_info( + slave_id=edge_id) == topic_request_edge_device_info_from_mlops + + topic_request_client_device_info_from_mlops = f"deploy/mlops/client_agent/request_device_info/{edge_id}" + assert (MqttTopics.deploy_mlops_client_request_device_info(client_id=edge_id) + == topic_request_client_device_info_from_mlops) + + topic_deploy_master_request_device_info = f"deploy/mlops/master_agent/request_device_info/{edge_id}" + assert (MqttTopics.deploy_mlops_master_request_device_info(master_id=edge_id) == + topic_deploy_master_request_device_info) + + topic_request_deploy_slave_device_info_from_mlops = (f"deploy/mlops/slave_agent/request_device_info/" + f"{model_device_client_edge_id_list[0]}") + assert (MqttTopics.deploy_mlops_slave_request_device_info + (slave_id=model_device_client_edge_id_list[0]) == topic_request_deploy_slave_device_info_from_mlops) + + topic_client_logout = "mlops/client/logout/" + str(edge_id) + assert MqttTopics.mlops_client_logout(client_id=edge_id) == topic_client_logout + + topic_last_will_message = "flclient_agent/last_will_msg" + assert MqttTopics.last_will_message() == topic_last_will_message + + topic_client_mlops_statis = "fl_client/mlops/status" + assert MqttTopics.client_mlops_status() == topic_client_mlops_statis + + 
topic_run_client_mlops_status = "fl_run/fl_client/mlops/status" + assert MqttTopics.run_client_mlops_status() == topic_run_client_mlops_status + + topic_run_server_mlops_status = "fl_run/fl_server/mlops/status" + assert MqttTopics.run_server_mlops_status() == topic_run_server_mlops_status + + topic_server_server_agent_status = f"fl_server/flserver_agent_{server_id}/status" + assert MqttTopics.server_server_agent_status(server_id=server_id) == topic_server_server_agent_status + + topic_exit_train_with_exception = "flserver_agent/" + str(run_id) + "/client_exit_train_with_exception" + assert MqttTopics.server_run_exception(run_id=run_id) == topic_exit_train_with_exception + + topic_server_mlops_status = "fl_server/mlops/status" + assert MqttTopics.server_mlops_status() == topic_server_mlops_status + + topic_client_mlops_training_metrics = "fl_client/mlops/training_metrics" + assert MqttTopics.client_mlops_training_metrics() == topic_client_mlops_training_metrics + + topic_server_mlops_training_progress = "fl_server/mlops/training_progress_and_eval" + assert MqttTopics.server_mlops_training_progress() == topic_server_mlops_training_progress + + topic_server_mlops_deploy_progress = "fl_server/mlops/deploy_progress_and_eval" + assert MqttTopics.server_mlops_deploy_progress() == topic_server_mlops_deploy_progress + + topic_client_server_metrics = f"fedml_slave/fedml_master/metrics/{run_id}" + assert MqttTopics.client_server_metrics(run_id=run_id) == topic_client_server_metrics + + topic_client_server_logs = f"fedml_slave/fedml_master/logs/{run_id}" + assert MqttTopics.client_server_logs(run_id=run_id) == topic_client_server_logs + + topic_server_mlops_training_rounds = "fl_server/mlops/training_roundx" + assert MqttTopics.server_mlops_training_rounds() == topic_server_mlops_training_rounds + + topic_server_mlops_client_model = "fl_server/mlops/client_model" + assert MqttTopics.server_mlops_client_model() == topic_server_mlops_client_model + + topic_server_mlops_aggregated_model = "fl_server/mlops/global_aggregated_model" + assert MqttTopics.server_mlops_aggregated_model() == topic_server_mlops_aggregated_model + + topic_server_mlops_training_model_net = "fl_server/mlops/training_model_net" + assert MqttTopics.server_mlops_training_model_net() == topic_server_mlops_training_model_net + + topic_model_serving_mlops_llm_input_output = "model_serving/mlops/llm_input_output_record" + assert MqttTopics.model_serving_mlops_llm_input_output_record() == topic_model_serving_mlops_llm_input_output + + topic_client_mlops_job_cost = "ml_client/mlops/job_computing_cost" + assert MqttTopics.client_mlops_job_cost() == topic_client_mlops_job_cost + + topic_mlops_runtime_logs_run = "mlops/runtime_logs/" + str(run_id) + assert MqttTopics.mlops_runtime_logs_run(run_id=run_id) == topic_mlops_runtime_logs_run + + topic_launch_mlops_artifacts = "launch_device/mlops/artifacts" + assert MqttTopics.launch_mlops_artifacts() == topic_launch_mlops_artifacts + + deployment_status_topic_prefix = "model_ops/model_device/return_deployment_status" + assert MqttTopics.deploy_mlops_status() == deployment_status_topic_prefix + + topic_client_mlops_system_performance = "fl_client/mlops/system_performance" + assert MqttTopics.client_mlops_system_performance() == topic_client_mlops_system_performance + + topic_client_mlops_gpu_device_info = "ml_client/mlops/gpu_device_info" + assert MqttTopics.client_mlops_gpu_device_info() == topic_client_mlops_gpu_device_info + + topic_compute_mlops_endpoint = "compute/mlops/endpoint" + assert 
MqttTopics.compute_mlops_endpoint() == topic_compute_mlops_endpoint + + topic_launch_mlops_release_gpu_ids = "launch_device/mlops/release_gpu_ids" + assert MqttTopics.launch_mlops_release_gpu_ids() == topic_launch_mlops_release_gpu_ids + + topic_launch_mlops_sync_deploy_ids = "launch_device/mlops/sync_deploy_ids" + assert MqttTopics.launch_mlops_sync_deploy_ids() == topic_launch_mlops_sync_deploy_ids + + topic_server_start_train = "mlops/flserver_agent_" + str(server_id) + "/start_train" + assert MqttTopics.mlops_server_start_train(server_id=server_id) == topic_server_start_train + + topic_stop_train = "mlops/flserver_agent_" + str(server_id) + "/stop_train" + assert MqttTopics.mlops_server_stop_train(server_id=server_id) == topic_stop_train + + topic_server_ota = "mlops/flserver_agent_" + str(server_id) + "/ota" + assert MqttTopics.mlops_server_ota(server_id=server_id) == topic_server_ota + + topic_response_device_info = "client/server/response_device_info/" + str(server_id) + assert MqttTopics.client_server_response_device_info(server_id=server_id) == topic_response_device_info + + topic_agent_mlops_active = "flclient_agent/active" + assert MqttTopics.client_mlops_active() == topic_agent_mlops_active + + topic_server_mlops_active = "flserver_agent/active" + assert MqttTopics.server_mlops_active() == topic_server_mlops_active + + topic_master_mlops_response_device_info = "deploy/master_agent/mlops/response_device_info" + assert MqttTopics.deploy_master_mlops_response_device_info() == topic_master_mlops_response_device_info + + topic_test_mqtt_connection = "fedml/" + str(mqtt_client_id) + "/test_mqtt_msg" + assert MqttTopics.test_mqtt_connection(mqtt_client_id=mqtt_client_id) == topic_test_mqtt_connection diff --git a/python/fedml/computing/scheduler/master/server_runner.py b/python/fedml/computing/scheduler/master/server_runner.py index f5cd2500eb..6926320201 100755 --- a/python/fedml/computing/scheduler/master/server_runner.py +++ b/python/fedml/computing/scheduler/master/server_runner.py @@ -26,6 +26,7 @@ import fedml from ..comm_utils.job_cleanup import JobCleanup +from ..comm_utils.mqtt_topics import MqttTopics from ..scheduler_core.scheduler_matcher import SchedulerMatcher from ..comm_utils.constants import SchedulerConstants from ..comm_utils.job_utils import JobRunnerUtils @@ -1280,7 +1281,7 @@ def detect_edges_status( return True, active_edge_info_dict, inactivate_edges def send_status_check_msg(self, run_id, edge_id, server_id, context=None): - topic_get_model_device_id = "server/client/request_device_info/" + str(edge_id) + topic_get_model_device_id = MqttTopics.server_client_request_device_info(client_id=edge_id) payload = {"server_id": server_id, "run_id": run_id} if context is not None: payload["context"] = context @@ -1374,14 +1375,14 @@ def send_training_request_to_edges(self, active_edge_info_dict=None): "machine_id": edge_id_item, "endpoint_gpu_count": gpu_num, "master_deploy_id": edge_info.get("master_device_id", 0), "slave_deploy_id": edge_info.get("slave_device_id", 0)}) - topic_name = f"compute/mlops/endpoint" + topic_name = MqttTopics.compute_mlops_endpoint() endpoint_info_json = {"endpoint_id": endpoint_id, "endpoint_info": endpoint_info} print(f"endpoint_info_json {endpoint_info_json}") self.message_center.send_message(topic_name, json.dumps(endpoint_info_json)) client_rank = 1 for edge_id in edge_id_list: - topic_start_train = "flserver_agent/" + str(edge_id) + "/start_train" + topic_start_train = MqttTopics.server_client_start_train(client_id=edge_id) 
logging.info("start_train: send topic " + topic_start_train + " to client...") request_json = self.request_json request_json["client_rank"] = client_rank @@ -1409,7 +1410,7 @@ def setup_listeners_for_edge_status(self, run_id, edge_ids, server_id): self.client_agent_active_list[f"{run_id}"][f"server"] = server_id for edge_id in edge_ids: self.client_agent_active_list[f"{run_id}"][f"{edge_id}"] = ServerConstants.MSG_MLOPS_SERVER_STATUS_IDLE - edge_status_topic = "fl_client/flclient_agent_" + str(edge_id) + "/status" + edge_status_topic = MqttTopics.client_client_agent_status(client_id=edge_id) self.add_message_listener(edge_status_topic, self.callback_edge_status) self.subscribe_msg(edge_status_topic) @@ -1418,25 +1419,25 @@ def remove_listeners_for_edge_status(self, edge_ids=None): edge_ids = self.request_json["edgeids"] for edge_id in edge_ids: - edge_status_topic = "fl_client/flclient_agent_" + str(edge_id) + "/status" + edge_status_topic = MqttTopics.client_client_agent_status(client_id=edge_id) self.unsubscribe_msg(edge_status_topic) def setup_listener_for_run_metrics(self, run_id): - metric_topic = f"fedml_slave/fedml_master/metrics/{run_id}" + metric_topic = MqttTopics.client_server_metrics(run_id=run_id) self.add_message_listener(metric_topic, self.callback_run_metrics) self.subscribe_msg(metric_topic) def remove_listener_for_run_metrics(self, run_id): - metric_topic = f"fedml_slave/fedml_master/metrics/{run_id}" + metric_topic = MqttTopics.client_server_metrics(run_id=run_id) self.unsubscribe_msg(metric_topic) def setup_listener_for_run_logs(self, run_id): - logs_topic = f"fedml_slave/fedml_master/logs/{run_id}" + logs_topic = MqttTopics.client_server_logs(run_id=run_id) self.add_message_listener(logs_topic, self.callback_run_logs) self.subscribe_msg(logs_topic) def remove_listener_for_run_logs(self, run_id): - logs_topic = f"fedml_slave/fedml_master/logs/{run_id}" + logs_topic = MqttTopics.client_server_logs(run_id=run_id) self.unsubscribe_msg(logs_topic) def callback_run_logs(self, topic, payload): @@ -1857,17 +1858,17 @@ def send_training_stop_request_to_edges( payload_obj = json.loads(payload) for edge_id in edge_id_list: - topic_stop_train = "flserver_agent/" + str(edge_id) + "/stop_train" + topic_stop_train = MqttTopics.server_client_stop_train(client_id=edge_id) logging.info("stop_train: send topic " + topic_stop_train) self.message_center.send_message(topic_stop_train, json.dumps(payload_obj)) def send_training_stop_request_to_specific_edge(self, edge_id, payload): - topic_stop_train = "flserver_agent/" + str(edge_id) + "/stop_train" + topic_stop_train = MqttTopics.server_client_stop_train(client_id=edge_id) logging.info("stop_train: send topic " + topic_stop_train) self.message_center.send_message(topic_stop_train, payload) def send_training_stop_request_to_cloud_server(self, edge_id, payload): - topic_stop_train = "mlops/flserver_agent_" + str(edge_id) + "/stop_train" + topic_stop_train = MqttTopics.mlops_server_stop_train(server_id=edge_id) logging.info("stop_train: send topic " + topic_stop_train) self.message_center.send_message(topic_stop_train, payload) @@ -1880,7 +1881,7 @@ def send_training_stop_request_to_edges_when_exception( else: payload_obj = json.loads(payload) payload_obj["run_status"] = ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION if status is None else status - topic_stop_train = "flserver_agent/" + str(self.edge_id) + "/stop_train" + topic_stop_train = MqttTopics.server_client_stop_train(self.edge_id) self.callback_stop_train(topic_stop_train, 
json.dumps(payload_obj), use_payload=payload_obj) def callback_stop_train(self, topic, payload, use_payload=None): @@ -2322,7 +2323,7 @@ def callback_request_device_info_from_mlops(self, topic, payload): self.response_device_info_to_mlops(topic, payload) def response_device_info_to_mlops(self, topic, payload): - response_topic = f"deploy/master_agent/mlops/response_device_info" + response_topic = MqttTopics.deploy_master_mlops_response_device_info() payload_json = json.loads(payload) need_gpu_info = payload_json.get("need_gpu_info", False) if self.mlops_metrics is not None: @@ -2519,7 +2520,7 @@ def fetch_configs(self): return MLOpsConfigs.fetch_all_configs() def send_agent_active_msg(self): - active_topic = "flserver_agent/active" + active_topic = MqttTopics.server_mlops_active() status = MLOpsStatus.get_instance().get_server_agent_status(self.edge_id) if ( status is not None @@ -2556,7 +2557,7 @@ def recover_start_train_msg_after_upgrading(self): current_job.status == ServerConstants.MSG_MLOPS_SERVER_STATUS_UPGRADING: logging.info("start training after upgrading.") server_agent_id = self.edge_id - topic_start_train = "mlops/flserver_agent_" + str(server_agent_id) + "/start_train" + topic_start_train = MqttTopics.mlops_server_start_train(server_id=server_agent_id) self.callback_start_train(topic_start_train, current_job.running_json) except Exception as e: logging.info("recover starting train message after upgrading: {}".format(traceback.format_exc())) @@ -2566,37 +2567,37 @@ def on_agent_mqtt_connected(self, mqtt_client_object): # Setup MQTT message listener for starting training server_agent_id = self.edge_id - topic_start_train = "mlops/flserver_agent_" + str(server_agent_id) + "/start_train" + topic_start_train = MqttTopics.mlops_server_start_train(server_id=server_agent_id) self.add_message_listener(topic_start_train, self.callback_start_train) self.mqtt_mgr.add_message_listener(topic_start_train, self.listener_message_dispatch_center) # Setup MQTT message listener for stopping training - topic_stop_train = "mlops/flserver_agent_" + str(server_agent_id) + "/stop_train" + topic_stop_train = MqttTopics.mlops_server_stop_train(server_id=server_agent_id) self.add_message_listener(topic_stop_train, self.callback_stop_train) self.mqtt_mgr.add_message_listener(topic_stop_train, self.listener_message_dispatch_center) # Setup MQTT message listener for server status switching - topic_server_status = "fl_server/flserver_agent_" + str(server_agent_id) + "/status" + topic_server_status = MqttTopics.server_server_agent_status(server_id=server_agent_id) self.add_message_listener(topic_server_status, self.callback_runner_id_status) self.mqtt_mgr.add_message_listener(topic_server_status, self.listener_message_dispatch_center) # Setup MQTT message listener to report current device status. - topic_report_status = "mlops/report_device_status" + topic_report_status = MqttTopics.mlops_client_report_device_status() self.add_message_listener(topic_report_status, self.callback_report_current_status) self.mqtt_mgr.add_message_listener(topic_report_status, self.listener_message_dispatch_center) # Setup MQTT message listener to OTA messages from the MLOps. 
- topic_ota_msg = "mlops/flserver_agent_" + str(server_agent_id) + "/ota" + topic_ota_msg = MqttTopics.mlops_server_ota(server_id=server_agent_id) self.add_message_listener(topic_ota_msg, self.callback_server_ota_msg) self.mqtt_mgr.add_message_listener(topic_ota_msg, self.listener_message_dispatch_center) # Setup MQTT message listener to request device info from the client. - topic_response_device_info = "client/server/response_device_info/" + str(self.edge_id) + topic_response_device_info = MqttTopics.client_server_response_device_info(server_id=self.edge_id) self.add_message_listener(topic_response_device_info, self.callback_response_device_info) self.mqtt_mgr.add_message_listener(topic_response_device_info, self.listener_message_dispatch_center) # Setup MQTT message listener to request device info from MLOps. - topic_request_device_info_from_mlops = f"deploy/mlops/master_agent/request_device_info/{self.edge_id}" + topic_request_device_info_from_mlops = MqttTopics.deploy_mlops_master_request_device_info(master_id=self.edge_id) self.add_message_listener(topic_request_device_info_from_mlops, self.callback_request_device_info_from_mlops) self.mqtt_mgr.add_message_listener( topic_request_device_info_from_mlops, self.listener_message_dispatch_center) diff --git a/python/fedml/computing/scheduler/slave/client_diagnosis.py b/python/fedml/computing/scheduler/slave/client_diagnosis.py index 103e7feac4..956780ea1d 100644 --- a/python/fedml/computing/scheduler/slave/client_diagnosis.py +++ b/python/fedml/computing/scheduler/slave/client_diagnosis.py @@ -6,6 +6,7 @@ from threading import Thread import fedml +from fedml.computing.scheduler.comm_utils.mqtt_topics import MqttTopics from fedml.core.distributed.communication.message import Message from fedml.core.distributed.communication.mqtt.mqtt_manager import MqttManager from fedml.core.distributed.communication.mqtt_s3 import MqttS3MultiClientsCommManager @@ -209,7 +210,7 @@ def on_test_mqtt_connected(self, mqtt_client_object): self.is_mqtt_connected = True print("on_test_mqtt_connected") - topic_test_mqtt_msg = "fedml/" + str(self.mqtt_mgr._client_id) + "/test_mqtt_msg" + topic_test_mqtt_msg = MqttTopics.test_mqtt_connection(mqtt_client_id=self.mqtt_mgr._client_id) self.mqtt_mgr.add_message_listener(topic_test_mqtt_msg, self.callback_test_mqtt_msg) mqtt_client_object.subscribe(topic_test_mqtt_msg) @@ -222,7 +223,7 @@ def on_test_mqtt_disconnected(self, mqtt_client_object): print("on_test_mqtt_disconnected") - topic_test_mqtt_msg = "fedml/" + str(self.mqtt_mgr._client_id) + "/test_mqtt_msg" + topic_test_mqtt_msg = MqttTopics.test_mqtt_connection(mqtt_client_id=self.mqtt_mgr._client_id) self.mqtt_mgr.remove_message_listener(topic_test_mqtt_msg) mqtt_client_object.subscribe(topic_test_mqtt_msg) @@ -233,7 +234,7 @@ def callback_test_mqtt_msg(self, topic, payload): def send_test_mqtt_msg(self): while True: - topic_test_mqtt_msg = "fedml/" + str(self.mqtt_mgr._client_id) + "/test_mqtt_msg" + topic_test_mqtt_msg = MqttTopics.test_mqtt_connection(mqtt_client_id=self.mqtt_mgr._client_id) test_mqtt_msg_payload = {"id": self.mqtt_mgr._client_id, "msg": topic_test_mqtt_msg} ret = self.mqtt_mgr.send_message(topic_test_mqtt_msg, json.dumps(test_mqtt_msg_payload)) diff --git a/python/fedml/computing/scheduler/slave/client_login.py b/python/fedml/computing/scheduler/slave/client_login.py index c8123a717c..b0469991e8 100755 --- a/python/fedml/computing/scheduler/slave/client_login.py +++ b/python/fedml/computing/scheduler/slave/client_login.py @@ -15,7 +15,6 @@ 
from fedml.computing.scheduler.slave.client_runner import FedMLClientRunner from fedml.computing.scheduler.slave.client_constants import ClientConstants from fedml.core.mlops.mlops_runtime_log import MLOpsRuntimeLog -from fedml.core.mlops.mlops_runtime_log_daemon import MLOpsRuntimeLogDaemon def init_logs(args, edge_id): diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index 79b5697728..d0386adebb 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -8,19 +8,17 @@ import platform import shutil import subprocess -import threading import time import traceback import urllib import uuid -import zipfile -from urllib.parse import urljoin, urlparse import requests import fedml from ..comm_utils.constants import SchedulerConstants +from ..comm_utils.mqtt_topics import MqttTopics from ..comm_utils.job_cleanup import JobCleanup from ..comm_utils.job_utils import JobRunnerUtils, DockerArgs from ..comm_utils.run_process_utils import RunProcessUtils @@ -129,6 +127,7 @@ def __init__(self, args, edge_id=0, request_json=None, agent_config=None, run_id self.user_name = None self.general_edge_id = None self.message_center = None + self.prev_download_progress = 0 def __repr__(self): return "<{klass} @{id:x} {attrs}>".format( @@ -137,60 +136,6 @@ def __repr__(self): attrs=" ".join("{}={!r}".format(k, v) for k, v in self.__dict__.items()), ) - def copy_runner(self): - copy_runner = FedMLClientRunner(self.args) - copy_runner.disable_client_login = self.disable_client_login - copy_runner.model_device_server = self.model_device_server - copy_runner.model_device_client_list = self.model_device_client_list - copy_runner.run_process_event = self.run_process_event - copy_runner.run_process_event_map = self.run_process_event_map - copy_runner.run_process_completed_event = self.run_process_completed_event - copy_runner.run_process_completed_event_map = self.run_process_completed_event_map - copy_runner.run_process = self.run_process - copy_runner.run_process_map = self.run_process_map - copy_runner.running_request_json = self.running_request_json - copy_runner.local_api_process = self.local_api_process - copy_runner.start_request_json = self.start_request_json - copy_runner.device_status = self.device_status - copy_runner.current_training_status = self.current_training_status - copy_runner.mqtt_mgr = self.mqtt_mgr - copy_runner.edge_id = self.edge_id - copy_runner.edge_user_name = self.edge_user_name - copy_runner.edge_extra_url = self.edge_extra_url - copy_runner.run_id = self.run_id - copy_runner.unique_device_id = self.unique_device_id - copy_runner.args = self.args - copy_runner.request_json = self.request_json - copy_runner.version =self.version - copy_runner.device_id = self.device_id - copy_runner.cur_dir = self.cur_dir - copy_runner.cur_dir = self.cur_dir - copy_runner.sudo_cmd = self.sudo_cmd - copy_runner.is_mac = self.is_mac - - copy_runner.agent_config = self.agent_config - copy_runner.fedml_data_base_package_dir = self.fedml_data_base_package_dir - copy_runner.fedml_data_local_package_dir = self.fedml_data_local_package_dir - copy_runner.fedml_data_dir = self.fedml_data_dir - copy_runner.fedml_config_dir = self.fedml_config_dir - - copy_runner.FEDML_DYNAMIC_CONSTRAIN_VARIABLES = self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES - - copy_runner.mlops_metrics = self.mlops_metrics - copy_runner.client_active_list = self.client_active_list - copy_runner.ntp_offset 
= self.ntp_offset - copy_runner.server_id = self.server_id - copy_runner.computing_started_time = self.computing_started_time - copy_runner.fedml_config_object = self.fedml_config_object - copy_runner.package_type = self.package_type - copy_runner.cuda_visible_gpu_ids_str = self.cuda_visible_gpu_ids_str - copy_runner.subscribed_topics = self.subscribed_topics - copy_runner.user_name = self.user_name - copy_runner.general_edge_id = self.general_edge_id - copy_runner.message_center = self.message_center - - return copy_runner - def build_dynamic_constrain_variables(self, run_id, run_config): data_config = run_config.get("data_config", {}) server_edge_id_list = self.request_json["edgeids"] @@ -214,8 +159,8 @@ def build_dynamic_constrain_variables(self, run_id, run_config): self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES["${FEDSYS.CLIENT_ID_LIST}"] = str(local_edge_id_list).replace(" ", "") self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES["${FEDSYS.SYNTHETIC_DATA_URL}"] = synthetic_data_url.replace(" ", "") self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES["${FEDSYS.IS_USING_LOCAL_DATA}"] = str(is_using_local_data) - self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES["${FEDSYS.CLIENT_NUM}"] = len(server_edge_id_list) - self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES["${FEDSYS.CLIENT_INDEX}"] = 1 + self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES["${FEDSYS.CLIENT_NUM}"] = str(len(server_edge_id_list)) + self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES["${FEDSYS.CLIENT_INDEX}"] = str(1) for cur_index, id_value in enumerate(server_edge_id_list): if str(id_value) == str(self.edge_id): self.FEDML_DYNAMIC_CONSTRAIN_VARIABLES["${FEDSYS.CLIENT_INDEX}"] = cur_index + 1 @@ -227,16 +172,6 @@ def build_dynamic_constrain_variables(self, run_id, run_config): "LOG_SERVER_URL" ] - def unzip_file(self, zip_file, unzip_file_path) -> str: - if zipfile.is_zipfile(zip_file): - with zipfile.ZipFile(zip_file, "r") as zipf: - zipf.extractall(unzip_file_path) - unzipped_file_name = zipf.namelist()[0] - else: - raise Exception("Invalid zip file {}".format(zip_file)) - - return unzipped_file_name - def package_download_progress(self, count, blksize, filesize): self.check_runner_stop_event() @@ -253,7 +188,7 @@ def package_download_progress(self, count, blksize, filesize): self.prev_download_progress = progress_int logging.info("package downloaded size {} KB, progress {}%".format(downloaded_kb, progress_int)) - def retrieve_and_unzip_package(self, package_name, package_url): + def retrieve_and_unzip_package(self, package_url): local_package_path = ClientConstants.get_package_download_dir() os.makedirs(local_package_path, exist_ok=True) filename, filename_without_extension, file_extension = ClientConstants.get_filename_and_extension(package_url) @@ -272,7 +207,8 @@ def retrieve_and_unzip_package(self, package_name, package_url): f"Failed to remove directory {unzip_package_path}, Exception: {e}, Traceback: {traceback.format_exc()}") pass - package_dir_name = self.unzip_file(local_package_file, unzip_package_path) # Using unziped folder name + # Using unziped folder name as package directory name + package_dir_name = JobRunnerUtils.unzip_file(local_package_file, unzip_package_path) unzip_package_full_path = os.path.join(unzip_package_path, package_dir_name) logging.info("local_package_file {}, unzip_package_path {}, unzip file full path {}".format( @@ -339,17 +275,16 @@ def update_local_fedml_config(self, run_id, run_config): ClientConstants.generate_yaml_doc(package_conf_object, fedml_updated_config_file) # Build dynamic arguments and set arguments to fedml config object - 
self.build_dynamic_args(run_id, run_config, package_conf_object, unzip_package_path) + self.build_dynamic_args(run_config=run_config, package_conf_object=package_conf_object, + unzip_package_path=unzip_package_path) return unzip_package_path, package_conf_object - def build_dynamic_args(self, run_id, run_config, package_conf_object, base_dir): + def build_dynamic_args(self, run_config, package_conf_object, base_dir): fedml_conf_file = package_conf_object["entry_config"]["conf_file"] fedml_conf_file_processed = str(fedml_conf_file).replace('\\', os.sep).replace('/', os.sep) fedml_conf_path = os.path.join(base_dir, "fedml", "config", os.path.basename(fedml_conf_file_processed)) fedml_conf_object = load_yaml_config(fedml_conf_path) - run_params = run_config.get("parameters", {}) - job_yaml = run_params.get("job_yaml", {}) # Replace local fedml config objects with parameters from MLOps web parameters_object = run_config.get("parameters", None) @@ -985,7 +920,8 @@ def callback_start_train(self, topic, payload): run_id_str = str(run_id) self.running_request_json[run_id_str] = request_json client_runner = FedMLClientRunner( - self.args, edge_id=train_edge_id, request_json=request_json, agent_config=self.agent_config, run_id=run_id, + self.args, edge_id=int(train_edge_id), request_json=request_json, agent_config=self.agent_config, + run_id=run_id, cuda_visible_gpu_ids_str=cuda_visible_gpu_ids_str ) client_runner.start_request_json = payload @@ -1024,7 +960,8 @@ def callback_stop_train(self, topic, payload): # Stop client with multiprocessing mode run_id_str = str(run_id) client_runner = FedMLClientRunner( - self.args, edge_id=train_edge_id, request_json=request_json, agent_config=self.agent_config, run_id=run_id + self.args, edge_id=int(train_edge_id), request_json=request_json, agent_config=self.agent_config, + run_id=run_id ) self.cleanup_containers_and_release_gpus(run_id, train_edge_id) client_runner.run_process_event = self.run_process_event_map.get(run_id_str, None) @@ -1099,7 +1036,7 @@ def callback_runner_id_status(self, topic, payload): # Stop client with multiprocessing mode client_runner = FedMLClientRunner( self.args, - edge_id=edge_id, + edge_id=int(edge_id), request_json=request_json, agent_config=self.agent_config, run_id=run_id, @@ -1148,7 +1085,7 @@ def callback_report_current_status(self, topic, payload): f"FedMLDebug - Receive: topic ({topic}), payload ({payload})" ) - self.send_agent_active_msg() + self.send_agent_active_msg(self.edge_id) if self.general_edge_id is not None: self.send_agent_active_msg(self.general_edge_id) @@ -1238,7 +1175,7 @@ def response_device_info_to_mlops(self, topic, payload): if context is not None: response_payload["context"] = context self.message_center.send_message(response_topic, json.dumps(response_payload), run_id=run_id) - + def callback_report_device_info(self, topic, payload): payload_json = json.loads(payload) server_id = payload_json.get("server_id", 0) @@ -1247,7 +1184,7 @@ def callback_report_device_info(self, topic, payload): context = payload_json.get("context", None) need_gpu_info = payload_json.get("need_gpu_info", False) need_running_process_list = payload_json.get("need_running_process_list", False) - response_topic = f"client/server/response_device_info/{server_id}" + response_topic = MqttTopics.client_server_response_device_info(server_id=server_id) if self.mlops_metrics is not None and self.model_device_client_edge_id_list is not None and \ self.model_device_server_id is not None: if not need_gpu_info: @@ -1504,7 +1441,7 @@ def 
fetch_configs(self): return MLOpsConfigs.fetch_all_configs() def send_agent_active_msg(self, edge_id): - active_topic = "flclient_agent/active" + active_topic = MqttTopics.client_mlops_active() status = MLOpsStatus.get_instance().get_client_agent_status(edge_id) if ( status is not None @@ -1536,64 +1473,74 @@ def recover_start_train_msg_after_upgrading(self): if current_job is not None and \ current_job.status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_UPGRADING: logging.info("start training after upgrading.") - topic_start_train = "flserver_agent/" + str(self.edge_id) + "/start_train" + topic_start_train = MqttTopics.server_client_start_train(client_id=self.edge_id) self.callback_start_train(topic_start_train, current_job.running_json) except Exception as e: logging.error(f"recover starting train message after upgrading failed with exception {e}, " f"Traceback {traceback.format_exc()}") + # TODO (alaydshah): Cleanup old topics that are commented out def on_agent_mqtt_connected(self, mqtt_client_object): # The MQTT message topic format is as follows: // # Setup MQTT message listener for starting training - topic_start_train = "flserver_agent/" + str(self.edge_id) + "/start_train" + # topic_start_train = "flserver_agent/" + str(self.edge_id) + "/start_train" + topic_start_train = MqttTopics.server_client_start_train(client_id=self.edge_id) self.add_message_listener(topic_start_train, self.callback_start_train) self.mqtt_mgr.add_message_listener(topic_start_train, self.listener_message_dispatch_center) # Setup MQTT message listener for stopping training - topic_stop_train = "flserver_agent/" + str(self.edge_id) + "/stop_train" + topic_stop_train = MqttTopics.server_client_stop_train(client_id=self.edge_id) self.add_message_listener(topic_stop_train, self.callback_stop_train) self.mqtt_mgr.add_message_listener(topic_stop_train, self.listener_message_dispatch_center) - # Setup MQTT message listener for client status switching - topic_client_status = "fl_client/flclient_agent_" + str(self.edge_id) + "/status" + topic_client_status = MqttTopics.client_client_agent_status(client_id=self.edge_id) self.add_message_listener(topic_client_status, self.callback_runner_id_status) self.mqtt_mgr.add_message_listener(topic_client_status, self.listener_message_dispatch_center) # Setup MQTT message listener to report current device status. - topic_report_status = "mlops/report_device_status" + topic_report_status = MqttTopics.mlops_client_report_device_status() self.add_message_listener(topic_report_status, self.callback_report_current_status) self.mqtt_mgr.add_message_listener(topic_report_status, self.listener_message_dispatch_center) # Setup MQTT message listener to OTA messages from the MLOps. - topic_ota_msg = "mlops/flclient_agent_" + str(self.edge_id) + "/ota" + topic_ota_msg = MqttTopics.mlops_client_ota(client_id=self.edge_id) self.add_message_listener(topic_ota_msg, self.callback_client_ota_msg) self.mqtt_mgr.add_message_listener(topic_ota_msg, self.listener_message_dispatch_center) # Setup MQTT message listener to OTA messages from the MLOps. 
- topic_request_device_info = "server/client/request_device_info/" + str(self.edge_id) + topic_request_device_info = MqttTopics.server_client_request_device_info(client_id=self.edge_id) self.add_message_listener(topic_request_device_info, self.callback_report_device_info) self.mqtt_mgr.add_message_listener(topic_request_device_info, self.listener_message_dispatch_center) - topic_request_edge_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{self.edge_id}" + topic_request_edge_device_info_from_mlops = MqttTopics.deploy_mlops_slave_request_device_info( + slave_id=self.edge_id) self.add_message_listener(topic_request_edge_device_info_from_mlops, self.response_device_info_to_mlops) - self.mqtt_mgr.add_message_listener(topic_request_edge_device_info_from_mlops, self.listener_message_dispatch_center) + self.mqtt_mgr.add_message_listener(topic_request_edge_device_info_from_mlops, + self.listener_message_dispatch_center) topic_request_deploy_master_device_info_from_mlops = None if self.model_device_server_id is not None: - topic_request_deploy_master_device_info_from_mlops = f"deploy/mlops/master_agent/request_device_info/{self.model_device_server_id}" - self.add_message_listener(topic_request_deploy_master_device_info_from_mlops, self.response_device_info_to_mlops) - self.mqtt_mgr.add_message_listener(topic_request_deploy_master_device_info_from_mlops, self.listener_message_dispatch_center) + # self.model_device_server_id}" + topic_request_deploy_master_device_info_from_mlops = (MqttTopics.deploy_mlops_master_request_device_info + (master_id=self.model_device_server_id)) + self.add_message_listener(topic_request_deploy_master_device_info_from_mlops, + self.response_device_info_to_mlops) + self.mqtt_mgr.add_message_listener(topic_request_deploy_master_device_info_from_mlops, + self.listener_message_dispatch_center) topic_request_deploy_slave_device_info_from_mlops = None if self.model_device_client_edge_id_list is not None and len(self.model_device_client_edge_id_list) > 0: - topic_request_deploy_slave_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{self.model_device_client_edge_id_list[0]}" - self.add_message_listener(topic_request_deploy_slave_device_info_from_mlops, self.response_device_info_to_mlops) - self.mqtt_mgr.add_message_listener(topic_request_deploy_slave_device_info_from_mlops, self.listener_message_dispatch_center) - + topic_request_deploy_slave_device_info_from_mlops = (MqttTopics.deploy_mlops_slave_request_device_info + (slave_id=self.model_device_client_edge_id_list[0])) + self.add_message_listener(topic_request_deploy_slave_device_info_from_mlops, + self.response_device_info_to_mlops) + self.mqtt_mgr.add_message_listener(topic_request_deploy_slave_device_info_from_mlops, + self.listener_message_dispatch_center) + # Setup MQTT message listener to logout from MLOps. 
- topic_client_logout = "mlops/client/logout/" + str(self.edge_id) + topic_client_logout = MqttTopics.mlops_client_logout(client_id=self.edge_id) self.add_message_listener(topic_client_logout, self.callback_client_logout) self.mqtt_mgr.add_message_listener(topic_client_logout, self.listener_message_dispatch_center) @@ -1660,26 +1607,28 @@ def subscribe_fl_msgs(self): return # Setup MQTT message listener for starting training - topic_start_train = "flserver_agent/" + str(self.general_edge_id) + "/start_train" + topic_start_train = MqttTopics.server_client_start_train(client_id=self.general_edge_id) self.add_message_listener(topic_start_train, self.callback_start_train) self.mqtt_mgr.add_message_listener(topic_start_train, self.listener_message_dispatch_center) # Setup MQTT message listener for stopping training - topic_stop_train = "flserver_agent/" + str(self.general_edge_id) + "/stop_train" + topic_stop_train = MqttTopics.server_client_stop_train(client_id=self.general_edge_id) self.add_message_listener(topic_stop_train, self.callback_stop_train) self.mqtt_mgr.add_message_listener(topic_stop_train, self.listener_message_dispatch_center) # Setup MQTT message listener for client status switching - topic_client_status = "fl_client/flclient_agent_" + str(self.general_edge_id) + "/status" + topic_client_status = MqttTopics.client_client_agent_status(client_id=self.general_edge_id) self.add_message_listener(topic_client_status, self.callback_runner_id_status) self.mqtt_mgr.add_message_listener(topic_client_status, self.listener_message_dispatch_center) # Setup MQTT message listener to OTA messages from the MLOps. - topic_request_device_info = "server/client/request_device_info/" + str(self.general_edge_id) + topic_request_device_info = MqttTopics.server_client_request_device_info(client_id=self.general_edge_id) self.add_message_listener(topic_request_device_info, self.callback_report_device_info) self.mqtt_mgr.add_message_listener(topic_request_device_info, self.listener_message_dispatch_center) - topic_request_device_info_from_mlops = f"deploy/mlops/client_agent/request_device_info/{self.general_edge_id}" + # topic_request_device_info_from_mlops = f"deploy/mlops/client_agent/request_device_info/{self.general_edge_id}" + topic_request_device_info_from_mlops = (MqttTopics.deploy_mlops_client_request_device_info + (client_id=self.general_edge_id)) self.add_message_listener(topic_request_device_info_from_mlops, self.response_device_info_to_mlops) self.mqtt_mgr.add_message_listener(topic_request_device_info_from_mlops, self.listener_message_dispatch_center) @@ -1711,7 +1660,7 @@ def setup_agent_mqtt_connection(self, service_config): service_config["mqtt_config"]["MQTT_PWD"], service_config["mqtt_config"]["MQTT_KEEPALIVE"], f"FedML_ClientAgent_Daemon_@{self.user_name}@_@{self.args.current_device_id}@_@{str(uuid.uuid4())}@", - "flclient_agent/last_will_msg", + MqttTopics.last_will_message(), json.dumps({"ID": self.edge_id, "status": ClientConstants.MSG_MLOPS_CLIENT_STATUS_OFFLINE}) ) self.agent_config = service_config diff --git a/python/fedml/core/mlops/__init__.py b/python/fedml/core/mlops/__init__.py index 77ad06165e..4fa8a9b04f 100644 --- a/python/fedml/core/mlops/__init__.py +++ b/python/fedml/core/mlops/__init__.py @@ -16,7 +16,6 @@ from fedml import constants from fedml.computing.scheduler.comm_utils import sys_utils from fedml.core.mlops.mlops_configs import MLOpsConfigs -from .mlops_constants import MLOpsConstants from .mlops_metrics import MLOpsMetrics from .mlops_profiler_event import 
MLOpsProfilerEvent from .mlops_runtime_log import MLOpsRuntimeLog @@ -27,6 +26,7 @@ from .system_stats import SysStats from ..distributed.communication.mqtt.mqtt_manager import MqttManager from ..distributed.communication.s3.remote_storage import S3Storage +from ...computing.scheduler.comm_utils.mqtt_topics import MqttTopics from ...computing.scheduler.master.server_constants import ServerConstants from ...computing.scheduler.master.server_runner import FedMLServerRunner from ...computing.scheduler.slave.client_constants import ClientConstants @@ -521,8 +521,8 @@ def register_run_status_callback(run_status_callback): MLOpsStore.mlops_run_status_callback = run_status_callback - topic_client_status = "fl_client/flclient_agent_" + str(MLOpsStore.mlops_edge_id) + "/status" - topic_server_status = "fl_server/flserver_agent_" + str(MLOpsStore.mlops_edge_id) + "/status" + topic_client_status = MqttTopics.client_client_agent_status(MLOpsStore.mlops_edge_id) + topic_server_status = MqttTopics.server_server_agent_status(MLOpsStore.mlops_edge_id) MLOpsStore.mlops_log_mqtt_mgr.add_message_listener(topic_client_status, callback_run_status_changed) MLOpsStore.mlops_log_mqtt_mgr.add_message_listener(topic_server_status, callback_run_status_changed) MLOpsStore.mlops_log_mqtt_mgr.subscribe_msg(topic_client_status) @@ -1444,7 +1444,7 @@ def release_resources(run_id, device_id): payload = {"run_id": run_id, "device_id": device_id, "gpu_count": 0} MLOpsStore.mlops_log_mqtt_mgr.send_message_json( - MLOpsConstants.MSG_TOPIC_LAUNCH_RELEASE_GPU_IDS, json.dumps(payload)) + MqttTopics.launch_mlops_release_gpu_ids(), json.dumps(payload)) def sync_deploy_id(device_id, master_deploy_id, worker_deploy_id_list): @@ -1454,5 +1454,4 @@ def sync_deploy_id(device_id, master_deploy_id, worker_deploy_id_list): payload = {"device_id": device_id, "master_deploy_id": master_deploy_id, "worker_deploy_ids": worker_deploy_id_list} MLOpsStore.mlops_log_mqtt_mgr.send_message_json( - MLOpsConstants.MSG_TOPIC_LAUNCH_SYNC_DEPLOY_IDS, json.dumps(payload)) - + MqttTopics.launch_mlops_sync_deploy_ids(), json.dumps(payload)) diff --git a/python/fedml/core/mlops/device_info_report_protocol.py b/python/fedml/core/mlops/device_info_report_protocol.py index 8c5c9c3786..7c9e1a125f 100755 --- a/python/fedml/core/mlops/device_info_report_protocol.py +++ b/python/fedml/core/mlops/device_info_report_protocol.py @@ -5,6 +5,7 @@ import time import uuid +from fedml.computing.scheduler.comm_utils.mqtt_topics import MqttTopics from fedml.core.distributed.communication.mqtt.mqtt_manager import MqttManager @@ -48,12 +49,12 @@ def get_device_info(self, run_id, edge_id, server_id): return None def setup_listener_for_device_info_response(self, server_id): - response_topic = f"client/server/response_device_info/{server_id}" + response_topic = MqttTopics.client_server_response_device_info(server_id=server_id) self.client_mqtt_mgr.add_message_listener(response_topic, self.callback_device_info_response) self.client_mqtt_mgr.subscribe_msg(response_topic) def remove_listener_for_device_info_response(self, server_id): - response_topic = f"client/server/response_device_info/{server_id}" + response_topic = MqttTopics.client_server_response_device_info(server_id=server_id) self.client_mqtt_mgr.remove_message_listener(response_topic) self.client_mqtt_mgr.unsubscribe_msg(response_topic) @@ -128,6 +129,6 @@ def release_client_mqtt_mgr(self): pass def request_device_info(self, run_id, edge_id, server_id): - topic_request_device_info = "server/client/request_device_info/" + 
str(edge_id) + topic_request_device_info = MqttTopics.server_client_request_device_info(client_id=edge_id) payload = {"server_id": server_id, "run_id": run_id, "need_running_process_list": True} - self.client_mqtt_mgr.send_message(topic_request_device_info, json.dumps(payload)) \ No newline at end of file + self.client_mqtt_mgr.send_message(topic_request_device_info, json.dumps(payload)) diff --git a/python/fedml/core/mlops/mlops_constants.py b/python/fedml/core/mlops/mlops_constants.py deleted file mode 100755 index a626941842..0000000000 --- a/python/fedml/core/mlops/mlops_constants.py +++ /dev/null @@ -1,10 +0,0 @@ - - -class MLOpsConstants: - - MSG_TOPIC_LAUNCH_RELEASE_GPU_IDS = "launch_device/mlops/release_gpu_ids" - MSG_TOPIC_LAUNCH_SYNC_DEPLOY_IDS = "launch_device/mlops/sync_deploy_ids" - - - - diff --git a/python/fedml/core/mlops/mlops_device_perfs.py b/python/fedml/core/mlops/mlops_device_perfs.py index 03d52bf993..2c510d9722 100644 --- a/python/fedml/core/mlops/mlops_device_perfs.py +++ b/python/fedml/core/mlops/mlops_device_perfs.py @@ -14,6 +14,7 @@ from .mlops_utils import MLOpsUtils from .system_stats import SysStats from ...computing.scheduler.comm_utils.job_monitor import JobMonitor +from ...computing.scheduler.comm_utils.mqtt_topics import MqttTopics from ...core.distributed.communication.mqtt.mqtt_manager import MqttManager ROLE_DEVICE_INFO_REPORTER = 1 @@ -163,7 +164,7 @@ def report_gpu_device_info(edge_id, mqtt_mgr=None): total_mem, free_mem, total_disk_size, free_disk_size, cup_utilization, cpu_cores, gpu_cores_total, \ gpu_cores_available, sent_bytes, recv_bytes, gpu_available_ids = sys_utils.get_sys_realtime_stats() - topic_name = "ml_client/mlops/gpu_device_info" + topic_name = MqttTopics.client_mlops_gpu_device_info() # We should report realtime available gpu count to MLOps, not from local redis cache. 
         # Use gpu_available_ids from sys_utils.get_sys_realtime_stats()
diff --git a/python/fedml/core/mlops/mlops_job_perfs.py b/python/fedml/core/mlops/mlops_job_perfs.py
index fe3d921558..5005b306f9 100644
--- a/python/fedml/core/mlops/mlops_job_perfs.py
+++ b/python/fedml/core/mlops/mlops_job_perfs.py
@@ -10,6 +10,7 @@
 from .mlops_utils import MLOpsUtils
 from .system_stats import SysStats
 
+from ...computing.scheduler.comm_utils.mqtt_topics import MqttTopics
 from ...core.distributed.communication.mqtt.mqtt_manager import MqttManager
 
 
@@ -38,7 +39,7 @@ def report_system_metric(run_id, edge_id, metric_json=None,
         if run_id_str == "0" or run_id_str == "":
             return
 
-        topic_name = "fl_client/mlops/system_performance"
+        topic_name = MqttTopics.client_mlops_system_performance()
         if metric_json is None:
             if sys_stats_obj is None:
                 sys_stats_obj = SysStats(process_id=os.getpid())
diff --git a/python/fedml/core/mlops/mlops_metrics.py b/python/fedml/core/mlops/mlops_metrics.py
index b24025c8f9..52c83c832f 100644
--- a/python/fedml/core/mlops/mlops_metrics.py
+++ b/python/fedml/core/mlops/mlops_metrics.py
@@ -12,6 +12,7 @@
 from .mlops_job_perfs import MLOpsJobPerfStats
 from ...computing.scheduler.master.server_constants import ServerConstants
 from ...computing.scheduler.slave.client_constants import ClientConstants
+from ...computing.scheduler.comm_utils.mqtt_topics import MqttTopics
 from ...core.mlops.mlops_status import MLOpsStatus
 
 
@@ -89,7 +90,7 @@ def report_client_device_status_to_web_ui(self, edge_id, status, run_id=0):
         if status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_IDLE:
             return
 
-        topic_name = "fl_client/mlops/status"
+        topic_name = MqttTopics.client_mlops_status()
         msg = {"edge_id": edge_id, "run_id": run_id, "status": status, "version": "v1.0"}
         message_json = json.dumps(msg)
         logging.info("report_client_device_status. message_json = %s" % message_json)
@@ -104,7 +105,7 @@ def common_report_client_training_status(self, edge_id, status, run_id=0):
         this is used for notifying the client status to MLOps (both FedML CLI and backend can consume it)
         Currently, the INITIALIZING, and RUNNING are report using this method.
         """
-        topic_name = "fl_run/fl_client/mlops/status"
+        topic_name = MqttTopics.run_client_mlops_status()
         msg = {"edge_id": edge_id, "run_id": run_id, "status": status}
         message_json = json.dumps(msg)
         logging.info("report_client_training_status. message_json = %s" % message_json)
@@ -133,14 +134,14 @@ def common_broadcast_client_training_status(self, edge_id, status, run_id=0):
         this is used for broadcasting the client status to MLOps (backend can consume it)
         Currently, the FINISHED are report using this method.
         """
-        topic_name = "fl_run/fl_client/mlops/status"
+        topic_name = MqttTopics.run_client_mlops_status()
         msg = {"edge_id": edge_id, "run_id": run_id, "status": status}
         message_json = json.dumps(msg)
         logging.info("broadcast_client_training_status. message_json = %s" % message_json)
         self.messenger.send_message_json(topic_name, message_json)
 
     def client_send_exit_train_msg(self, run_id, edge_id, status, msg=None):
-        topic_exit_train_with_exception = "flserver_agent/" + str(run_id) + "/client_exit_train_with_exception"
+        topic_exit_train_with_exception = MqttTopics.server_run_exception(run_id)
         msg = {"run_id": run_id, "edge_id": edge_id, "status": status, "msg": msg if msg is not None else ""}
         message_json = json.dumps(msg)
         logging.info("client_send_exit_train_msg.")
@@ -168,7 +169,7 @@ def common_report_client_id_status(self, run_id, edge_id, status, server_id="0",
         """
         this is used for communication between client agent (FedML cli module) and client
         """
-        topic_name = "fl_client/flclient_agent_" + str(edge_id) + "/status"
+        topic_name = MqttTopics.client_client_agent_status(client_id=edge_id)
         msg = {"run_id": run_id, "edge_id": edge_id, "status": status, "server_id": server_id, "msg": msg}
         message_json = json.dumps(msg)
         # logging.info("report_client_id_status. message_json = %s" % message_json)
@@ -186,6 +187,7 @@ def report_server_training_status(self, run_id, status, edge_id=0, role=None, ru
         from ...computing.scheduler.master.server_data_interface import FedMLServerDataInterface
         FedMLServerDataInterface.get_instance().save_job(run_id, self.edge_id, status, running_json)
 
+    # TODO (alaydshah): This method and the topic both seem to be deprecated. Confirm and remove.
     def report_server_device_status_to_web_ui(self, run_id, status, edge_id=0, role=None):
         """
         this is used for notifying the server device status to MLOps Frontend
@@ -193,7 +195,7 @@ def report_server_device_status_to_web_ui(self, run_id, status, edge_id=0, role=
         if status == ServerConstants.MSG_MLOPS_DEVICE_STATUS_IDLE:
             return
 
-        topic_name = "fl_server/mlops/status"
+        topic_name = MqttTopics.server_mlops_status()
         if role is None:
             role = "normal"
         msg = {
@@ -211,7 +213,7 @@ def common_report_server_training_status(self, run_id, status, role=None, edge_id=0):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_run/fl_server/mlops/status"
+        topic_name = MqttTopics.run_server_mlops_status()
         if role is None:
             role = "normal"
         msg = {
@@ -228,7 +230,7 @@ def broadcast_server_training_status(self, run_id, status, role=None, is_from_model=False, edge_id=None):
         if self.messenger is None:
             return
-        topic_name = "fl_run/fl_server/mlops/status"
+        topic_name = MqttTopics.run_server_mlops_status()
         if role is None:
             role = "normal"
         msg = {
@@ -251,8 +253,7 @@ def broadcast_server_training_status(self, run_id, status, role=None, is_from_mo
     def report_server_id_status(self, run_id, status, edge_id=None, server_id=None, server_agent_id=None):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_server/flserver_agent_" + str(server_agent_id if server_agent_id is not None else
-                                                        self.server_agent_id) + "/status"
+        topic_name = MqttTopics.server_server_agent_status(server_id=server_id)
         msg = {"run_id": run_id, "edge_id": edge_id if edge_id is not None else self.edge_id, "status": status}
         if server_id is not None:
             msg["server_id"] = server_id
@@ -262,10 +263,11 @@ def report_server_id_status(self, run_id, status, edge_id=None, server_id=None,
         # logging.info("report_server_id_status. message_json = %s" % message_json)
         self.messenger.send_message_json(topic_name, message_json)
 
+    # TODO(alaydshah): This method and the topic both seem to be deprecated. Confirm and remove.
     def report_client_training_metric(self, metric_json):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_client/mlops/training_metrics"
+        topic_name = MqttTopics.client_mlops_training_metrics()
         logging.info("report_client_training_metric. message_json = %s" % metric_json)
         message_json = json.dumps(metric_json)
         self.messenger.send_message_json(topic_name, message_json)
@@ -273,7 +275,7 @@ def report_client_training_metric(self, metric_json):
     def report_server_training_metric(self, metric_json, payload=None):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_server/mlops/training_progress_and_eval"
+        topic_name = MqttTopics.server_mlops_training_progress()
         if payload is not None:
             message_json = payload
         else:
@@ -284,7 +286,7 @@ def report_server_training_metric(self, metric_json, payload=None):
     def report_endpoint_metric(self, metric_json, payload=None):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_server/mlops/deploy_progress_and_eval"
+        topic_name = MqttTopics.server_mlops_deploy_progress()
         if payload is not None:
             message_json = payload
         else:
@@ -295,7 +297,7 @@ def report_endpoint_metric(self, metric_json, payload=None):
     def report_fedml_train_metric(self, metric_json, run_id=0, is_endpoint=False):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = f"fedml_slave/fedml_master/metrics/{run_id}"
+        topic_name = MqttTopics.client_server_metrics(run_id=run_id)
         logging.info("report_fedml_train_metric. message_json = %s" % metric_json)
         metric_json["is_endpoint"] = is_endpoint
         message_json = json.dumps(metric_json)
@@ -304,42 +306,42 @@ def report_fedml_train_metric(self, metric_json, run_id=0, is_endpoint=False):
     def report_fedml_run_logs(self, logs_json, run_id=0):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = f"fedml_slave/fedml_master/logs/{run_id}"
+        topic_name = MqttTopics.client_server_logs(run_id=run_id)
         message_json = json.dumps(logs_json)
         self.messenger.send_message_json(topic_name, message_json)
 
     def report_server_training_round_info(self, round_info):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_server/mlops/training_roundx"
+        topic_name = MqttTopics.server_mlops_training_rounds()
         message_json = json.dumps(round_info)
         self.messenger.send_message_json(topic_name, message_json)
 
     def report_client_model_info(self, model_info_json):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_server/mlops/client_model"
+        topic_name = MqttTopics.server_mlops_client_model()
         message_json = json.dumps(model_info_json)
         self.messenger.send_message_json(topic_name, message_json)
 
     def report_aggregated_model_info(self, model_info_json):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_server/mlops/global_aggregated_model"
+        topic_name = MqttTopics.server_mlops_aggregated_model()
         message_json = json.dumps(model_info_json)
         self.messenger.send_message_json(topic_name, message_json)
 
     def report_training_model_net_info(self, model_net_info_json):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "fl_server/mlops/training_model_net"
+        topic_name = MqttTopics.server_mlops_training_model_net()
         message_json = json.dumps(model_net_info_json)
         self.messenger.send_message_json(topic_name, message_json)
 
     def report_llm_record(self, metric_json):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "model_serving/mlops/llm_input_output_record"
+        topic_name = MqttTopics.model_serving_mlops_llm_input_output_record()
         logging.info("report_llm_record. message_json = %s" % metric_json)
         message_json = json.dumps(metric_json)
         self.messenger.send_message_json(topic_name, message_json)
@@ -350,7 +352,7 @@ def report_edge_job_computing_cost(self, job_id, edge_id,
         """
         this is used for reporting the computing cost of a job running on an edge to MLOps
         """
-        topic_name = "ml_client/mlops/job_computing_cost"
+        topic_name = MqttTopics.client_mlops_job_cost()
         duration = computing_ended_time - computing_started_time
         if duration < 0:
             duration = 0
@@ -365,7 +367,7 @@ def report_edge_job_computing_cost(self, job_id, edge_id,
     def report_logs_updated(self, run_id):
         # if not self.comm_sanity_check():
         #     return
-        topic_name = "mlops/runtime_logs/" + str(run_id)
+        topic_name = MqttTopics.mlops_runtime_logs_run(run_id=run_id)
         msg = {"time": time.time()}
         message_json = json.dumps(msg)
         logging.info("report_logs_updated. message_json = %s" % message_json)
@@ -375,7 +377,7 @@ def report_artifact_info(self, job_id, edge_id, artifact_name, artifact_type,
                              artifact_local_path, artifact_url,
                              artifact_ext_info, artifact_desc,
                              timestamp):
-        topic_name = "launch_device/mlops/artifacts"
+        topic_name = MqttTopics.launch_mlops_artifacts()
         artifact_info_json = {
             "job_id": job_id,
             "edge_id": edge_id,
@@ -392,7 +394,7 @@ def report_artifact_info(self, job_id, edge_id, artifact_name, artifact_type,
 
     def report_endpoint_status(self, end_point_id, model_status, timestamp=None,
                                end_point_name="", model_name="", model_inference_url=""):
-        deployment_status_topic_prefix = "model_ops/model_device/return_deployment_status"
+        deployment_status_topic_prefix = MqttTopics.deploy_mlops_status()
         deployment_status_topic = "{}/{}".format(deployment_status_topic_prefix, end_point_id)
         time_param = time.time_ns() / 1000.0 if timestamp is None else timestamp
         deployment_status_payload = {"end_point_id": end_point_id, "end_point_name": end_point_name,