Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/src/ten_runtime/app/msg_interface/start_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "include_internal/ten_runtime/engine/engine.h"
#include "include_internal/ten_runtime/engine/internal/migration.h"
#include "include_internal/ten_runtime/engine/msg_interface/common.h"
#include "include_internal/ten_runtime/extension/extension_info/extension_info.h"
#include "include_internal/ten_runtime/extension_group/extension_group_info/extension_group_info.h"
#include "include_internal/ten_runtime/msg/cmd_base/cmd/start_graph/cmd.h"
#include "include_internal/ten_runtime/msg/msg.h"
#include "include_internal/ten_runtime/protocol/protocol.h"
Expand Down Expand Up @@ -59,6 +61,23 @@ static bool ten_app_fill_start_graph_cmd_extensions_info_from_predefined_graph(
return true;
}

void ten_app_fill_start_graph_cmd_node_app_uri(ten_app_t *self,
ten_shared_ptr_t *cmd) {
TEN_ASSERT(self && ten_app_check_integrity(self, true), "Should not happen.");
TEN_ASSERT(cmd && ten_cmd_base_check_integrity(cmd), "Should not happen.");
TEN_ASSERT(ten_msg_get_type(cmd) == TEN_MSG_TYPE_CMD_START_GRAPH,
"Should not happen.");

ten_list_t *extensions_info = ten_cmd_start_graph_get_extensions_info(cmd);
ten_list_t *extension_groups_info =
ten_cmd_start_graph_get_extension_groups_info(cmd);

ten_extensions_info_fill_app_uri(extensions_info,
ten_string_get_raw_str(&self->uri));
ten_extension_groups_info_fill_app_uri(extension_groups_info,
ten_string_get_raw_str(&self->uri));
}

