From f0d35d522831ef69969c2aee402e13104e977886 Mon Sep 17 00:00:00 2001 From: sunxilin Date: Wed, 14 May 2025 21:29:00 +0800 Subject: [PATCH] fix: fill app uri in graph json when handling start_graph cmd --- .../app/msg_interface/start_graph.c | 22 ++ .../extension/extension_info/extension_info.c | 4 +- .../start_graph_with_msg_conversion.cc | 261 ++++++++++++++++++ 3 files changed, 285 insertions(+), 2 deletions(-) create mode 100644 tests/ten_runtime/smoke/start_graph/start_graph_with_msg_conversion.cc diff --git a/core/src/ten_runtime/app/msg_interface/start_graph.c b/core/src/ten_runtime/app/msg_interface/start_graph.c index 14a4a84a9..f5498a96b 100644 --- a/core/src/ten_runtime/app/msg_interface/start_graph.c +++ b/core/src/ten_runtime/app/msg_interface/start_graph.c @@ -22,6 +22,8 @@ #include "include_internal/ten_runtime/engine/engine.h" #include "include_internal/ten_runtime/engine/internal/migration.h" #include "include_internal/ten_runtime/engine/msg_interface/common.h" +#include "include_internal/ten_runtime/extension/extension_info/extension_info.h" +#include "include_internal/ten_runtime/extension_group/extension_group_info/extension_group_info.h" #include "include_internal/ten_runtime/msg/cmd_base/cmd/start_graph/cmd.h" #include "include_internal/ten_runtime/msg/msg.h" #include "include_internal/ten_runtime/protocol/protocol.h" @@ -59,6 +61,23 @@ static bool ten_app_fill_start_graph_cmd_extensions_info_from_predefined_graph( return true; } +void ten_app_fill_start_graph_cmd_node_app_uri(ten_app_t *self, + ten_shared_ptr_t *cmd) { + TEN_ASSERT(self && ten_app_check_integrity(self, true), "Should not happen."); + TEN_ASSERT(cmd && ten_cmd_base_check_integrity(cmd), "Should not happen."); + TEN_ASSERT(ten_msg_get_type(cmd) == TEN_MSG_TYPE_CMD_START_GRAPH, + "Should not happen."); + + ten_list_t *extensions_info = ten_cmd_start_graph_get_extensions_info(cmd); + ten_list_t *extension_groups_info = + ten_cmd_start_graph_get_extension_groups_info(cmd); + + ten_extensions_info_fill_app_uri(extensions_info, + ten_string_get_raw_str(&self->uri)); + ten_extension_groups_info_fill_app_uri(extension_groups_info, + ten_string_get_raw_str(&self->uri)); +} + static bool ten_app_check_start_graph_cmd_from_connection( ten_app_t *self, ten_connection_t *connection, ten_shared_ptr_t *cmd, ten_error_t *err) { @@ -98,6 +117,9 @@ bool ten_app_handle_start_graph_cmd(ten_app_t *self, return false; } + // Fill the app uri of the nodes in the start_graph cmd. + ten_app_fill_start_graph_cmd_node_app_uri(self, cmd); + ten_engine_t *engine = ten_app_get_engine_based_on_dest_graph_id_from_msg(self, cmd); if (engine == NULL) { diff --git a/core/src/ten_runtime/extension/extension_info/extension_info.c b/core/src/ten_runtime/extension/extension_info/extension_info.c index 65e3f21c9..6faa38561 100644 --- a/core/src/ten_runtime/extension/extension_info/extension_info.c +++ b/core/src/ten_runtime/extension/extension_info/extension_info.c @@ -424,7 +424,7 @@ ten_extension_info_t *ten_extension_info_from_smart_ptr( static void ten_extension_info_fill_app_uri(ten_extension_info_t *self, const char *app_uri) { - TEN_ASSERT(self && ten_extension_info_check_integrity(self, true), + TEN_ASSERT(self && ten_extension_info_check_integrity(self, false), "Invalid argument."); TEN_ASSERT(app_uri, "Should not happen."); TEN_ASSERT(!ten_loc_is_empty(&self->loc), "Should not happen."); @@ -456,7 +456,7 @@ void ten_extensions_info_fill_app_uri(ten_list_t *extensions_info, ten_extension_info_t *extension_info = ten_shared_ptr_get_data(ten_smart_ptr_listnode_get(iter.node)); TEN_ASSERT(extension_info && - ten_extension_info_check_integrity(extension_info, true), + ten_extension_info_check_integrity(extension_info, false), "Invalid argument."); ten_extension_info_fill_app_uri(extension_info, app_uri); diff --git a/tests/ten_runtime/smoke/start_graph/start_graph_with_msg_conversion.cc b/tests/ten_runtime/smoke/start_graph/start_graph_with_msg_conversion.cc new file mode 100644 index 000000000..4bb66e8a8 --- /dev/null +++ b/tests/ten_runtime/smoke/start_graph/start_graph_with_msg_conversion.cc @@ -0,0 +1,261 @@ +// +// Copyright © 2025 Agora +// This file is part of TEN Framework, an open source project. +// Licensed under the Apache License, Version 2.0, with certain conditions. +// Refer to the "LICENSE" file in the root directory for more information. +// +#include "gtest/gtest.h" +#include "include_internal/ten_runtime/binding/cpp/ten.h" +#include "tests/common/client/cpp/msgpack_tcp.h" +#include "tests/ten_runtime/smoke/util/binding/cpp/check.h" + +namespace { + +class test_normal_extension_1 : public ten::extension_t { + public: + explicit test_normal_extension_1(const char *name) : ten::extension_t(name) {} + + void on_start(ten::ten_env_t &ten_env) override { + ten_env.send_cmd(ten::cmd_t::create("A"), + [this](ten::ten_env_t &ten_env, + std::unique_ptr cmd_result, + ten::error_t *err) { ten_env.on_start_done(); }); + } +}; + +class test_normal_extension_2 : public ten::extension_t { + public: + explicit test_normal_extension_2(const char *name) : ten::extension_t(name) {} + + void on_cmd(ten::ten_env_t &ten_env, + std::unique_ptr cmd) override { + if (cmd->get_name() == "B") { + ten_env.return_result(ten::cmd_result_t::create(TEN_STATUS_CODE_OK), + std::move(cmd)); + } else { + auto cmd_name = cmd->get_name(); + auto error_msg = + "test_normal_extension_2 received unexpected cmd: " + cmd_name; + + TEN_ENV_LOG_ERROR(ten_env, error_msg.c_str()); + TEN_ASSERT(0, "Should not happen."); + } + } +}; + +class test_normal_extension_3 : public ten::extension_t { + public: + explicit test_normal_extension_3(const char *name) : ten::extension_t(name) {} + + void on_cmd(ten::ten_env_t &ten_env, + std::unique_ptr cmd) override { + if (cmd->get_name() == "B") { + ten_env.return_result(ten::cmd_result_t::create(TEN_STATUS_CODE_OK), + std::move(cmd)); + } else { + auto cmd_name = cmd->get_name(); + auto error_msg = + "test_normal_extension_3 received unexpected cmd: " + cmd_name; + + TEN_ENV_LOG_ERROR(ten_env, error_msg.c_str()); + TEN_ASSERT(0, "Should not happen."); + } + } +}; + +class test_predefined_graph : public ten::extension_t { + public: + explicit test_predefined_graph(const char *name) : ten::extension_t(name) {} + + void on_start(ten::ten_env_t &ten_env) override { + auto start_graph_cmd = ten::cmd_start_graph_t::create(); + start_graph_cmd->set_dest("localhost", nullptr, nullptr, nullptr); + start_graph_cmd->set_graph_from_json( + R"({ + "nodes": [ + { + "addon": "test_normal_extension_1", + "extension_group": "default", + "name": "test_normal_extension_1", + "type": "extension" + }, + { + "addon": "test_normal_extension_2", + "extension_group": "test_normal_extension_2", + "name": "test_normal_extension_2", + "type": "extension" + } + ], + "connections": [ + { + "extension": "test_normal_extension_1", + "cmd": [ + { + "name": "A", + "dest": [ + { + "extension": "test_normal_extension_2", + "msg_conversion": { + "keep_original": true, + "rules": [ + { + "conversion_mode": "fixed_value", + "path": "_ten.name", + "value": "B" + } + ], + "type": "per_property" + } + } + ] + } + ] + } + ] + })"_json.dump() + .c_str()); + + ten_env.send_cmd( + std::move(start_graph_cmd), + [this](ten::ten_env_t &ten_env, + std::unique_ptr cmd_result, + ten::error_t *err) { + // result for the 'start_graph' command + auto graph_id = cmd_result->get_property_string("detail"); + + // Shut down the graph; otherwise, the app won't be able to close + // because there is still a running engine/graph. + auto stop_graph_cmd = ten::cmd_stop_graph_t::create(); + stop_graph_cmd->set_dest("localhost", nullptr, nullptr, nullptr); + stop_graph_cmd->set_graph_id(graph_id.c_str()); + + ten_env.send_cmd( + std::move(stop_graph_cmd), + [this](ten::ten_env_t &ten_env, + std::unique_ptr cmd_result, + ten::error_t *err) { + start_graph_cmd_is_done = true; + + if (test_cmd != nullptr) { + nlohmann::json detail = {{"id", 1}, {"name", "a"}}; + + auto cmd_result_for_test = + ten::cmd_result_t::create(TEN_STATUS_CODE_OK); + cmd_result_for_test->set_property_from_json( + "detail", detail.dump().c_str()); + ten_env.return_result(std::move(cmd_result_for_test), + std::move(test_cmd)); + } + }); + }); + + ten_env.on_start_done(); + } + + void on_cmd(ten::ten_env_t &ten_env, + std::unique_ptr cmd) override { + if (cmd->get_name() == "test") { + if (start_graph_cmd_is_done) { + nlohmann::json detail = {{"id", 1}, {"name", "a"}}; + + auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK); + cmd_result->set_property_from_json("detail", detail.dump().c_str()); + ten_env.return_result(std::move(cmd_result), std::move(cmd)); + } else { + test_cmd = std::move(cmd); + return; + } + } else { + TEN_ASSERT(0, "Should not happen."); + } + } + + private: + bool start_graph_cmd_is_done{}; + std::unique_ptr test_cmd; +}; + +class test_app : public ten::app_t { + public: + void on_configure(ten::ten_env_t &ten_env) override { + bool rc = ten::ten_env_internal_accessor_t::init_manifest_from_json( + ten_env, + // clang-format off + R"({ + "type": "app", + "name": "test_app", + "version": "0.1.0" + })" + // clang-format on + ); + ASSERT_EQ(rc, true); + + rc = ten_env.init_property_from_json( + // clang-format off + R"({ + "_ten": { + "addon": { + "preload_all": true + }, + "uri": "msgpack://127.0.0.1:8001/", + "log_level": 2, + "predefined_graphs": [{ + "name": "default", + "auto_start": false, + "singleton": true, + "nodes": [{ + "type": "extension", + "name": "predefined_graph", + "addon": "start_graph_with_msg_conversion__predefined_graph_extension", + "extension_group": "start_graph_with_msg_conversion__predefined_graph_group" + }] + }] + } + })" + // clang-format on + ); + ASSERT_EQ(rc, true); + + ten_env.on_configure_done(); + } +}; + +void *app_thread_main(TEN_UNUSED void *args) { + auto *app = new test_app(); + app->run(); + delete app; + + return nullptr; +} + +TEN_CPP_REGISTER_ADDON_AS_EXTENSION( + start_graph_with_msg_conversion__predefined_graph_extension, + test_predefined_graph); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION(test_normal_extension_1, + test_normal_extension_1); +TEN_CPP_REGISTER_ADDON_AS_EXTENSION(test_normal_extension_2, + test_normal_extension_3); + +} // namespace + +TEST(StartGraphTest, StartGraphWithMsgConversion) { // NOLINT + auto *app_thread = ten_thread_create("app thread", app_thread_main, nullptr); + + // Create a client and connect to the app. + auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/"); + + // Do not need to send 'start_graph' command first. + // The 'graph_id' MUST be "default" (a special string) if we want to send the + // request to predefined graph. + auto test_cmd = ten::cmd_t::create("test"); + test_cmd->set_dest("msgpack://127.0.0.1:8001/", "default", + "start_graph_with_msg_conversion__predefined_graph_group", + "predefined_graph"); + auto cmd_result = client->send_cmd_and_recv_result(std::move(test_cmd)); + ten_test::check_status_code(cmd_result, TEN_STATUS_CODE_OK); + ten_test::check_detail_with_json(cmd_result, R"({"id": 1, "name": "a"})"); + + delete client; + + ten_thread_join(app_thread, -1); +}