static bool ten_app_check_start_graph_cmd_from_connection(
ten_app_t *self, ten_connection_t *connection, ten_shared_ptr_t *cmd,
ten_error_t *err) {
Expand Down Expand Up @@ -98,6 +117,9 @@ bool ten_app_handle_start_graph_cmd(ten_app_t *self,
return false;
}

// Fill the app uri of the nodes in the start_graph cmd.
ten_app_fill_start_graph_cmd_node_app_uri(self, cmd);

ten_engine_t *engine =
ten_app_get_engine_based_on_dest_graph_id_from_msg(self, cmd);
if (engine == NULL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ ten_extension_info_t *ten_extension_info_from_smart_ptr(

static void ten_extension_info_fill_app_uri(ten_extension_info_t *self,
const char *app_uri) {
TEN_ASSERT(self && ten_extension_info_check_integrity(self, true),
TEN_ASSERT(self && ten_extension_info_check_integrity(self, false),
"Invalid argument.");
TEN_ASSERT(app_uri, "Should not happen.");
TEN_ASSERT(!ten_loc_is_empty(&self->loc), "Should not happen.");
Expand Down Expand Up @@ -456,7 +456,7 @@ void ten_extensions_info_fill_app_uri(ten_list_t *extensions_info,
ten_extension_info_t *extension_info =
ten_shared_ptr_get_data(ten_smart_ptr_listnode_get(iter.node));
TEN_ASSERT(extension_info &&
ten_extension_info_check_integrity(extension_info, true),
ten_extension_info_check_integrity(extension_info, false),
"Invalid argument.");

ten_extension_info_fill_app_uri(extension_info, app_uri);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
//
// Copyright © 2025 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#include "gtest/gtest.h"
#include "include_internal/ten_runtime/binding/cpp/ten.h"
#include "tests/common/client/cpp/msgpack_tcp.h"
#include "tests/ten_runtime/smoke/util/binding/cpp/check.h"

namespace {

class test_normal_extension_1 : public ten::extension_t {
public:
explicit test_normal_extension_1(const char *name) : ten::extension_t(name) {}

void on_start(ten::ten_env_t &ten_env) override {
ten_env.send_cmd(ten::cmd_t::create("A"),
[this](ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_result_t> cmd_result,
ten::error_t *err) { ten_env.on_start_done(); });
}
};

class test_normal_extension_2 : public ten::extension_t {
public:
explicit test_normal_extension_2(const char *name) : ten::extension_t(name) {}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
if (cmd->get_name() == "B") {
ten_env.return_result(ten::cmd_result_t::create(TEN_STATUS_CODE_OK),
std::move(cmd));
} else {
auto cmd_name = cmd->get_name();
auto error_msg =
"test_normal_extension_2 received unexpected cmd: " + cmd_name;

TEN_ENV_LOG_ERROR(ten_env, error_msg.c_str());
TEN_ASSERT(0, "Should not happen.");
}
}
};

class test_normal_extension_3 : public ten::extension_t {
public:
explicit test_normal_extension_3(const char *name) : ten::extension_t(name) {}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
if (cmd->get_name() == "B") {
ten_env.return_result(ten::cmd_result_t::create(TEN_STATUS_CODE_OK),
std::move(cmd));
} else {
auto cmd_name = cmd->get_name();
auto error_msg =
"test_normal_extension_3 received unexpected cmd: " + cmd_name;

TEN_ENV_LOG_ERROR(ten_env, error_msg.c_str());
TEN_ASSERT(0, "Should not happen.");
}
}
};

class test_predefined_graph : public ten::extension_t {
public:
explicit test_predefined_graph(const char *name) : ten::extension_t(name) {}

void on_start(ten::ten_env_t &ten_env) override {
auto start_graph_cmd = ten::cmd_start_graph_t::create();
start_graph_cmd->set_dest("localhost", nullptr, nullptr, nullptr);
start_graph_cmd->set_graph_from_json(
R"({
"nodes": [
{
"addon": "test_normal_extension_1",
"extension_group": "default",
"name": "test_normal_extension_1",
"type": "extension"
},
{
"addon": "test_normal_extension_2",
"extension_group": "test_normal_extension_2",
"name": "test_normal_extension_2",
"type": "extension"
}
],
"connections": [
{
"extension": "test_normal_extension_1",
"cmd": [
{
"name": "A",
"dest": [
{
"extension": "test_normal_extension_2",
"msg_conversion": {
"keep_original": true,
"rules": [
{
"conversion_mode": "fixed_value",
"path": "_ten.name",
"value": "B"
}
],
"type": "per_property"
}
}
]
}
]
}
]
})"_json.dump()
.c_str());

ten_env.send_cmd(
std::move(start_graph_cmd),
[this](ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_result_t> cmd_result,
ten::error_t *err) {
// result for the 'start_graph' command
auto graph_id = cmd_result->get_property_string("detail");

// Shut down the graph; otherwise, the app won't be able to close
// because there is still a running engine/graph.
auto stop_graph_cmd = ten::cmd_stop_graph_t::create();
stop_graph_cmd->set_dest("localhost", nullptr, nullptr, nullptr);
stop_graph_cmd->set_graph_id(graph_id.c_str());

ten_env.send_cmd(
std::move(stop_graph_cmd),
[this](ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_result_t> cmd_result,
ten::error_t *err) {
start_graph_cmd_is_done = true;

if (test_cmd != nullptr) {
nlohmann::json detail = {{"id", 1}, {"name", "a"}};

auto cmd_result_for_test =
ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result_for_test->set_property_from_json(
"detail", detail.dump().c_str());
ten_env.return_result(std::move(cmd_result_for_test),
std::move(test_cmd));
}
});
});

ten_env.on_start_done();
}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
if (cmd->get_name() == "test") {
if (start_graph_cmd_is_done) {
nlohmann::json detail = {{"id", 1}, {"name", "a"}};

auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result->set_property_from_json("detail", detail.dump().c_str());
ten_env.return_result(std::move(cmd_result), std::move(cmd));
} else {
test_cmd = std::move(cmd);
return;
}
} else {
TEN_ASSERT(0, "Should not happen.");
}
}

private:
bool start_graph_cmd_is_done{};
std::unique_ptr<ten::cmd_t> test_cmd;
};

class test_app : public ten::app_t {
public:
void on_configure(ten::ten_env_t &ten_env) override {
bool rc = ten::ten_env_internal_accessor_t::init_manifest_from_json(
ten_env,
// clang-format off
R"({
"type": "app",
"name": "test_app",
"version": "0.1.0"
})"
// clang-format on
);
ASSERT_EQ(rc, true);

rc = ten_env.init_property_from_json(
// clang-format off
R"({
"_ten": {
"addon": {
"preload_all": true
},
"uri": "msgpack://127.0.0.1:8001/",
"log_level": 2,
"predefined_graphs": [{
"name": "default",
"auto_start": false,
"singleton": true,
"nodes": [{
"type": "extension",
"name": "predefined_graph",
"addon": "start_graph_with_msg_conversion__predefined_graph_extension",
"extension_group": "start_graph_with_msg_conversion__predefined_graph_group"
}]
}]
}
})"
// clang-format on
);
ASSERT_EQ(rc, true);

ten_env.on_configure_done();
}
};

void *app_thread_main(TEN_UNUSED void *args) {
auto *app = new test_app();
app->run();
delete app;

return nullptr;
}

TEN_CPP_REGISTER_ADDON_AS_EXTENSION(
start_graph_with_msg_conversion__predefined_graph_extension,
test_predefined_graph);
TEN_CPP_REGISTER_ADDON_AS_EXTENSION(test_normal_extension_1,
test_normal_extension_1);
TEN_CPP_REGISTER_ADDON_AS_EXTENSION(test_normal_extension_2,
test_normal_extension_3);

} // namespace

TEST(StartGraphTest, StartGraphWithMsgConversion) { // NOLINT
auto *app_thread = ten_thread_create("app thread", app_thread_main, nullptr);

// Create a client and connect to the app.
auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/");

// Do not need to send 'start_graph' command first.
// The 'graph_id' MUST be "default" (a special string) if we want to send the
// request to predefined graph.
auto test_cmd = ten::cmd_t::create("test");
test_cmd->set_dest("msgpack://127.0.0.1:8001/", "default",
"start_graph_with_msg_conversion__predefined_graph_group",
"predefined_graph");
auto cmd_result = client->send_cmd_and_recv_result(std::move(test_cmd));
ten_test::check_status_code(cmd_result, TEN_STATUS_CODE_OK);
ten_test::check_detail_with_json(cmd_result, R"({"id": 1, "name": "a"})");

delete client;

ten_thread_join(app_thread, -1);
}