diff --git a/.docker/poc/Dockerfile b/.docker/poc/Dockerfile index 6129f539c..587ec785e 100644 --- a/.docker/poc/Dockerfile +++ b/.docker/poc/Dockerfile @@ -36,16 +36,49 @@ RUN if [ "$(uname -m)" = "aarch64" ] || [ "$(uname -m)" = "arm64" ]; then \ else \ LIB_SUBDIR="x86_64-linux-gnu"; \ fi \ - && mkdir -p ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR} ${DAS_DIR}/syslibs/lib/${LIB_SUBDIR} ${DAS_DIR}/syslibs/etc/ssl/certs \ - && cp -f /usr/lib/${LIB_SUBDIR}/libssl.so.3 ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ - && cp -f /usr/lib/${LIB_SUBDIR}/libcrypto.so.3 ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ - && cp -f /lib/${LIB_SUBDIR}/libz.so.1 ${DAS_DIR}/syslibs/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ - && cp -f /usr/lib/${LIB_SUBDIR}/libzstd.so.1 ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ - && cp -f /usr/lib/${LIB_SUBDIR}/libsnappy.so.1 ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ - && cp -f /lib/${LIB_SUBDIR}/libbz2.so.1.0 ${DAS_DIR}/syslibs/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ - && cp -f /lib/${LIB_SUBDIR}/liblzma.so.5 ${DAS_DIR}/syslibs/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ - && cp -f /usr/lib/${LIB_SUBDIR}/libcurl.so.4 ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ - && cp -f /usr/lib/${LIB_SUBDIR}/libevent-2.1.so.7 ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR}/ 2>/dev/null || true \ + && mkdir -p ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR} \ + ${DAS_DIR}/syslibs/lib/${LIB_SUBDIR} \ + ${DAS_DIR}/syslibs/etc/ssl/certs \ + # --- usr/lib/ libraries --- + && USR_LIBS=" \ + libssl.so.3 \ + libcrypto.so.3 \ + libzstd.so.1 \ + libsnappy.so.1 \ + libcurl.so.4 \ + libevent-2.1.so.7 \ + # Libs from here on are related to the libpqxx lib, used by the database-adapter. 
+ libgssapi_krb5.so.2 \ + libkrb5.so.3 \ + libk5crypto.so.3 \ + libcom_err.so.2 \ + libkrb5support.so.0 \ + liblber-2.5.so.0 \ + libkeyutils.so.1 \ + libp11-kit.so.0 \ + libidn2.so.0 \ + libunistring.so.2 \ + libtasn1.so.6 \ + libnettle.so.8 \ + libhogweed.so.6 \ + libgmp.so.10 \ + libffi.so.8 \ + libldap-2.5.so.0 \ + libsasl2.so.2 \ + libgnutls.so.30 \ + " \ + # lib/ folder libraries --- + && LIB_LIBS=" \ + libz.so.1 \ + libbz2.so.1.0 \ + liblzma.so.5 \ + " \ + && for lib in $USR_LIBS; do \ + cp -f /usr/lib/${LIB_SUBDIR}/$lib ${DAS_DIR}/syslibs/usr/lib/${LIB_SUBDIR}/ 2>/dev/null || true; \ + done \ + && for lib in $LIB_LIBS; do \ + cp -f /lib/${LIB_SUBDIR}/$lib ${DAS_DIR}/syslibs/lib/${LIB_SUBDIR}/ 2>/dev/null || true; \ + done \ && cp -f /etc/ssl/certs/ca-certificates.crt ${DAS_DIR}/syslibs/etc/ssl/certs/ 2>/dev/null || true FROM gcr.io/distroless/cc-debian12:nonroot AS runner @@ -61,8 +94,8 @@ COPY --from=builder /opt/das/syslibs/usr/lib/ /usr/lib/ COPY --from=builder /opt/das/syslibs/lib/ /lib/ COPY --from=builder /opt/das/syslibs/etc/ssl/certs/ /etc/ssl/certs/ -ENV LD_LIBRARY_PATH=/usr/local/lib:/usr/lib/x86_64-linux-gnu:/usr/lib/aarch64-linux-gnu:/lib/x86_64-linux-gnu:/lib/aarch64-linux-gnu +ENV LD_LIBRARY_PATH=/opt/das/lib:/usr/local/lib:/usr/lib/x86_64-linux-gnu:/usr/lib/aarch64-linux-gnu:/lib/x86_64-linux-gnu:/lib/aarch64-linux-gnu USER nonroot:nonroot -CMD [ "query_broker" ] +CMD [ "busnode" ] diff --git a/.github/scripts/bazel_build.sh b/.github/scripts/bazel_build.sh index 698bba6ae..311557792 100755 --- a/.github/scripts/bazel_build.sh +++ b/.github/scripts/bazel_build.sh @@ -67,6 +67,7 @@ BAZEL_BINARY_TARGETS=( "//:tests_db_loader" "//:busnode" "//:busclient" + "//:database_adapter" ) BAZEL_BINARY_OUTPUTS=( @@ -78,6 +79,7 @@ BAZEL_BINARY_OUTPUTS=( "bazel-bin/tests_db_loader" "bazel-bin/busnode" "bazel-bin/busclient" + "bazel-bin/database_adapter" ) BAZEL_LIB_OUTPUTS=( @@ -181,6 +183,21 @@ if [ -d "$LIB_DIR" ]; then done fi +echo "[INFO] Copying 
database-adapter shared library dependencies into $LIB_DIR..." + +find "$BIN_DIR" -type f -executable -name "database_adapter" | while IFS= read -r binfile; do + ldd "$binfile" | awk '/=> \// { print $3 }' | while IFS= read -r dep; do + dep_base=$(basename "$dep") + case "$dep_base" in + "libpq.so.5") + if [ -f "$dep" ]; then + cp -f "$dep" "$LIB_DIR" + fi + ;; + esac + done +done + echo "[INFO] Build finished successfully." echo "[INFO] Binaries in: $BIN_DIR" echo "[INFO] Libraries in: $LIB_DIR" diff --git a/.github/scripts/setup_bazel.sh b/.github/scripts/setup_bazel.sh index 1b6de899d..8057e1cff 100755 --- a/.github/scripts/setup_bazel.sh +++ b/.github/scripts/setup_bazel.sh @@ -29,12 +29,14 @@ sudo apt update -y sudo apt install -y \ git build-essential curl protobuf-compiler python3 python3-pip \ cmake unzip uuid-runtime lcov bc \ - libevent-dev libssl-dev pkg-config libncurses5 + libevent-dev libssl-dev pkg-config libncurses5 libpq-dev echo "[INFO] Cleaning apt cache..." sudo apt clean -y || true sudo rm -rf /var/lib/apt/lists/* || true +# Installing bazelisk and copying other assets. + echo "[INFO] Installing 3rd-party tools (bazelisk, buildifier)..." if [[ ! -f "${ASSETS_DIR}/3rd-party.tgz" ]]; then @@ -119,6 +121,28 @@ if ! id "builder" &>/dev/null; then sudo useradd -ms /bin/bash builder fi +echo "[INFO] Installing libpqxx (PostgreSQL C++ client)..." + +if [[ ! -f "${ASSETS_DIR}/libpqxx-7.10.5.tar.gz" ]]; then + echo "[ERROR] ${ASSETS_DIR}/libpqxx-7.10.5.tar.gz not found." + exit 1 +fi + +cp "${ASSETS_DIR}/libpqxx-7.10.5.tar.gz" "${TMP_DIR}/" +cd "${TMP_DIR}" +tar xzvf libpqxx-7.10.5.tar.gz +cd libpqxx-7.10.5 + +cmake -S . -B build \ + -DCMAKE_BUILD_TYPE=RelWithDebInfo \ + -DBUILD_TESTING=OFF \ + -DCMAKE_INSTALL_PREFIX=/usr/local + +cmake --build build -j"$(nproc)" + +sudo cmake --install build +sudo ldconfig + echo "[INFO] Configuring git safe.directory for ${DAS_DIR}..." 
sudo -u builder git config --global --add safe.directory "${DAS_DIR}" diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 422f3f6b3..f92c57535 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -72,5 +72,20 @@ jobs: sleep 1 done + - name: Starting Postgres Server and load database + run: |- + docker run -d --name pg_test -e POSTGRES_PASSWORD=test -e POSTGRES_DB=postgres_wrapper_test -p 5433:5432 postgres:15 + + for i in {1..30}; do + if docker exec pg_test pg_isready -U postgres -d postgres_wrapper_test; then + echo "Postgres is ready" + break + fi + echo "Waiting for Postgres..." + sleep 1 + done + + docker exec -i pg_test psql -U postgres -d postgres_wrapper_test < src/scripts/postgres_setup.sql + - name: Execute Unit Test Suite run: make test-all diff --git a/Makefile b/Makefile index c377cc9a1..06ff62199 100644 --- a/Makefile +++ b/Makefile @@ -61,6 +61,9 @@ setup-inference-toy-problem: run-tests-db-loader: @bash -x src/scripts/run.sh tests_db_loader $(OPTIONS) +run-adapter: + @bash -x src/scripts/run.sh database_adapter $(OPTIONS) + setup-nunet-dms: @bash -x src/scripts/setup-nunet-dms.sh diff --git a/src/BUILD b/src/BUILD index f42bc37e2..7c65ee484 100644 --- a/src/BUILD +++ b/src/BUILD @@ -44,6 +44,7 @@ cc_shared_library( "//atomdb:atomdb_lib", "//attention_broker:attention_broker_lib", "//commons:commons_lib", + "//db_adapter:db_adapter_lib", "//distributed_algorithm_node:distributed_algorithm_node_lib", "//hasher:hasher_lib", "//metta:metta_lib", @@ -188,6 +189,27 @@ cc_binary( ], ) +cc_binary( + name = "database_adapter", + srcs = [], + defines = ["BAZEL_BUILD"], + linkopts = [ + "-L/usr/local/lib", + "-lpqxx", + "-lpq", + "-lhiredis_cluster", + "-lhiredis", + "-lmongocxx", + "-lbsoncxx", + ], + linkstatic = 1, + deps = [ + "//commons:commons_lib", + "//tests/main:database_adapter_main_lib", + "@mbedtls", + ], +) + cc_binary( name = "busnode", srcs = [], diff --git 
a/src/assets/libpqxx-7.10.5.tar.gz b/src/assets/libpqxx-7.10.5.tar.gz new file mode 100644 index 000000000..8fc0c0a71 Binary files /dev/null and b/src/assets/libpqxx-7.10.5.tar.gz differ diff --git a/src/commons/HandleTrie.cc b/src/commons/HandleTrie.cc index 44229969f..99066e3ae 100644 --- a/src/commons/HandleTrie.cc +++ b/src/commons/HandleTrie.cc @@ -26,7 +26,11 @@ HandleTrie::TrieNode::~TrieNode() { delete children[i]; } delete[] children; - delete value; + // TODO: Remove this check once insert() is improved + if (value != NULL) { + delete value; + value = NULL; + } } string HandleTrie::TrieValue::to_string() { return ""; } @@ -269,3 +273,8 @@ void HandleTrie::traverse(bool keep_root_locked, root->trie_node_mutex.unlock(); } } + +bool HandleTrie::exists(const string& key) { + TrieNode* node = lookup_node(key); + return node != NULL ? true : false; +} \ No newline at end of file diff --git a/src/commons/HandleTrie.h b/src/commons/HandleTrie.h index 5d352ecce..297cbaa8d 100644 --- a/src/commons/HandleTrie.h +++ b/src/commons/HandleTrie.h @@ -74,6 +74,8 @@ class HandleTrie { */ TrieValue* lookup(const string& key); + bool exists(const string& key); + /** * Remove a key from this HandleTrie and its associated value. 
* diff --git a/src/db_adapter/BUILD b/src/db_adapter/BUILD new file mode 100644 index 000000000..88c396bf9 --- /dev/null +++ b/src/db_adapter/BUILD @@ -0,0 +1,96 @@ +load("@rules_cc//cc:cc_library.bzl", "cc_library") + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "db_adapter_lib", + includes = ["."], + deps = [ + ":context_loader", + ":data_mapper", + ":data_types", + ":database_connection", + ":database_wrapper", + ":pipeline", + "//commons:commons_lib", + "//commons/atoms:atoms_lib", + "//db_adapter/postgres:postgres_lib", + ], +) + +cc_library( + name = "data_types", + hdrs = ["DataTypes.h"], + includes = ["."], + deps = [ + "//commons:commons_lib", + "//commons/atoms:atoms_lib", + ], +) + +cc_library( + name = "data_mapper", + srcs = ["DataMapper.cc"], + hdrs = ["DataMapper.h"], + includes = ["."], + deps = [ + ":data_types", + "//commons:commons_lib", + "//commons/atoms:atoms_lib", + ], +) + +cc_library( + name = "database_wrapper", + srcs = ["DatabaseWrapper.cc"], + hdrs = ["DatabaseWrapper.h"], + includes = ["."], + deps = [ + ":data_mapper", + ":data_types", + ":database_connection", + "//commons:commons_lib", + "//commons/atoms:atoms_lib", + ], +) + +cc_library( + name = "context_loader", + srcs = ["ContextLoader.cc"], + hdrs = ["ContextLoader.h"], + includes = ["."], + deps = [ + ":data_types", + "//commons:commons_lib", + "@nlohmann_json//:json", + ], +) + +cc_library( + name = "database_connection", + srcs = ["DatabaseConnection.cc"], + hdrs = ["DatabaseConnection.h"], + includes = ["."], + deps = [ + ":data_types", + "//commons:commons_lib", + "//commons/atoms:atoms_lib", + "//commons/processor:processor_lib", + ], +) + +cc_library( + name = "pipeline", + srcs = ["Pipeline.cc"], + hdrs = ["Pipeline.h"], + includes = ["."], + deps = [ + ":data_types", + ":database_wrapper", + "//atomdb", + "//atomdb:atomdb_singleton", + "//commons:commons_lib", + "//commons/atoms:atoms_lib", + "//db_adapter/postgres:postgres_lib", + ], 
+) diff --git a/src/db_adapter/ContextLoader.cc b/src/db_adapter/ContextLoader.cc new file mode 100644 index 000000000..e6b032dad --- /dev/null +++ b/src/db_adapter/ContextLoader.cc @@ -0,0 +1,151 @@ +#include "ContextLoader.h" + +#include +#include +#include + +#define LOG_LEVEL INFO_LEVEL +#include "Logger.h" +#include "Utils.h" + +using namespace std; +using namespace commons; + +using json = nlohmann::json; + +namespace fs = std::filesystem; + +vector ContextLoader::load_context_file(const string& file_path) { + if (!fs::exists(file_path)) { + Utils::error("Context file " + file_path + " does not exist"); + } + + ifstream f(file_path); + + json tables = json::parse(f); + + vector out; + + bool has_error = false; + + for (size_t i = 0; i < tables.size(); ++i) { + string msg_base = "table[" + to_string(i) + "]"; + + const json& table = tables[i]; + + TableMapping tm; + + if (!table.contains("table_name")) { + LOG_ERROR(msg_base + " missing required key: 'table_name'"); + has_error = true; + } else if (!table["table_name"].is_string()) { + LOG_ERROR(msg_base + ".table_name must be a string in a table entry"); + has_error = true; + } else { + string tn = table["table_name"].get(); + size_t count_dot = 0; + for (char c : tn) { + if (c == '.') ++count_dot; + } + if (count_dot != 1) { + LOG_ERROR(msg_base + ".table_name must be in format 'schema.table'"); + has_error = true; + } else { + size_t pos = tn.find('.'); + if (pos == 0 || pos + 1 >= tn.size()) { + LOG_ERROR(msg_base + "table_name must be in format 'schema.table'"); + has_error = true; + } + } + } + + if (!table.contains("skip_columns")) { + LOG_ERROR(msg_base + " missing required key: 'skip_columns'"); + has_error = true; + } else { + const json& sc = table["skip_columns"]; + if (!sc.is_null()) { + if (!sc.is_array()) { + LOG_ERROR(msg_base + + ".skip_columns must be an array of strings or null in a table entry"); + has_error = true; + } else { + tm.skip_columns.emplace(); + for (size_t k = 0; k < sc.size(); 
++k) { + if (!sc[k].is_string()) { + LOG_ERROR(msg_base + ".skip_columns[" + to_string(k) + + "] must be a string in a table entry"); + has_error = true; + } else { + tm.skip_columns->push_back(sc[k]); + } + } + } + } + } + + if (!table.contains("where_clauses")) { + LOG_ERROR(msg_base + " missing required key: 'where_clauses'"); + has_error = true; + } else { + const json& wc = table["where_clauses"]; + if (!wc.is_null()) { + if (!wc.is_array()) { + LOG_ERROR(msg_base + + ".where_clauses must be an array of strings or null in a table entry"); + has_error = true; + } else { + tm.where_clauses.emplace(); + for (size_t k = 0; k < wc.size(); ++k) { + if (!wc[k].is_string()) { + LOG_ERROR(msg_base + ".where_clauses[" + to_string(k) + + "] must be a string in a table entry"); + has_error = true; + } else { + tm.where_clauses->push_back(wc[k]); + } + } + } + } + } + + if (!has_error) { + tm.table_name = table["table_name"]; + out.push_back(tm); + } + } + + if (has_error) { + LOG_ERROR("Context file validation failed with errors. 
Please fix the issues and try again."); + return vector{}; + } + return out; +} + +vector ContextLoader::load_query_file(const string& file_path) { + if (!fs::exists(file_path)) { + Utils::error("Query file " + file_path + " does not exist"); + } + + ifstream f(file_path); + + vector out; + string query; + string line; + + while (getline(f, line)) { + line = Utils::trim(line); + if (!line.empty()) { + query += line + " "; + } else { + out.push_back(query); + query.clear(); + } + } + + if (!query.empty()) { + out.push_back(query); + } + + return out; +} \ No newline at end of file diff --git a/src/db_adapter/ContextLoader.h b/src/db_adapter/ContextLoader.h new file mode 100644 index 000000000..bc3b1585d --- /dev/null +++ b/src/db_adapter/ContextLoader.h @@ -0,0 +1,13 @@ +#include +#include + +#include "DataTypes.h" + +using namespace std; +using namespace db_adapter; + +class ContextLoader { + public: + static vector load_context_file(const string& file_path); + static vector load_query_file(const string& file_path); +}; \ No newline at end of file diff --git a/src/db_adapter/DataMapper.cc b/src/db_adapter/DataMapper.cc new file mode 100644 index 000000000..0804f7c64 --- /dev/null +++ b/src/db_adapter/DataMapper.cc @@ -0,0 +1,414 @@ +#include "DataMapper.h" + +#include +#include + +#include "Atom.h" +#include "DataTypes.h" +#include "Hasher.h" +#include "Link.h" +#include "MettaMapping.h" +#include "Node.h" + +#define LOG_LEVEL INFO_LEVEL +#include "Logger.h" + +using namespace db_adapter; +using namespace commons; +using namespace atoms; + +string BaseSQL2Mapper::SYMBOL; +string BaseSQL2Mapper::EXPRESSION; + +// -- BaseSQL2Mapper + +const OutputList BaseSQL2Mapper::map(const DbInput& data) { + vector> all_foreign_keys; + SqlRow sql_row = get(data); + string table_name = sql_row.table_name; + + string primary_key_value = sql_row.primary_key ? 
sql_row.primary_key->value : ""; + + this->map_primary_key(table_name, primary_key_value); + + for (auto& field : sql_row.fields) { + string column_name = table_name + "." + field.name; + + if (this->is_foreign_key(column_name)) { + size_t separator_pos = column_name.find('|'); + + if (separator_pos == string::npos) { + Utils::error("Invalid foreign key format: " + column_name); + } + + string fk_column = column_name.substr(0, separator_pos); + string fk_table = column_name.substr(separator_pos + 1); + all_foreign_keys.push_back({fk_column, fk_table, field.value}); + + this->map_foreign_key_column( + table_name, column_name, field.value, primary_key_value, fk_table, fk_column); + } else { + this->map_regular_column(table_name, column_name, field.value, primary_key_value); + }; + } + this->map_foreign_keys_combinations(all_foreign_keys); + + auto result = move(this->get_output()); + this->clear(); + return result; +} + +bool BaseSQL2Mapper::is_foreign_key(const string& column_name) { + size_t pos = column_name.find('|'); + if (pos == string::npos) return false; + return true; +} + +string BaseSQL2Mapper::escape_inner_quotes(string text) { + const auto count = std::count(text.begin(), text.end(), '"'); + + if (count == 0 || count == 2) return text; + + if (count == 1) { + text.insert(text.begin(), '"'); + text.push_back('"'); + return text; + } + + if (text.size() >= 2 && text.front() == '"' && text.back() == '"') { + string inner = text.substr(1, text.size() - 2); + + string escaped; + escaped.reserve(inner.size()); + for (char c : inner) { + if (c == '"') { + escaped.push_back('\\'); + } + escaped.push_back(c); + } + + return "\"" + escaped + "\""; + } + return ""; +} + +bool BaseSQL2Mapper::insert_handle_if_missing(const string& handle) { + auto exists = this->handle_trie.exists(handle); + if (exists) return false; + this->handle_trie.insert(handle, new EmptyTrieValue()); + return true; +} + +// -- SQL2MettaMapper + +SQL2MettaMapper::SQL2MettaMapper() { 
this->initialize_statics(); } + +SQL2MettaMapper::~SQL2MettaMapper() {} + +OutputList SQL2MettaMapper::get_output() { return this->metta_expressions; } + +void SQL2MettaMapper::clear() { this->metta_expressions.clear(); } + +void SQL2MettaMapper::add_metta_if_new(const string& s_expression) { + string key = Hasher::context_handle(s_expression); + if (this->insert_handle_if_missing(key)) { + this->metta_expressions.push_back(s_expression); + } +}; + +void SQL2MettaMapper::map_primary_key(const string& table_name, const string& primary_key_value) { + string literal_pk = this->escape_inner_quotes("\"" + primary_key_value + "\""); + + string predicate_link = "(Predicate " + table_name + ")"; + this->add_metta_if_new(predicate_link); + + string concept_inner_link = "(" + table_name + " " + literal_pk + ")"; + this->add_metta_if_new(concept_inner_link); + + string concept_link = "(Concept " + concept_inner_link + ")"; + this->add_metta_if_new(concept_link); + + string evaluation_link = "(Evaluation " + predicate_link + " " + concept_link + ")"; + this->add_metta_if_new(evaluation_link); +} + +void SQL2MettaMapper::map_foreign_key_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value, + const string& fk_table, + const string& fk_column) { + string literal_value = this->escape_inner_quotes("\"" + value + "\""); + string literal_pk = this->escape_inner_quotes("\"" + primary_key_value + "\""); + + string predicate_inner_1_link = "(" + fk_table + " " + literal_value + ")"; + this->add_metta_if_new(predicate_inner_1_link); + + string predicate_inner_2_link = "(Concept " + predicate_inner_1_link + ")"; + this->add_metta_if_new(predicate_inner_2_link); + + string predicate_inner_3_link = "(" + fk_column + " " + predicate_inner_2_link + ")"; + this->add_metta_if_new(predicate_inner_3_link); + + string predicate_link = "(Predicate " + predicate_inner_3_link + ")"; + this->add_metta_if_new(predicate_link); + + string 
concept_inner_link = "(" + table_name + " " + literal_pk + ")"; + this->add_metta_if_new(concept_inner_link); + + string concept_link = "(Concept " + concept_inner_link + ")"; + this->add_metta_if_new(concept_link); + + string evaluation_link = "(Evaluation " + predicate_link + " " + concept_link + ")"; + this->add_metta_if_new(evaluation_link); +} + +void SQL2MettaMapper::map_regular_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value) { + string literal_value = this->escape_inner_quotes("\"" + value + "\""); + string literal_pk = this->escape_inner_quotes("\"" + primary_key_value + "\""); + + string predicate_inner_link = "(" + table_name + " " + column_name + " " + literal_value + ")"; + this->add_metta_if_new(predicate_inner_link); + + string predicate_link = "(Predicate " + predicate_inner_link + ")"; + this->add_metta_if_new(predicate_link); + + string concept_inner_link = "(" + table_name + " " + literal_pk + ")"; + this->add_metta_if_new(concept_inner_link); + + string concept_link = "(Concept " + concept_inner_link + ")"; + this->add_metta_if_new(concept_link); + + string evaluation_link = "(Evaluation " + predicate_link + " " + concept_link + ")"; + this->add_metta_if_new(evaluation_link); +} + +void SQL2MettaMapper::map_foreign_keys_combinations( + const vector>& all_foreign_keys) { + for (const auto& [column_name, foreign_table_name, value] : all_foreign_keys) { + for (const auto& [column_name2, foreign_table_name2, value2] : all_foreign_keys) { + if (make_pair(foreign_table_name, value) != make_pair(foreign_table_name2, value2)) { + string literal_value = this->escape_inner_quotes("\"" + value + "\""); + string literal_value_2 = this->escape_inner_quotes("\"" + value2 + "\""); + + string predicate_inner_1_link = "(" + foreign_table_name + " " + literal_value + ")"; + this->add_metta_if_new(predicate_inner_1_link); + + string predicate_inner_2_link = "(Concept " + predicate_inner_1_link 
+ ")"; + this->add_metta_if_new(predicate_inner_2_link); + + string predicate_inner_3_link = "(" + column_name + " " + predicate_inner_2_link + ")"; + this->add_metta_if_new(predicate_inner_3_link); + + string predicate_link = "(Predicate " + predicate_inner_3_link + ")"; + this->add_metta_if_new(predicate_link); + + string concept_inner_link = "(" + foreign_table_name2 + " " + literal_value_2 + ")"; + this->add_metta_if_new(concept_inner_link); + + string concept_link = "(Concept " + concept_inner_link + ")"; + this->add_metta_if_new(concept_link); + + string evaluation_link = "(Evaluation " + predicate_link + " " + concept_link + ")"; + this->add_metta_if_new(evaluation_link); + } + } + } +} + +// -- SQL2AtomsMapper + +SQL2AtomsMapper::SQL2AtomsMapper() { this->initialize_statics(); } + +SQL2AtomsMapper::~SQL2AtomsMapper() {} + +OutputList SQL2AtomsMapper::get_output() { return this->atoms; } + +void SQL2AtomsMapper::clear() { this->atoms.clear(); } + +string SQL2AtomsMapper::add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE atom_type, + variant> value, + bool is_toplevel) { + Atom* atom; + + if (atom_type == SQL2AtomsMapper::ATOM_TYPE::NODE) { + string name = get(value); + atom = new Node(BaseSQL2Mapper::SYMBOL, name); + } else if (atom_type == SQL2AtomsMapper::ATOM_TYPE::LINK) { + vector targets = get>(value); + atom = new Link(BaseSQL2Mapper::EXPRESSION, targets, is_toplevel); + } else { + Utils::error("Either name or targets must be provided to create an Atom."); + } + + string handle = atom->handle(); + if (this->insert_handle_if_missing(handle)) { + this->atoms.push_back(atom); + } else { + delete atom; + } + return handle; +}; + +void SQL2AtomsMapper::map_primary_key(const string& table_name, const string& primary_key_value) { + string literal_pk = this->escape_inner_quotes("\"" + primary_key_value + "\""); + + // Nodes + string predicate_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Predicate")); + string concept_node_h = 
this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Concept")); + string evaluation_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Evaluation")); + string literal_pk_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(literal_pk)); + string table_name_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(table_name)); + + // Links + string predicate_link_h = this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{predicate_node_h, table_name_node_h}); + string concept_inner_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, vector{table_name_node_h, literal_pk_node_h}); + string concept_link_h = this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{concept_node_h, concept_inner_link_h}); + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{evaluation_node_h, predicate_link_h, concept_link_h}, + true); +} + +void SQL2AtomsMapper::map_foreign_key_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value, + const string& fk_table, + const string& fk_column) { + string literal_value = this->escape_inner_quotes("\"" + value + "\""); + string literal_pk = this->escape_inner_quotes("\"" + primary_key_value + "\""); + + // Nodes + string predicate_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Predicate")); + string concept_node_h = this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Concept")); + string evaluation_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Evaluation")); + string literal_pk_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(literal_pk)); + string table_name_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(table_name)); + string fk_table_node_h = this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(fk_table)); + string 
literal_value_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(literal_value)); + string fk_column_node_h = this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(fk_column)); + + // Links + string predicate_inner_1_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, vector{fk_table_node_h, literal_value_node_h}); + string predicate_inner_2_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, vector{concept_node_h, predicate_inner_1_link_h}); + string predicate_inner_3_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, vector{fk_column_node_h, predicate_inner_2_link_h}); + string predicate_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, vector{predicate_node_h, predicate_inner_3_link_h}); + string concept_inner_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, vector{table_name_node_h, literal_pk_node_h}); + string concept_link_h = this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{concept_node_h, concept_inner_link_h}); + + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{evaluation_node_h, predicate_link_h, concept_link_h}, + true); +} + +void SQL2AtomsMapper::map_regular_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value) { + string literal_pk = this->escape_inner_quotes("\"" + primary_key_value + "\""); + string literal_value = this->escape_inner_quotes("\"" + value + "\""); + + // Nodes + string predicate_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Predicate")); + string concept_node_h = this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Concept")); + string evaluation_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Evaluation")); + string table_name_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(table_name)); + string column_name_node_h = + 
this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(column_name)); + string literal_value_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(literal_value)); + string literal_pk_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(literal_pk)); + + // Links + string predicate_inner_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{table_name_node_h, column_name_node_h, literal_value_node_h}); + string predicate_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, vector{predicate_node_h, predicate_inner_link_h}); + string concept_inner_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, vector{table_name_node_h, literal_pk_node_h}); + string concept_link_h = this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{concept_node_h, concept_inner_link_h}); + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{evaluation_node_h, predicate_link_h, concept_link_h}, + true); +} + +void SQL2AtomsMapper::map_foreign_keys_combinations( + const vector>& all_foreign_keys) { + for (const auto& [column_name, foreign_table_name, value] : all_foreign_keys) { + for (const auto& [column_name2, foreign_table_name2, value2] : all_foreign_keys) { + if (make_pair(foreign_table_name, value) != make_pair(foreign_table_name2, value2)) { + string literal_value = this->escape_inner_quotes("\"" + value + "\""); + string literal_value_2 = this->escape_inner_quotes("\"" + value2 + "\""); + + // Nodes + string predicate_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Predicate")); + string concept_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Concept")); + string evaluation_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string("Evaluation")); + string fk_column_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(column_name)); + string foreign_table_name_node_h = + 
this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(foreign_table_name)); + string literal_value_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(literal_value)); + string foreign_table_name2_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(foreign_table_name2)); + string literal_value2_node_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::NODE, string(literal_value_2)); + // Links + string predicate_inner_1_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{foreign_table_name_node_h, literal_value_node_h}); + string predicate_inner_2_link_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{concept_node_h, predicate_inner_1_link_h}); + string predicate_inner_3_link_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{fk_column_node_h, predicate_inner_2_link_h}); + string predicate_link_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{predicate_node_h, predicate_inner_3_link_h}); + string concept_inner_link_h = this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{foreign_table_name2_node_h, literal_value2_node_h}); + string concept_link_h = + this->add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{concept_node_h, concept_inner_link_h}); + + this->add_atom_if_new( + SQL2AtomsMapper::ATOM_TYPE::LINK, + vector{evaluation_node_h, predicate_link_h, concept_link_h}, + true); + } + } + } +} diff --git a/src/db_adapter/DataMapper.h b/src/db_adapter/DataMapper.h new file mode 100644 index 000000000..a6046b0ce --- /dev/null +++ b/src/db_adapter/DataMapper.h @@ -0,0 +1,154 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "DataTypes.h" +#include "HandleTrie.h" +#include "MettaMapping.h" + +using namespace std; +using namespace atoms; +using namespace commons; + +namespace db_adapter { + +class MapperValue : public HandleTrie::TrieValue { + public: + MapperValue() {} + void 
merge(TrieValue* other) {} +}; + +/** + * @class Mapper + * @brief Abstract base class for transforming database input into a target representation. + */ +class Mapper { + public: + virtual ~Mapper() = default; + + /** + * @brief Transforms the input data into the output format. + * @param data The database row or document to map. + * @return OutputList A variant containing either strings or Atoms. + */ + virtual const OutputList map(const DbInput& data) = 0; + + unsigned int handle_trie_size() { return handle_trie.size; } + + protected: + Mapper() = default; + HandleTrie handle_trie{32}; +}; + +/** + * @class BaseSQL2Mapper + * @brief Common logic for mapping SQL data, handling Primary and Foreign keys. + */ +class BaseSQL2Mapper : public Mapper { + public: + virtual ~BaseSQL2Mapper() override = default; + + const OutputList map(const DbInput& data) override; + + static string SYMBOL; + static string EXPRESSION; + + static void initialize_statics() { + SYMBOL = MettaMapping::SYMBOL_NODE_TYPE; + EXPRESSION = MettaMapping::EXPRESSION_LINK_TYPE; + } + + protected: + BaseSQL2Mapper() = default; + + bool is_foreign_key(const string& column_name); + string escape_inner_quotes(string text); + virtual OutputList get_output() = 0; + virtual void clear() = 0; + bool insert_handle_if_missing(const string& handle); + + virtual void map_primary_key(const string& table_name, const string& primary_key_value) = 0; + virtual void map_foreign_key_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value, + const string& fk_table, + const string& fk_column) = 0; + virtual void map_regular_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value) = 0; + virtual void map_foreign_keys_combinations( + const vector>& all_foreign_keys) = 0; +}; + +/** + * @class SQL2MettaMapper + * @brief Maps SQL rows to Metta S-Expressions. 
+ */ +class SQL2MettaMapper : public BaseSQL2Mapper { + public: + SQL2MettaMapper(); + ~SQL2MettaMapper() override; + + private: + vector metta_expressions; + OutputList get_output() override; + void clear() override; + void add_metta_if_new(const string& s_expression); + + void map_primary_key(const string& table_name, const string& primary_key_value) override; + void map_foreign_key_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value, + const string& fk_table, + const string& fk_column) override; + void map_regular_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value) override; + void map_foreign_keys_combinations( + const vector>& all_foreign_keys) override; +}; + +/** + * @class SQL2AtomsMapper + * @brief Maps SQL rows to Atom objects. + */ +class SQL2AtomsMapper : public BaseSQL2Mapper { + public: + SQL2AtomsMapper(); + ~SQL2AtomsMapper() override; + + enum ATOM_TYPE { NODE, LINK }; + + private: + vector atoms; + + OutputList get_output() override; + void clear() override; + string add_atom_if_new(SQL2AtomsMapper::ATOM_TYPE atom_type, + variant> value, + bool is_toplevel = false); + + void map_primary_key(const string& table_name, const string& primary_key_value) override; + void map_foreign_key_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value, + const string& fk_table, + const string& fk_column) override; + void map_regular_column(const string& table_name, + const string& column_name, + const string& value, + const string& primary_key_value) override; + void map_foreign_keys_combinations( + const vector>& all_foreign_keys) override; +}; + +} // namespace db_adapter \ No newline at end of file diff --git a/src/db_adapter/DataTypes.h b/src/db_adapter/DataTypes.h new file mode 100644 index 000000000..230be5afa --- /dev/null +++ b/src/db_adapter/DataTypes.h @@ 
-0,0 +1,100 @@ +#pragma once + +#include +#include +#include +#include + +#include "Atom.h" + +using namespace std; +using namespace atoms; + +namespace db_adapter { + +/** + * @struct ColumnValue + * @brief Represents a single cell in a database row. + */ +struct ColumnValue { + string name; + string value; +}; + +/** + * @struct SqlRow + * @brief Represents a single row from a SQL query result. + */ +struct SqlRow { + string table_name; + optional primary_key; + vector fields; + + /** + * @brief Adds a field to the row. + * @param name Column name. + * @param value Column value. + */ + void add_field(string name, string value) { fields.push_back(ColumnValue{move(name), move(value)}); } + + /** + * @brief Retrieves a value by column name. + * @param name The column name to search for. + * @return optional The value if found, otherwise nullopt. + */ + optional get(const string& name) const { + if (primary_key && primary_key->name == name) { + return primary_key->value; + } + for (const auto& field : fields) { + if (field.name == name) { + return field.value; + } + } + return nullopt; + } + + size_t size() const { return (primary_key ? 1 : 0) + fields.size(); } +}; + +struct NoSqlDocument {}; + +/** + * @typedef DbInput + * @brief A variant representing raw input from either SQL or NoSQL sources. + */ +using DbInput = variant; + +/** + * @typedef OutputList + * @brief The result of the mapping process. + * + * Can be a list of S-Expression strings (SQL2METTA) or a list of Atom pointers (SQL2ATOMS). + */ +using OutputList = variant, vector>; + +/** + * @struct Table + * @brief Metadata describing a database table. + */ +struct Table { + string name; + int row_count = 0; + vector column_names; + string primary_key; + vector foreign_keys; +}; + +/** + * @enum MAPPER_TYPE + * @brief Defines the strategy used to transform database rows. 
+ */ +enum class MAPPER_TYPE { SQL2METTA, SQL2ATOMS }; + +struct TableMapping { + string table_name; + optional> where_clauses = nullopt; + optional> skip_columns = nullopt; +}; + +} // namespace db_adapter \ No newline at end of file diff --git a/src/db_adapter/DatabaseConnection.cc b/src/db_adapter/DatabaseConnection.cc new file mode 100644 index 000000000..858a5cfa1 --- /dev/null +++ b/src/db_adapter/DatabaseConnection.cc @@ -0,0 +1,41 @@ +#ifndef LOG_LEVEL +#define LOG_LEVEL INFO_LEVEL +#endif +#include "DatabaseConnection.h" + +#include "Logger.h" +using namespace db_adapter; + +DatabaseConnection::DatabaseConnection(const string& id, const string& host, int port) : Processor(id) { + this->host = host; + this->port = port; + this->connected = false; +} + +DatabaseConnection::~DatabaseConnection() {} + +void DatabaseConnection::start() { + if (this->is_running() || this->is_finished()) return; + + { + lock_guard lock(this->connection_mutex); + this->connect(); + this->connected = true; + } + + Processor::start(); +} + +void DatabaseConnection::stop() { + if (!this->is_running()) return; + + { + lock_guard lock(this->connection_mutex); + this->disconnect(); + this->connected = false; + } + + Processor::stop(); +} + +bool DatabaseConnection::is_connected() const { return this->connected; } diff --git a/src/db_adapter/DatabaseConnection.h b/src/db_adapter/DatabaseConnection.h new file mode 100644 index 000000000..31b89930b --- /dev/null +++ b/src/db_adapter/DatabaseConnection.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +#include "Processor.h" + +using namespace std; +using namespace processor; + +namespace db_adapter { + +class DatabaseConnection : public Processor { + public: + DatabaseConnection(const string& id, const string& host, int port); + virtual ~DatabaseConnection(); + + virtual void start() override; + virtual void stop() override; + + virtual void connect() = 0; + virtual void disconnect() = 0; + + bool is_connected() const; + + 
protected: + string host; + int port; + + private: + bool connected; + mutex connection_mutex; +}; + +} // namespace db_adapter \ No newline at end of file diff --git a/src/db_adapter/DatabaseWrapper.cc b/src/db_adapter/DatabaseWrapper.cc new file mode 100644 index 000000000..f623d69d3 --- /dev/null +++ b/src/db_adapter/DatabaseWrapper.cc @@ -0,0 +1,22 @@ +#include "DatabaseWrapper.h" + +DatabaseWrapper::DatabaseWrapper(DatabaseConnection& db_client, + shared_ptr mapper, + MAPPER_TYPE mapper_type) + : db_client(db_client), mapper(move(mapper)), mapper_type(mapper_type) {} + +unsigned int DatabaseWrapper::mapper_handle_trie_size() { return this->mapper->handle_trie_size(); } + +SQLWrapper::SQLWrapper(DatabaseConnection& db_client, MAPPER_TYPE mapper_type) + : DatabaseWrapper(db_client, create_mapper(mapper_type), mapper_type) {} + +shared_ptr SQLWrapper::create_mapper(MAPPER_TYPE mapper_type) { + switch (mapper_type) { + case MAPPER_TYPE::SQL2METTA: + return make_shared(); + case MAPPER_TYPE::SQL2ATOMS: + return make_shared(); + default: + throw invalid_argument("Unknown mapper type"); + } +} diff --git a/src/db_adapter/DatabaseWrapper.h b/src/db_adapter/DatabaseWrapper.h new file mode 100644 index 000000000..563ea914f --- /dev/null +++ b/src/db_adapter/DatabaseWrapper.h @@ -0,0 +1,75 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "DataMapper.h" +#include "DataTypes.h" +#include "DatabaseConnection.h" + +using namespace std; +using namespace db_adapter; +using namespace commons; + +namespace db_adapter { + +/** + * @class DatabaseWrapper + * @brief Generic interface for a database connection wrapper. 
+ */ +class DatabaseWrapper { + public: + DatabaseWrapper(DatabaseConnection& db_client, shared_ptr mapper, MAPPER_TYPE mapper_type); + virtual ~DatabaseWrapper() = default; + + unsigned int mapper_handle_trie_size(); + + protected: + DatabaseConnection& db_client; + shared_ptr mapper; + MAPPER_TYPE mapper_type; +}; + +/** + * @class SQLWrapper + * @brief Specialization of DatabaseWrapper for SQL-based databases. + */ +class SQLWrapper : public DatabaseWrapper { + public: + SQLWrapper(DatabaseConnection& db_client, MAPPER_TYPE mapper_type); + virtual ~SQLWrapper() = default; + + /** + * @brief Retrieves schema information for a specific table. + */ + virtual Table get_table(const string& name) = 0; + + /** + * @brief Lists all tables in the database. + */ + virtual vector list_tables() = 0; + + /** + * @brief Maps a table's data to the target format. + * + * @param table The table metadata. + * @param clauses Additional SQL WHERE clauses. + * @param skip_columns Columns to exclude from mapping. + * @param second_level Boolean flag for recursive/depth mapping logic. + */ + virtual void map_table(const Table& table, + const vector& clauses, + const vector& skip_columns, + bool second_level) = 0; + + /** + * @brief Executes a raw SQL query and maps the result. 
+ */ + virtual void map_sql_query(const string& virtual_name, const string& raw_query) = 0; + + static shared_ptr create_mapper(MAPPER_TYPE mapper_type); +}; +} // namespace db_adapter \ No newline at end of file diff --git a/src/db_adapter/Pipeline.cc b/src/db_adapter/Pipeline.cc new file mode 100644 index 000000000..089e06afb --- /dev/null +++ b/src/db_adapter/Pipeline.cc @@ -0,0 +1,147 @@ +#include "Pipeline.h" + +#include "AtomDBSingleton.h" +#include "DedicatedThread.h" +#include "Logger.h" +#include "PostgresWrapper.h" +#include "Processor.h" +#include "SharedQueue.h" + +#define LOG_LEVEL DEBUG_LEVEL +#include "Logger.h" + +using namespace atomdb; +using namespace std; +using namespace commons; +using namespace atoms; +using namespace db_adapter; + +DatabaseMappingJob::DatabaseMappingJob(const string& host, + int port, + const string& database, + const string& user, + const string& password, + MAPPER_TYPE mapper_type, + shared_ptr output_queue) { + this->db_conn = + make_unique("psql-conn", host, port, database, user, password); + this->wrapper = make_unique(*db_conn, mapper_type, output_queue); +} + +DatabaseMappingJob::~DatabaseMappingJob() { this->db_conn->stop(); } + +void DatabaseMappingJob::add_task_query(const string& virtual_name, const string& query) { + this->tasks.push_back(MappingTask{MappingTask::QUERY, TableMapping{}, virtual_name, query}); +} + +void DatabaseMappingJob::add_task_table(TableMapping table_mapping) { + this->tasks.push_back(MappingTask{MappingTask::TABLE, move(table_mapping), "", ""}); +} + +bool DatabaseMappingJob::thread_one_step() { + LOG_DEBUG("DatabaseMappingJob thread_one_step called. 
Current task index: " << this->current_task); + if (this->current_task >= this->tasks.size()) { + this->db_conn->stop(); + return false; + } + + if (!this->initialized) { + this->db_conn->setup(); + this->db_conn->start(); + this->initialized = true; + } + + auto& task = this->tasks[this->current_task]; + + LOG_DEBUG("Processing task " << this->current_task << " of type " + << (task.type == MappingTask::TABLE ? "TABLE" : "QUERY")); + + if (task.type == MappingTask::TABLE) { + auto table = this->wrapper->get_table(task.table_mapping.table_name); + this->wrapper->map_table(table, + task.table_mapping.where_clauses.value_or(vector{}), + task.table_mapping.skip_columns.value_or(vector{}), + false); + } else if (task.type == MappingTask::QUERY) { + this->wrapper->map_sql_query(task.virtual_name, task.query); + } + + this->current_task++; + this->finished = (this->current_task >= this->tasks.size()); + return !this->finished; +} + +bool DatabaseMappingJob::is_finished() const { return this->finished; } + +AtomPersistenceJob::AtomPersistenceJob(shared_ptr input_queue) : input_queue(input_queue) { + this->atomdb = AtomDBSingleton::get_instance(); +} + +AtomPersistenceJob::~AtomPersistenceJob() { + for (auto& atom : this->atoms) { + delete atom; + } + this->atoms.clear(); +} + +bool AtomPersistenceJob::thread_one_step() { + LOG_DEBUG("== START =="); + LOG_DEBUG("Current input queue size 1: " << this->input_queue->size()); + LOG_DEBUG("Current finished status: " << (this->finished ? "true" : "false")); + + if (this->input_queue->empty()) { + if (this->producer_finished) { + LOG_INFO( + "Producer has finished and input queue is empty. Marking AtomPersistenceJob as " + "finished."); + this->finished = true; + if (!this->atoms.empty()) { + LOG_DEBUG("Processing remaining " << this->atoms.size() << " atoms."); + this->atomdb->add_atoms(this->atoms, false, true); + LOG_DEBUG("Batch processed and added to AtomDB. 
Clearing batch from memory."); + for (auto& atom : this->atoms) { + delete atom; + } + this->atoms.clear(); + } + } + Utils::sleep(); + return false; + } + + try { + for (size_t i = 0; i < BATCH_SIZE; ++i) { + if (this->input_queue->empty()) break; + auto atom = (Atom*) this->input_queue->dequeue(); + if (atom == nullptr) { + Utils::error("Dequeued atom is nullptr"); + } + this->atoms.push_back(atom); + } + if (!this->atoms.empty() && this->atoms.size() >= BATCH_SIZE) { + LOG_DEBUG("Processing batch of " << this->atoms.size() << " atoms."); + this->atomdb->add_atoms(this->atoms, false, true); + LOG_DEBUG("Batch processed and added to AtomDB. Clearing batch from memory."); + for (auto& atom : this->atoms) { + delete atom; + } + this->atoms.clear(); + } + LOG_DEBUG("Current Atom size: " << this->atoms.size()); + LOG_DEBUG("Current input queue size 2: " << this->input_queue->size()); + LOG_DEBUG("== END =="); + return true; + } catch (const exception& e) { + Utils::error("Error in Worker: " + string(e.what())); + for (auto& atom : this->atoms) { + delete atom; + } + this->atoms.clear(); + } + + return false; +} + +bool AtomPersistenceJob::is_finished() const { return this->finished; } + +void AtomPersistenceJob::set_producer_finished() { this->producer_finished = true; } diff --git a/src/db_adapter/Pipeline.h b/src/db_adapter/Pipeline.h new file mode 100644 index 000000000..e77d105cf --- /dev/null +++ b/src/db_adapter/Pipeline.h @@ -0,0 +1,69 @@ +#include + +#include "AtomDBSingleton.h" +#include "DedicatedThread.h" +#include "PostgresWrapper.h" +#include "Processor.h" +#include "SharedQueue.h" + +#define BATCH_SIZE 2000 + +using namespace atomdb; +using namespace std; +using namespace commons; +using namespace atoms; +using namespace db_adapter; + +namespace db_adapter { + +class DatabaseMappingJob : public ThreadMethod { + public: + DatabaseMappingJob(const string& host, + int port, + const string& database, + const string& user, + const string& password, + 
MAPPER_TYPE mapper_type = MAPPER_TYPE::SQL2ATOMS, + shared_ptr output_queue = nullptr); + ~DatabaseMappingJob(); + + void add_task_query(const string& virtual_name, const string& query); + void add_task_table(TableMapping table_mapping); + + bool thread_one_step() override; + + bool is_finished() const; + + protected: + struct MappingTask { + enum Type { TABLE, QUERY } type; + TableMapping table_mapping; + string virtual_name; + string query; + }; + unique_ptr db_conn; + unique_ptr wrapper; + vector tasks; + size_t current_task = 0; + bool finished = false; + bool initialized = false; +}; + +class AtomPersistenceJob : public ThreadMethod { + public: + AtomPersistenceJob(shared_ptr input_queue); + ~AtomPersistenceJob(); + + bool thread_one_step() override; + bool is_finished() const; + void set_producer_finished(); + + protected: + vector atoms; + shared_ptr input_queue; + shared_ptr atomdb; + bool finished = false; + bool producer_finished = false; +}; + +} // namespace db_adapter diff --git a/src/db_adapter/README.md b/src/db_adapter/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/src/db_adapter/postgres/BUILD b/src/db_adapter/postgres/BUILD new file mode 100644 index 000000000..70b020179 --- /dev/null +++ b/src/db_adapter/postgres/BUILD @@ -0,0 +1,24 @@ +load("@rules_cc//cc:cc_library.bzl", "cc_library") + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "postgres_lib", + includes = ["."], + deps = [ + ":postgres_wrapper", + ], +) + +cc_library( + name = "postgres_wrapper", + srcs = ["PostgresWrapper.cc"], + hdrs = ["PostgresWrapper.h"], + includes = ["."], + deps = [ + "//commons:commons_lib", + "//db_adapter:data_mapper", + "//db_adapter:database_connection", + "//db_adapter:database_wrapper", + ], +) diff --git a/src/db_adapter/postgres/PostgresWrapper.cc b/src/db_adapter/postgres/PostgresWrapper.cc new file mode 100644 index 000000000..47a47a5f1 --- /dev/null +++ b/src/db_adapter/postgres/PostgresWrapper.cc 
@@ -0,0 +1,542 @@ +#include "PostgresWrapper.h" + +#include +#include +#include +#include +#include +#include +#include + +#define LOG_LEVEL INFO_LEVEL +#include "Logger.h" + +using namespace std; + +PostgresDatabaseConnection::PostgresDatabaseConnection(const string& id, + const string& host, + int port, + const string& database, + const string& user, + const string& password) + : DatabaseConnection(id, host, port), database(database), user(user), password(password) {} + +PostgresDatabaseConnection::~PostgresDatabaseConnection() { + this->close_cursor(); + this->disconnect(); +} + +void PostgresDatabaseConnection::connect() { + LOG_INFO("Connecting to PostgreSQL database at " << host << ":" << port << "..."); + try { + string conn_str = "host=" + host + " port=" + std::to_string(port) + " dbname=" + database; + if (!user.empty()) { + conn_str += " user=" + user; + } + if (!password.empty()) { + conn_str += " password=" + password; + } + this->conn = make_unique(conn_str); + } catch (const exception& e) { + throw runtime_error("Could not connect to database: " + string(e.what())); + } +} + +void PostgresDatabaseConnection::disconnect() { + if (this->conn) { + this->conn->close(); + this->conn.reset(); + } +} + +pqxx::result PostgresDatabaseConnection::execute_query(const string& query) { + if (!this->conn || !this->conn->is_open()) { + Utils::error("Postgres connection is not open."); + } + + try { + pqxx::work transaction(*this->conn); + pqxx::result result = transaction.exec(query); + transaction.commit(); + return result; + } catch (const exception& e) { + Utils::error("Error during query execution: " + string(e.what())); + } + return pqxx::result{}; +} + +void PostgresDatabaseConnection::begin_cursor(const string& cursor_name, const string& query) { + if (!this->conn || !this->conn->is_open()) { + Utils::error("Postgres connection is not open."); + } + if (this->transaction) { + Utils::error("A transaction is already active. 
Close the current cursor first."); + } + this->transaction = make_unique(*this->conn); + + try { + this->transaction->exec("DECLARE " + cursor_name + " CURSOR FOR " + query); + } catch (const exception& e) { + this->transaction.reset(); + Utils::error("Error executing cursor query: " + string(e.what())); + } +} + +pqxx::result PostgresDatabaseConnection::fetch_cursor(const string& cursor_name, size_t limit) { + if (!this->transaction) { + Utils::error("No active transaction. Call begin_cursor first."); + } + return this->transaction->exec("FETCH " + std::to_string(limit) + " FROM " + cursor_name); +} + +void PostgresDatabaseConnection::close_cursor() { + if (this->transaction) { + try { + this->transaction->commit(); + } catch (...) { + // Ignore errors during commit on close + } + this->transaction.reset(); + } +} + +// =============================================================================================== +// PostgresWrapper implementation +// =============================================================================================== + +PostgresWrapper::PostgresWrapper(PostgresDatabaseConnection& db_conn, + MAPPER_TYPE mapper_type, + shared_ptr output_queue) + : SQLWrapper(db_conn, mapper_type), db_conn(db_conn), output_queue(output_queue) {} + +PostgresWrapper::~PostgresWrapper() {} + +Table PostgresWrapper::get_table(const string& name) { + auto tables = this->list_tables(); + for (const auto& table : tables) { + if (table.name == name) return table; + } + Utils::error("Table '" + name + "' not found in the database."); +} + +vector
PostgresWrapper::list_tables() { + if (tables_cache.has_value()) { + return tables_cache.value(); + } + + string query = R"(WITH table_info AS ( + SELECT + schemaname || '.' || tablename AS table_name, + (SELECT reltuples::bigint FROM pg_class WHERE oid = (schemaname || '.' || tablename)::regclass) AS row_count, + pg_tables.schemaname, + pg_tables.tablename + FROM + pg_tables + WHERE + schemaname NOT IN ('pg_catalog', 'information_schema') + ), + column_info AS ( + SELECT + table_schema || '.' || table_name AS table_name, + string_agg(column_name, ',') AS columns + FROM + information_schema.columns + WHERE + table_schema NOT IN ('pg_catalog', 'information_schema') + GROUP BY + table_schema, table_name + ), + pk_info AS ( + SELECT + tc.table_schema || '.' || tc.table_name AS table_name, + string_agg(kcu.column_name, ',') AS pk_column + FROM + information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + WHERE + tc.constraint_type = 'PRIMARY KEY' + GROUP BY + tc.table_schema, tc.table_name + ), + fk_info AS ( + SELECT + tc.table_schema || '.' || tc.table_name AS table_name, + string_agg(kcu.column_name || '|' || ccu.table_schema || '.' 
|| ccu.table_name, ',') AS fk_columns + FROM + information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + JOIN information_schema.constraint_column_usage AS ccu + ON ccu.constraint_name = tc.constraint_name + WHERE + tc.constraint_type = 'FOREIGN KEY' + GROUP BY + tc.table_schema, tc.table_name + ) + SELECT + ti.table_name, + ti.row_count, + ci.columns, + COALESCE(pk.pk_column, '') AS pk_column, + COALESCE(fk.fk_columns, '') AS fk_columns + FROM + table_info ti + LEFT JOIN + column_info ci ON ti.table_name = ci.table_name + LEFT JOIN + pk_info pk ON ti.table_name = pk.table_name + LEFT JOIN + fk_info fk ON ti.table_name = fk.table_name + ORDER BY + pg_total_relation_size(ti.table_name) ASC; + )"; + auto result = db_conn.execute_query(query); + vector
tables; + tables.reserve(result.size()); + + for (const auto& row : result) { + string table_name = row["table_name"].c_str(); + int row_count = row["row_count"].as(0); + string columns = row["columns"].c_str(); + string pk_column = row["pk_column"].c_str(); + string fk_columns = row["fk_columns"].c_str(); + + Table t; + t.name = table_name; + t.row_count = row_count; + t.primary_key = pk_column; + t.column_names = columns.empty() ? vector{} : Utils::split(columns, ','); + t.foreign_keys = fk_columns.empty() ? vector{} : Utils::split(fk_columns, ','); + + tables.push_back(move(t)); + } + this->tables_cache = tables; + return tables; +} + +void PostgresWrapper::map_table(const Table& table, + const vector& clauses, + const vector& skip_columns, + bool second_level) { + LOG_DEBUG("Mapping table: " << table.name); + + string where_clauses; + + for (size_t i = 0; i < clauses.size(); ++i) { + if (i > 0) where_clauses += " AND "; + where_clauses += clauses[i]; + } + + auto columns = build_columns_to_map(table, skip_columns); + + string cols_sql = Utils::join(columns, ','); + string base_query = "SELECT " + cols_sql + " FROM " + table.name; + if (!where_clauses.empty()) { + base_query += " WHERE " + where_clauses; + } + LOG_DEBUG("Base query: " << base_query); + this->fetch_rows_paginated(table, columns, base_query); + + if (second_level) { + for (const auto& fk : table.foreign_keys) { + auto parts = Utils::split(fk, '|'); + + if (parts.size() != 2) { + Utils::error("Invalid foreign key format: " + fk); + } + + string column = parts[0]; + string ref_table_name = parts[1]; + + // Collect distinct non-null foreign-key values + auto fk_ids = this->collect_fk_ids(table.name, column, where_clauses); + + if (fk_ids.empty()) continue; + + auto ref_table = this->get_table(ref_table_name); + auto ref_columns = this->build_columns_to_map(ref_table); + + string where_clause = ref_table.primary_key + " IN " + "(" + Utils::join(fk_ids, ',') + ")"; + + string cols_sql = 
Utils::join(ref_columns, ','); + string base_query = "SELECT " + cols_sql + " FROM " + ref_table.name; + if (!where_clause.empty()) { + base_query += " WHERE " + where_clause; + } + + this->fetch_rows_paginated(ref_table, ref_columns, base_query); + } + } +} + +vector PostgresWrapper::build_columns_to_map(const Table& table, + const vector& skip_columns) { + for (const auto& skipo_col : skip_columns) { + if (find(table.column_names.begin(), table.column_names.end(), skipo_col) == + table.column_names.end()) { + Utils::error("Skip column '" + skipo_col + "' not found in table '" + table.name + "'"); + } + } + + vector columns_to_process = table.column_names; + + vector non_primary_key_columns; + for (const auto& col : columns_to_process) { + if (col != table.primary_key) non_primary_key_columns.push_back(col); + } + + if (!skip_columns.empty()) { + vector filtered_columns; + for (const auto& col : non_primary_key_columns) { + if (find(skip_columns.begin(), skip_columns.end(), col) == skip_columns.end()) { + filtered_columns.push_back(col); + } + } + non_primary_key_columns = move(filtered_columns); + } + + vector columns_list; + columns_list.push_back(table.primary_key); + columns_list.insert( + columns_list.end(), non_primary_key_columns.begin(), non_primary_key_columns.end()); + + vector final_columns; + for (const auto& item : columns_list) { + if (!item.empty()) final_columns.push_back(item); + } + + return final_columns; +} + +vector PostgresWrapper::collect_fk_ids(const string& table_name, + const string& column_name, + const string& where_clause) { + vector ids; + + size_t offset = 0; + size_t limit = 10000; + + while (true) { + string query = "SELECT " + column_name + " FROM " + table_name + " WHERE " + where_clause + + " LIMIT " + std::to_string(limit) + " OFFSET " + std::to_string(offset) + ";"; + pqxx::result rows = db_conn.execute_query(query); + + if (rows.empty()) break; + + for (const pqxx::row& row : rows) { + auto field = row[0]; + if 
(!field.is_null()) { + string value = field.c_str(); + ids.push_back(value); + } + } + + offset += limit; + } + return ids; +} + +void PostgresWrapper::map_sql_query(const string& virtual_name, const string& raw_query) { + map> table_columns_map = this->extract_aliases_from_query(raw_query); + + if (table_columns_map.empty()) { + Utils::error("No valid aliases found in query for " + virtual_name); + } + + map tables_metadata; + + // Search metadata (PK, FK, ...) of each referenced table + // and validate that each table has its PK included in the aliases + for (const auto& table_columns : table_columns_map) { + string table_name = table_columns.first; + vector columns = table_columns.second; + try { + tables_metadata[table_name] = this->get_table(table_name); + string pk = tables_metadata[table_name].primary_key; + if (find(columns.begin(), columns.end(), pk) == columns.end()) { + auto parts = Utils::split(table_name, '.'); + string schema = parts[0]; + string table = parts[1]; + Utils::error("Primary key '" + pk + "' of table '" + table_name + + "' must be included in SELECT aliases. Add: " + table + "." 
+ pk + " AS " + + schema + "_" + table + "__" + pk); + } + } catch (const exception& e) { + Utils::error("Error retrieving metadata for table '" + table_name + "': " + e.what()); + } + } + + string base_query = Utils::trim(raw_query); + + if (!base_query.empty() && base_query.back() == ';') base_query.pop_back(); + + for (const auto& table_columns : table_columns_map) { + string table_name = table_columns.first; + vector columns = table_columns.second; + Table table = tables_metadata[table_name]; + this->fetch_rows_paginated(table, columns, base_query); + } +} + +map> PostgresWrapper::extract_aliases_from_query(const string& query) { + map> tables; + + regex alias_pattern(alias_pattern_regex, regex_constants::icase); + + auto matches_begin = sregex_iterator(query.begin(), query.end(), alias_pattern); + auto matches_end = sregex_iterator(); + + for (auto it = matches_begin; it != matches_end; ++it) { + smatch match = *it; + string table_part = match[1].str(); + string column_name = match[2].str(); + + string table_name; + + size_t dot_pos = table_part.find('.'); + if (dot_pos != string::npos) { + table_name = table_part; + } else { + size_t underscore_pos = table_part.find('_'); + if (underscore_pos != string::npos) { + table_name = + table_part.substr(0, underscore_pos) + "." + table_part.substr(underscore_pos + 1); + } else { + table_name = "public." 
+ table_part; + } + } + + auto& columns = tables[table_name]; + if (find(columns.begin(), columns.end(), column_name) == columns.end()) { + columns.push_back(column_name); + } + } + + return tables; +} + +void PostgresWrapper::fetch_rows_paginated(const Table& table, + const vector& columns, + const string& query) { + LOG_INFO("[START] Mapping table " << table.name); + + size_t limit = 10000; + int rows_count = 0; + int atoms_count = 0; + + string table_name = table.name; + Utils::replace_all(table_name, ".", "_"); + string cursor_name = "cursor_" + table_name; + + this->db_conn.begin_cursor(cursor_name, query); + + while (true) { + pqxx::result rows = this->db_conn.fetch_cursor(cursor_name, limit); + + LOG_DEBUG("Fetched " << rows.size() << " rows from table " << table.name); + + if (rows.empty()) break; + + while (this->get_available_ram_ratio() < 0.2) { + LOG_INFO("Low available RAM. Waiting before adding more atoms to the queue..."); + Utils::sleep(5000); + } + + for (const auto& row : rows) { + SqlRow sql_row = this->build_sql_row(row, table, columns); + + LOG_DEBUG("Built SqlRow for table " + << table.name << " with primary key: " + << (sql_row.primary_key ? 
sql_row.primary_key->value : "NULL")); + for (const auto& field : sql_row.fields) { + LOG_DEBUG(" Field: " << field.name << " = " << field.value); + } + + auto output = this->mapper->map(DbInput{sql_row}); + + if (this->mapper_type == MAPPER_TYPE::SQL2ATOMS) { + auto atoms = get>(output); + LOG_DEBUG("Atoms count: " << atoms.size()); + atoms_count += atoms.size(); + unique_lock lock(this->api_mutex); + for (const auto& atom : atoms) { + this->output_queue->enqueue((void*) atom); + this->count++; + } + } else { + auto metta_expressions = get>(output); + LOG_DEBUG("Metta Expressions count: " << metta_expressions.size()); + // WIP - save metta expressions to file + } + + LOG_DEBUG("Mapper HandleTrie size: " << this->mapper->handle_trie_size()); + + rows_count++; + + this->log_progress(table.name, rows_count); + } + LOG_DEBUG("Added " << this->count << " atoms in the queue"); + LOG_DEBUG("Atoms in queue: " << to_string(this->output_queue->size())); + LOG_DEBUG("Mapping table " << table.name << ". Total atoms generated: " << atoms_count); + } + + this->db_conn.close_cursor(); + + LOG_INFO("[END] Mapping table " << table.name << ". Total atoms generated: " << atoms_count); + LOG_DEBUG("[END] Mapper HandleTrie size: " << this->mapper->handle_trie_size()); +} + +SqlRow PostgresWrapper::build_sql_row(const pqxx::row& row, const Table& table, vector columns) { + SqlRow sql_row; + sql_row.table_name = table.name; + sql_row.primary_key = ColumnValue{columns[0], row[0].c_str()}; + + for (size_t i = 1; i < columns.size() && i < row.size(); i++) { + string col = columns[i]; + auto field = row[i]; + + if (field.is_null()) continue; + + string value = field.c_str(); + + // datetime → SKIP + // YYYY-MM-DD HH:MM:SS... 
+ if (value.size() >= 19 && value[4] == '-' && value[7] == '-') continue; + + if (value.empty()) { + continue; + } else if (value.size() > MAX_VALUE_SIZE) { + continue; + } + + Utils::replace_all(value, "\n", " "); + + string column_name = col; + for (const auto& fk : table.foreign_keys) { + if (fk.find(col) != string::npos) { + column_name = fk; + break; + } + } + sql_row.add_field(column_name, value); + } + return sql_row; +} + +void PostgresWrapper::log_progress(const string& table_name, int rows_count) { + int last_logged_count = this->tables_rows_count[table_name]; + + if (rows_count - last_logged_count >= 10000) { + LOG_INFO("Mapped " << rows_count << " rows from the " << table_name << " table"); + this->tables_rows_count[table_name] = rows_count; + } +} + +double PostgresWrapper::get_available_ram_ratio() { + unsigned long total = Utils::get_total_ram(); + if (total == 0) return 0.0; + return static_cast(Utils::get_current_free_ram()) / static_cast(total); +} diff --git a/src/db_adapter/postgres/PostgresWrapper.h b/src/db_adapter/postgres/PostgresWrapper.h new file mode 100644 index 000000000..620440d54 --- /dev/null +++ b/src/db_adapter/postgres/PostgresWrapper.h @@ -0,0 +1,99 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "DatabaseConnection.h" +#include "DatabaseWrapper.h" +#include "SharedQueue.h" + +#define MAX_VALUE_SIZE ((size_t) 1000) + +using namespace std; +using namespace atoms; +using namespace commons; + +namespace db_adapter { + +class PostgresDatabaseConnection : public DatabaseConnection { + public: + PostgresDatabaseConnection(const string& id, + const string& host, + int port, + const string& database, + const string& user, + const string& password); + ~PostgresDatabaseConnection(); + + void connect() override; + void disconnect() override; + pqxx::result execute_query(const string& query); + void begin_cursor(const string& cursor_name, const string& query); + 
pqxx::result fetch_cursor(const string& cursor_name, size_t limit); + void close_cursor(); + + protected: + unique_ptr conn; + unique_ptr transaction; + string database; + string user; + string password; +}; + +/** + * @class PostgresWrapper + * @brief Concrete implementation of SQLWrapper for PostgreSQL using libpqxx. + */ +class PostgresWrapper : public SQLWrapper { + public: + /** + * @brief Constructs a PostgresWrapper. + * + * @param connection The PostgreSQL database connection. + * @param mapper_type The strategy for mapping results. + */ + PostgresWrapper(PostgresDatabaseConnection& db_conn, + MAPPER_TYPE mapper_type = MAPPER_TYPE::SQL2ATOMS, + shared_ptr output_queue = nullptr); + + ~PostgresWrapper() override; + + Table get_table(const string& name) override; + vector
list_tables() override; + void map_table(const Table& table, + const vector& clauses, + const vector& skip_columns = {}, + bool second_level = false) override; + void map_sql_query(const string& virtual_name, const string& raw_query) override; + + protected: + // Regex for parsing alias patterns (e.g., "AS public_feature__uniquename") + const string alias_pattern_regex = R"(\bAS\s+([a-zA-Z_][a-zA-Z0-9_]*)__([a-zA-Z_][a-zA-Z0-9_]*))"; + + private: + atomic count = 0; + mutex api_mutex; + PostgresDatabaseConnection& db_conn; + shared_ptr output_queue; + optional> tables_cache; + unordered_map tables_rows_count; + vector build_columns_to_map(const Table& table, const vector& skip_columns = {}); + vector collect_fk_ids(const string& table_name, + const string& column_name, + const string& where_clause = ""); + map> extract_aliases_from_query(const string& query); + void fetch_rows_paginated(const Table& table, const vector& columns, const string& query); + SqlRow build_sql_row(const pqxx::row& row, const Table& table, vector columns); + void log_progress(const string& table_name, int rows_count); + double get_available_ram_ratio(); +}; + +} // namespace db_adapter \ No newline at end of file diff --git a/src/docker/Dockerfile b/src/docker/Dockerfile index 378b40b99..21f4dc3ac 100644 --- a/src/docker/Dockerfile +++ b/src/docker/Dockerfile @@ -20,7 +20,7 @@ RUN mkdir -p \ RUN yes | apt update -y \ && yes | apt install -y git build-essential curl protobuf-compiler python3 python3-pip cmake unzip uuid-runtime lcov rpm bc \ - libevent-dev libssl-dev pkg-config libncurses5 \ + libevent-dev libssl-dev pkg-config libncurses5 libpq-dev \ && yes | apt clean -y \ && rm -rf /var/lib/apt/lists/* @@ -62,6 +62,16 @@ RUN cd /tmp \ && ln -s /usr/local/include/mongocxx/v_noabi/mongocxx/* /usr/local/include/mongocxx/ \ && ldconfig +# Postgres client (libpqxx) +COPY src/assets/libpqxx-7.10.5.tar.gz /tmp +RUN cd /tmp \ + && tar xzvf libpqxx-7.10.5.tar.gz \ + && cd /tmp/libpqxx-7.10.5 \ + 
&& cmake -S . -B build -DCMAKE_BUILD_TYPE=RelWithDebInfo \ + && cmake --build build -j \ + && cmake --install build \ + && ldconfig + # cpp-httplib RUN cd /tmp \ && unzip -q cpp-httplib-master.zip \ diff --git a/src/package/BUILD b/src/package/BUILD index 3f42582de..3fa30cae2 100644 --- a/src/package/BUILD +++ b/src/package/BUILD @@ -12,6 +12,7 @@ pkg_files( "//:attention_broker_service", "//:busclient", "//:busnode", + "//:database_adapter", "//:implication_query_evolution", "//:tests_db_loader", "//:word_query", @@ -64,7 +65,7 @@ pkg_rpm( "@platforms//os:linux": [], "//conditions:default": ["@platforms//:incompatible"], }), - version = "1.0.3", + version = "1", ) # DEB Packaging Setup @@ -76,6 +77,7 @@ pkg_tar( "//:attention_broker_service", "//:busclient", "//:busnode", + "//:database_adapter", "//:implication_query_evolution", "//:tests_db_loader", "//:word_query", @@ -117,5 +119,5 @@ pkg_deb( description = "Distributed AtomSpace Binaries and Libraries", maintainer = "Hyperon DAS Team", package = "das", - version = "1.0.3", + version = "1", ) diff --git a/src/scripts/bazel_build.sh b/src/scripts/bazel_build.sh index 06efc0443..ee700df86 100755 --- a/src/scripts/bazel_build.sh +++ b/src/scripts/bazel_build.sh @@ -43,6 +43,7 @@ if [ "$BUILD_BINARIES" = true ]; then BUILD_TARGETS+=" //:implication_query_evolution" BUILD_TARGETS+=" //:evaluation_evolution" BUILD_TARGETS+=" //:tests_db_loader" + BUILD_TARGETS+=" //:database_adapter" # Move targets MOVE_LIB_TARGETS+=" bazel-bin/hyperon_das.so" @@ -57,7 +58,7 @@ if [ "$BUILD_BINARIES" = true ]; then MOVE_BIN_TARGETS+=" bazel-bin/implication_query_evolution" MOVE_BIN_TARGETS+=" bazel-bin/evaluation_evolution" MOVE_BIN_TARGETS+=" bazel-bin/tests_db_loader" - + MOVE_BIN_TARGETS+=" bazel-bin/database_adapter" fi @@ -85,5 +86,19 @@ find "$LIB_DIR" -type f -name "*.so" | while IFS= read -r sofile; do done done +# Add exclusive block just to handle database_adapter SO dependencies. 
+ +find "$BIN_DIR" -type f -executable -name "database_adapter" | while IFS= read -r binfile; do + ldd "$binfile" | awk '/=> \// { print $3 }' | while IFS= read -r dep; do + dep_base=$(basename "$dep") + case "$dep_base" in + "libpq.so.5") + if [ -f "$dep" ]; then + cp -f "$dep" "$LIB_DIR" + fi + ;; + esac + done +done exit $? diff --git a/src/scripts/bazel_package.sh b/src/scripts/bazel_package.sh index 1ada21277..771be14b1 100755 --- a/src/scripts/bazel_package.sh +++ b/src/scripts/bazel_package.sh @@ -37,6 +37,7 @@ if [ "$BUILD_BINARIES" = true ]; then BUILD_TARGETS+=" //:word_query_evolution" BUILD_TARGETS+=" //:implication_query_evolution" BUILD_TARGETS+=" //:tests_db_loader" + BUILD_TARGETS+=" //:database_adapter" fi $BAZELISK_BUILD_CMD $BUILD_TARGETS @@ -56,16 +57,33 @@ find "$BUILT_TARGETS_PATH" -type f -name "*.so" | while IFS= read -r sofile; do done done +find "$BUILT_TARGETS_PATH" -type f -executable -name "database_adapter" | while IFS= read -r binfile; do + ldd "$binfile" | awk '/=> \// { print $3 }' | while IFS= read -r dep; do + dep_base=$(basename "$dep") + case "$dep_base" in + "libpq.so.5") + if [ -f "$dep" ]; then + cp -f "$dep" "$EXTERNAL_LIBS_PATH" + fi + ;; + esac + done +done + if [[ "$PACKAGE_TYPE" == "deb" ]]; then BUILD_TARGETS=" //package:das_deb_package" $BAZELISK_BUILD_CMD $BUILD_TARGETS - cp -L bazel-bin/package/das_1.0.3_amd64.deb $PKG_DIR + + LATEST_DEB=$(ls -t bazel-bin/package/*.deb | head -n 1) + cp -L "$LATEST_DEB" "$PKG_DIR/das_package.deb" elif [[ "$PACKAGE_TYPE" == "rpm" ]]; then BUILD_TARGETS=" //package:das_rpm_package" $BAZELISK_BUILD_CMD $BUILD_TARGETS - cp -L bazel-bin/package/das-1.0.3-1.x86_64.rpm $PKG_DIR + + LATEST_RPM=$(ls -t bazel-bin/package/*.rpm | head -n 1) + cp -L "$LATEST_RPM" "$PKG_DIR/das_package.rpm" fi -rm -rf $EXTERNAL_LIBS_PATH +rm -rf "$EXTERNAL_LIBS_PATH" exit $? 
\ No newline at end of file diff --git a/src/scripts/postgres_setup.sql b/src/scripts/postgres_setup.sql new file mode 100644 index 000000000..7a9470401 --- /dev/null +++ b/src/scripts/postgres_setup.sql @@ -0,0 +1,197 @@ +-- ============================================================================= +-- Test Database Schema for PostgresWrapper Tests +-- ============================================================================= +-- +-- This creates 3 tables: +-- 1. organism - Referenced by feature (FK target) +-- 2. cvterm - Referenced by feature (FK target) - "controlled vocabulary term" +-- 3. feature - Main table with FKs to organism and cvterm +-- +-- To use: +-- 1. Create database: createdb postgres_wrapper_test +-- 2. Load schema: psql -d postgres_wrapper_test -f postgres_setup.sql +-- +-- Or with Docker: +-- docker run -d --name pg_test -e POSTGRES_PASSWORD=test -e POSTGRES_DB=postgres_wrapper_test -p 5433:5432 postgres:15 +-- docker exec -i pg_test psql -U postgres -d postgres_wrapper_test < postgres_setup.sql +-- ============================================================================= + +-- Drop tables if they exist (for re-running) +DROP TABLE IF EXISTS public.feature CASCADE; +DROP TABLE IF EXISTS public.organism CASCADE; +DROP TABLE IF EXISTS public.cvterm CASCADE; + +-- ============================================================================= +-- Table 1: organism +-- ============================================================================= +CREATE TABLE public.organism ( + organism_id SERIAL PRIMARY KEY, + genus VARCHAR(255) NOT NULL, + species VARCHAR(255) NOT NULL, + common_name VARCHAR(255), + abbreviation VARCHAR(255), + comment TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +COMMENT ON TABLE public.organism IS 'Stores organism/species information'; + +-- ============================================================================= +-- Table 2: cvterm (Controlled Vocabulary Term) +-- 
============================================================================= +CREATE TABLE public.cvterm ( + cvterm_id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + definition TEXT, + is_obsolete BOOLEAN DEFAULT FALSE, + is_relationshiptype BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +COMMENT ON TABLE public.cvterm IS 'Stores controlled vocabulary terms (types)'; + +-- ============================================================================= +-- Table 3: feature (Main table with Foreign Keys) +-- ============================================================================= +CREATE TABLE public.feature ( + feature_id SERIAL PRIMARY KEY, + organism_id INTEGER NOT NULL REFERENCES public.organism(organism_id), + type_id INTEGER NOT NULL REFERENCES public.cvterm(cvterm_id), + name VARCHAR(255), + uniquename VARCHAR(255) NOT NULL, + residues TEXT, + seqlen INTEGER, + md5checksum VARCHAR(32), + is_analysis BOOLEAN DEFAULT FALSE, + is_obsolete BOOLEAN DEFAULT FALSE, + timeaccessioned TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + timelastmodified TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +COMMENT ON TABLE public.feature IS 'Main feature table with FKs to organism and cvterm'; + +-- Create indexes for foreign keys +CREATE INDEX idx_feature_organism_id ON public.feature(organism_id); +CREATE INDEX idx_feature_type_id ON public.feature(type_id); +CREATE INDEX idx_feature_uniquename ON public.feature(uniquename); + +-- ============================================================================= +-- Insert Test Data: Organisms +-- ============================================================================= +INSERT INTO public.organism (organism_id, genus, species, common_name, abbreviation, comment) VALUES + (1, 'Drosophila', 'melanogaster', 'fruit fly', 'Dmel', 'Model organism for genetics'), + (2, 'Homo', 'sapiens', 'human', 'Hsap', 'Human species'), + (3, 'Mus', 'musculus', 'mouse', 'Mmus', 'Laboratory mouse'), + (4, 'Saccharomyces', 
'cerevisiae', 'yeast', 'Scer', 'Baker''s yeast'), + (5, 'Caenorhabditis', 'elegans', 'roundworm', 'Cele', 'Model organism for development'); + +-- ============================================================================= +-- Insert Test Data: CV Terms (Feature Types) +-- ============================================================================= +INSERT INTO public.cvterm (cvterm_id, name, definition, is_obsolete, is_relationshiptype) VALUES + (1, 'gene', 'A region of biological sequence that encodes a gene', FALSE, FALSE), + (2, 'mRNA', 'Messenger RNA', FALSE, FALSE), + (3, 'exon', 'A region of the transcript that is included in the mature RNA', FALSE, FALSE), + (4, 'intron', 'A region of the transcript that is not included in the mature RNA', FALSE, FALSE), + (5, 'protein', 'A sequence of amino acids', FALSE, FALSE), + (6, 'chromosome', 'A structure that contains genetic material', FALSE, FALSE), + (7, 'promoter', 'A regulatory region', FALSE, FALSE), + (8, 'obsolete_term', 'This term is obsolete', TRUE, FALSE), + (9, 'part_of', 'Relationship type: part of', FALSE, TRUE), + (10, 'derives_from', 'Relationship type: derives from', FALSE, TRUE); + +-- ============================================================================= +-- Insert Test Data: Features +-- ============================================================================= + +-- Drosophila melanogaster features (organism_id = 1) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (1, 1, 1, 'white', 'FBgn0003996', 'ATGCGATCGATCG', 13, FALSE, FALSE), + (2, 1, 1, 'yellow', 'FBgn0004034', 'GCTAGCTAGCTAG', 13, FALSE, FALSE), + (3, 1, 1, 'vermilion', 'FBgn0003979', NULL, NULL, FALSE, FALSE), + (4, 1, 2, 'white-RA', 'FBtr0070001', 'ATGCGATCG', 9, FALSE, FALSE), + (5, 1, 2, 'white-RB', 'FBtr0070002', 'ATGCGA', 6, FALSE, FALSE), + (6, 1, 3, 'white:1', 'FBexon0001', 'ATG', 3, FALSE, FALSE), + (7, 1, 3, 'white:2', 
'FBexon0002', 'CGA', 3, FALSE, FALSE), + (8, 1, 5, 'white-PA', 'FBpp0070001', 'MVLSPADKTNVKAAWGKVGAHAGEYGAEALERMFLSFPTTKTYFPHFDLSH', 51, FALSE, FALSE), + (9, 1, 6, 'chr2L', 'FBchr0000001', NULL, 23513712, FALSE, FALSE), + (10, 1, 1, 'obsolete_gene', 'FBgn9999999', NULL, NULL, FALSE, TRUE); + +-- Human features (organism_id = 2) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (11, 2, 1, 'BRCA1', 'ENSG00000012048', 'ATGCGATCGATCGATCG', 17, FALSE, FALSE), + (12, 2, 1, 'TP53', 'ENSG00000141510', 'GCTAGCTAGCTAGCTAG', 17, FALSE, FALSE), + (13, 2, 2, 'BRCA1-201', 'ENST00000357654', 'ATGCGATCG', 9, FALSE, FALSE), + (14, 2, 5, 'BRCA1-P01', 'ENSP00000350283', 'MDLSALRVEEVQNVINAMQKILECPI', 26, FALSE, FALSE); + +-- Mouse features (organism_id = 3) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (15, 3, 1, 'Brca1', 'MGI:104537', 'ATGCGATCG', 9, FALSE, FALSE), + (16, 3, 1, 'Trp53', 'MGI:98834', 'GCTAGCTAG', 9, FALSE, FALSE); + +-- Yeast features (organism_id = 4) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (17, 4, 1, 'GAL4', 'YPL248C', 'ATGCGA', 6, FALSE, FALSE), + (18, 4, 1, 'ACT1', 'YFL039C', 'GCTAGC', 6, FALSE, FALSE); + +-- C. 
elegans features (organism_id = 5) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (19, 5, 1, 'unc-54', 'WBGene00006789', 'ATGCGATCGATCG', 13, FALSE, FALSE), + (20, 5, 1, 'dpy-10', 'WBGene00001072', 'GCTAGCTAGCTAG', 13, FALSE, FALSE); + +-- ============================================================================= +-- Special Test Cases +-- ============================================================================= + +-- Feature with NULL name (for testing NULL handling) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename) VALUES + (21, 1, 1, NULL, 'FBgn_null_name'); + +-- Feature with empty string name (for testing empty string handling) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename) VALUES + (22, 1, 1, '', 'FBgn_empty_name'); + +-- Feature with very long residues (> MAX_VALUE_SIZE = 1000) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen) VALUES + (23, 1, 1, 'long_residues_gene', 'FBgn_long', REPEAT('ATGC', 300), 1200); + +-- Feature with special characters in name +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename) VALUES + (24, 1, 1, 'gene''s "special" ', 'FBgn_special_chars'); + +-- Feature with unicode in name +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename) VALUES + (25, 1, 1, 'gène_français_日本語', 'FBgn_unicode'); + +-- Feature with analysis flag +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, is_analysis) VALUES + (26, 1, 1, 'computed_gene', 'FBgn_analysis', TRUE); + +-- ============================================================================= +-- Reset sequences to continue after manual IDs +-- ============================================================================= +SELECT setval('public.organism_organism_id_seq', (SELECT MAX(organism_id) FROM 
public.organism)); +SELECT setval('public.cvterm_cvterm_id_seq', (SELECT MAX(cvterm_id) FROM public.cvterm)); +SELECT setval('public.feature_feature_id_seq', (SELECT MAX(feature_id) FROM public.feature)); + +-- ============================================================================= +-- Verify data loaded correctly +-- ============================================================================= +DO $$ +DECLARE + org_count INTEGER; + cvterm_count INTEGER; + feature_count INTEGER; +BEGIN + SELECT COUNT(*) INTO org_count FROM public.organism; + SELECT COUNT(*) INTO cvterm_count FROM public.cvterm; + SELECT COUNT(*) INTO feature_count FROM public.feature; + + RAISE NOTICE '==========================================='; + RAISE NOTICE 'Test database loaded successfully!'; + RAISE NOTICE '==========================================='; + RAISE NOTICE 'Organisms: % rows', org_count; + RAISE NOTICE 'CV Terms: % rows', cvterm_count; + RAISE NOTICE 'Features: % rows', feature_count; + RAISE NOTICE '==========================================='; +END $$; \ No newline at end of file diff --git a/src/tests/assets/postgres_schema.sql b/src/tests/assets/postgres_schema.sql new file mode 100644 index 000000000..7a9470401 --- /dev/null +++ b/src/tests/assets/postgres_schema.sql @@ -0,0 +1,197 @@ +-- ============================================================================= +-- Test Database Schema for PostgresWrapper Tests +-- ============================================================================= +-- +-- This creates 3 tables: +-- 1. organism - Referenced by feature (FK target) +-- 2. cvterm - Referenced by feature (FK target) - "controlled vocabulary term" +-- 3. feature - Main table with FKs to organism and cvterm +-- +-- To use: +-- 1. Create database: createdb postgres_wrapper_test +-- 2. 
Load schema: psql -d postgres_wrapper_test -f postgres_schema.sql +-- +-- Or with Docker: +-- docker run -d --name pg_test -e POSTGRES_PASSWORD=test -e POSTGRES_DB=postgres_wrapper_test -p 5433:5432 postgres:15 +-- docker exec -i pg_test psql -U postgres -d postgres_wrapper_test < postgres_schema.sql +-- ============================================================================= + +-- Drop tables if they exist (for re-running) +DROP TABLE IF EXISTS public.feature CASCADE; +DROP TABLE IF EXISTS public.organism CASCADE; +DROP TABLE IF EXISTS public.cvterm CASCADE; + +-- ============================================================================= +-- Table 1: organism +-- ============================================================================= +CREATE TABLE public.organism ( + organism_id SERIAL PRIMARY KEY, + genus VARCHAR(255) NOT NULL, + species VARCHAR(255) NOT NULL, + common_name VARCHAR(255), + abbreviation VARCHAR(255), + comment TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +COMMENT ON TABLE public.organism IS 'Stores organism/species information'; + +-- ============================================================================= +-- Table 2: cvterm (Controlled Vocabulary Term) +-- ============================================================================= +CREATE TABLE public.cvterm ( + cvterm_id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + definition TEXT, + is_obsolete BOOLEAN DEFAULT FALSE, + is_relationshiptype BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +COMMENT ON TABLE public.cvterm IS 'Stores controlled vocabulary terms (types)'; + +-- ============================================================================= +-- Table 3: feature (Main table with Foreign Keys) +-- ============================================================================= +CREATE TABLE public.feature ( + feature_id SERIAL PRIMARY KEY, + organism_id INTEGER NOT NULL REFERENCES public.organism(organism_id), + type_id 
INTEGER NOT NULL REFERENCES public.cvterm(cvterm_id), + name VARCHAR(255), + uniquename VARCHAR(255) NOT NULL, + residues TEXT, + seqlen INTEGER, + md5checksum VARCHAR(32), + is_analysis BOOLEAN DEFAULT FALSE, + is_obsolete BOOLEAN DEFAULT FALSE, + timeaccessioned TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + timelastmodified TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +COMMENT ON TABLE public.feature IS 'Main feature table with FKs to organism and cvterm'; + +-- Create indexes for foreign keys +CREATE INDEX idx_feature_organism_id ON public.feature(organism_id); +CREATE INDEX idx_feature_type_id ON public.feature(type_id); +CREATE INDEX idx_feature_uniquename ON public.feature(uniquename); + +-- ============================================================================= +-- Insert Test Data: Organisms +-- ============================================================================= +INSERT INTO public.organism (organism_id, genus, species, common_name, abbreviation, comment) VALUES + (1, 'Drosophila', 'melanogaster', 'fruit fly', 'Dmel', 'Model organism for genetics'), + (2, 'Homo', 'sapiens', 'human', 'Hsap', 'Human species'), + (3, 'Mus', 'musculus', 'mouse', 'Mmus', 'Laboratory mouse'), + (4, 'Saccharomyces', 'cerevisiae', 'yeast', 'Scer', 'Baker''s yeast'), + (5, 'Caenorhabditis', 'elegans', 'roundworm', 'Cele', 'Model organism for development'); + +-- ============================================================================= +-- Insert Test Data: CV Terms (Feature Types) +-- ============================================================================= +INSERT INTO public.cvterm (cvterm_id, name, definition, is_obsolete, is_relationshiptype) VALUES + (1, 'gene', 'A region of biological sequence that encodes a gene', FALSE, FALSE), + (2, 'mRNA', 'Messenger RNA', FALSE, FALSE), + (3, 'exon', 'A region of the transcript that is included in the mature RNA', FALSE, FALSE), + (4, 'intron', 'A region of the transcript that is not included in the mature RNA', FALSE, 
FALSE), + (5, 'protein', 'A sequence of amino acids', FALSE, FALSE), + (6, 'chromosome', 'A structure that contains genetic material', FALSE, FALSE), + (7, 'promoter', 'A regulatory region', FALSE, FALSE), + (8, 'obsolete_term', 'This term is obsolete', TRUE, FALSE), + (9, 'part_of', 'Relationship type: part of', FALSE, TRUE), + (10, 'derives_from', 'Relationship type: derives from', FALSE, TRUE); + +-- ============================================================================= +-- Insert Test Data: Features +-- ============================================================================= + +-- Drosophila melanogaster features (organism_id = 1) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (1, 1, 1, 'white', 'FBgn0003996', 'ATGCGATCGATCG', 13, FALSE, FALSE), + (2, 1, 1, 'yellow', 'FBgn0004034', 'GCTAGCTAGCTAG', 13, FALSE, FALSE), + (3, 1, 1, 'vermilion', 'FBgn0003979', NULL, NULL, FALSE, FALSE), + (4, 1, 2, 'white-RA', 'FBtr0070001', 'ATGCGATCG', 9, FALSE, FALSE), + (5, 1, 2, 'white-RB', 'FBtr0070002', 'ATGCGA', 6, FALSE, FALSE), + (6, 1, 3, 'white:1', 'FBexon0001', 'ATG', 3, FALSE, FALSE), + (7, 1, 3, 'white:2', 'FBexon0002', 'CGA', 3, FALSE, FALSE), + (8, 1, 5, 'white-PA', 'FBpp0070001', 'MVLSPADKTNVKAAWGKVGAHAGEYGAEALERMFLSFPTTKTYFPHFDLSH', 51, FALSE, FALSE), + (9, 1, 6, 'chr2L', 'FBchr0000001', NULL, 23513712, FALSE, FALSE), + (10, 1, 1, 'obsolete_gene', 'FBgn9999999', NULL, NULL, FALSE, TRUE); + +-- Human features (organism_id = 2) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (11, 2, 1, 'BRCA1', 'ENSG00000012048', 'ATGCGATCGATCGATCG', 17, FALSE, FALSE), + (12, 2, 1, 'TP53', 'ENSG00000141510', 'GCTAGCTAGCTAGCTAG', 17, FALSE, FALSE), + (13, 2, 2, 'BRCA1-201', 'ENST00000357654', 'ATGCGATCG', 9, FALSE, FALSE), + (14, 2, 5, 'BRCA1-P01', 'ENSP00000350283', 'MDLSALRVEEVQNVINAMQKILECPI', 26, 
FALSE, FALSE); + +-- Mouse features (organism_id = 3) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (15, 3, 1, 'Brca1', 'MGI:104537', 'ATGCGATCG', 9, FALSE, FALSE), + (16, 3, 1, 'Trp53', 'MGI:98834', 'GCTAGCTAG', 9, FALSE, FALSE); + +-- Yeast features (organism_id = 4) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (17, 4, 1, 'GAL4', 'YPL248C', 'ATGCGA', 6, FALSE, FALSE), + (18, 4, 1, 'ACT1', 'YFL039C', 'GCTAGC', 6, FALSE, FALSE); + +-- C. elegans features (organism_id = 5) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen, is_analysis, is_obsolete) VALUES + (19, 5, 1, 'unc-54', 'WBGene00006789', 'ATGCGATCGATCG', 13, FALSE, FALSE), + (20, 5, 1, 'dpy-10', 'WBGene00001072', 'GCTAGCTAGCTAG', 13, FALSE, FALSE); + +-- ============================================================================= +-- Special Test Cases +-- ============================================================================= + +-- Feature with NULL name (for testing NULL handling) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename) VALUES + (21, 1, 1, NULL, 'FBgn_null_name'); + +-- Feature with empty string name (for testing empty string handling) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename) VALUES + (22, 1, 1, '', 'FBgn_empty_name'); + +-- Feature with very long residues (> MAX_VALUE_SIZE = 1000) +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, residues, seqlen) VALUES + (23, 1, 1, 'long_residues_gene', 'FBgn_long', REPEAT('ATGC', 300), 1200); + +-- Feature with special characters in name +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename) VALUES + (24, 1, 1, 'gene''s "special" ', 'FBgn_special_chars'); + +-- Feature with unicode in name +INSERT INTO 
public.feature (feature_id, organism_id, type_id, name, uniquename) VALUES + (25, 1, 1, 'gène_français_日本語', 'FBgn_unicode'); + +-- Feature with analysis flag +INSERT INTO public.feature (feature_id, organism_id, type_id, name, uniquename, is_analysis) VALUES + (26, 1, 1, 'computed_gene', 'FBgn_analysis', TRUE); + +-- ============================================================================= +-- Reset sequences to continue after manual IDs +-- ============================================================================= +SELECT setval('public.organism_organism_id_seq', (SELECT MAX(organism_id) FROM public.organism)); +SELECT setval('public.cvterm_cvterm_id_seq', (SELECT MAX(cvterm_id) FROM public.cvterm)); +SELECT setval('public.feature_feature_id_seq', (SELECT MAX(feature_id) FROM public.feature)); + +-- ============================================================================= +-- Verify data loaded correctly +-- ============================================================================= +DO $$ +DECLARE + org_count INTEGER; + cvterm_count INTEGER; + feature_count INTEGER; +BEGIN + SELECT COUNT(*) INTO org_count FROM public.organism; + SELECT COUNT(*) INTO cvterm_count FROM public.cvterm; + SELECT COUNT(*) INTO feature_count FROM public.feature; + + RAISE NOTICE '==========================================='; + RAISE NOTICE 'Test database loaded successfully!'; + RAISE NOTICE '==========================================='; + RAISE NOTICE 'Organisms: % rows', org_count; + RAISE NOTICE 'CV Terms: % rows', cvterm_count; + RAISE NOTICE 'Features: % rows', feature_count; + RAISE NOTICE '==========================================='; +END $$; \ No newline at end of file diff --git a/src/tests/cpp/BUILD b/src/tests/cpp/BUILD index af0791d56..11af8970c 100644 --- a/src/tests/cpp/BUILD +++ b/src/tests/cpp/BUILD @@ -834,3 +834,31 @@ cc_test( "@com_github_google_googletest//:gtest_main", ], ) + +cc_test( + name = "db_adapter_test", + size = "small", + srcs = [ + 
"db_adapter_test.cc", + ], + copts = [ + "-Iexternal/gtest/googletest/include", + "-Iexternal/gtest/googletest", + ], + linkopts = [ + "-L/usr/local/lib", + "-lpqxx", + "-lpq", + "-lhiredis_cluster", + "-lhiredis", + "-lmongocxx", + "-lbsoncxx", + ], + linkstatic = 1, + deps = [ + "//db_adapter:db_adapter_lib", + "//tests/cpp/test_commons", + "@com_github_google_googletest//:gtest_main", + "@mbedtls", + ], +) diff --git a/src/tests/cpp/db_adapter_test.cc b/src/tests/cpp/db_adapter_test.cc new file mode 100644 index 000000000..3fb316265 --- /dev/null +++ b/src/tests/cpp/db_adapter_test.cc @@ -0,0 +1,887 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "Atom.h" +#include "ContextLoader.h" +#include "DataTypes.h" +#include "DedicatedThread.h" +#include "Logger.h" +#include "Node.h" +#include "Pipeline.h" +#include "PostgresWrapper.h" +#include "Processor.h" +#include "TestConfig.h" + +using namespace std; +using namespace db_adapter; +using namespace atoms; +using namespace processor; + +class PostgresWrapperTestEnvironment : public ::testing::Environment { + public: + void SetUp() override { + TestConfig::load_environment(); + AtomDBSingleton::init(); + } + + void TearDown() override {} +}; + +class PostgresDatabaseConnectionTest : public ::testing::Test { + protected: + string TEST_HOST = "localhost"; + int TEST_PORT = 5433; + string TEST_DB = "postgres_wrapper_test"; + string TEST_USER = "postgres"; + string TEST_PASSWORD = "test"; + + string INVALID_HOST = "invalid.host"; + int INVALID_PORT = 99999; + string INVALID_DB = "database_xyz"; + + string FEATURE_TABLE = "public.feature"; + string ORGANISM_TABLE = "public.organism"; + string CVTERM_TABLE = "public.cvterm"; + string FEATURE_PK = "feature_id"; + string ORGANISM_PK = "organism_id"; + string CVTERM_PK = "cvterm_id"; + + int DROSOPHILA_ORGANISM_ID = 1; + int HUMAN_ORGANISM_ID = 2; + + int WHITE_GENE_ID = 1; + string WHITE_GENE_NAME = "white"; + string WHITE_GENE_UNIQUENAME 
= "FBgn0003996"; + + int TOTAL_ROWS_ORGANISMS = 5; + int TOTAL_ROWS_CVTERMS = 10; + int TOTAL_ROWS_FEATURES = 26; + + void SetUp() override {} + + void TearDown() override { + if (this->conn) { + this->conn->stop(); + } + } + + shared_ptr create_db_connection() { + this->conn = make_shared( + "test-conn", TEST_HOST, TEST_PORT, TEST_DB, TEST_USER, TEST_PASSWORD); + this->conn->setup(); + this->conn->start(); + return this->conn; + } + + shared_ptr conn; +}; + +class PostgresWrapperTest : public ::testing::Test { + protected: + string TEST_HOST = "localhost"; + int TEST_PORT = 5433; + string TEST_DB = "postgres_wrapper_test"; + string TEST_USER = "postgres"; + string TEST_PASSWORD = "test"; + + string INVALID_HOST = "invalid.host"; + int INVALID_PORT = 99999; + string INVALID_DB = "database_xyz"; + + string FEATURE_TABLE = "public.feature"; + string ORGANISM_TABLE = "public.organism"; + string CVTERM_TABLE = "public.cvterm"; + string FEATURE_PK = "feature_id"; + string ORGANISM_PK = "organism_id"; + string CVTERM_PK = "cvterm_id"; + + int DROSOPHILA_ORGANISM_ID = 1; + int HUMAN_ORGANISM_ID = 2; + + int WHITE_GENE_ID = 1; + string WHITE_GENE_NAME = "white"; + string WHITE_GENE_UNIQUENAME = "FBgn0003996"; + + int TOTAL_ROWS_ORGANISMS = 5; + int TOTAL_ROWS_CVTERMS = 10; + int TOTAL_ROWS_FEATURES = 26; + + void SetUp() override { + temp_file_path_1 = "/tmp/context_1.json"; + temp_file_path_2 = "/tmp/context_2.json"; + + ofstream file_1(temp_file_path_1); + file_1 << R"([ + { + "table_name": "public.organism", + "skip_columns": [], + "where_clauses": ["organism_id = 1"] + }, + { + "table_name": "public.feature", + "skip_columns": [], + "where_clauses": ["feature_id = 1"] + }, + { + "table_name": "public.cvterm", + "skip_columns": [], + "where_clauses": ["cvterm_id = 1"] + }])"; + file_1.close(); + + ofstream file_2(temp_file_path_2); + file_2 << R"([ + { + "table_name": "public.organism", + "skip_columns": [2, "genus"], + "where_clauses": ["organism_id = 1"] + }, + { + 
"table_name": "feature", + "skip_columns": [], + "where_clauses": ["feature_id = 1"] + }, + { + "table_name": "public.cvterm", + "skip_columns": [], + "where_clauses": ["cvterm_id = 1"] + }])"; + file_2.close(); + } + + void TearDown() override { + std::remove(temp_file_path_1.c_str()); + std::remove(temp_file_path_2.c_str()); + if (this->conn) { + this->conn->stop(); + } + } + + shared_ptr create_wrapper(PostgresDatabaseConnection& db_conn, + MAPPER_TYPE mapper_type = MAPPER_TYPE::SQL2ATOMS) { + auto queue = make_shared(); + return make_shared(db_conn, mapper_type, queue); + } + + string temp_file_path_1; + string temp_file_path_2; + + shared_ptr create_db_connection() { + this->conn = make_shared( + "test-conn", TEST_HOST, TEST_PORT, TEST_DB, TEST_USER, TEST_PASSWORD); + this->conn->setup(); + this->conn->start(); + return this->conn; + } + + shared_ptr conn; +}; + +TEST_F(PostgresDatabaseConnectionTest, Connection) { + auto conn = create_db_connection(); + + EXPECT_TRUE(conn->is_connected()); + + auto result = conn->execute_query("SELECT 1"); + + EXPECT_FALSE(result.empty()); + EXPECT_EQ(result[0][0].as(), 1); + + conn->stop(); + + EXPECT_FALSE(conn->is_connected()); + + auto conn1 = new PostgresDatabaseConnection( + "test-conn1", INVALID_HOST, TEST_PORT, TEST_DB, TEST_USER, TEST_PASSWORD); + EXPECT_THROW(conn1->connect(), std::runtime_error); + + auto conn2 = new PostgresDatabaseConnection( + "test-conn2", TEST_HOST, INVALID_PORT, TEST_DB, TEST_USER, TEST_PASSWORD); + EXPECT_THROW(conn2->connect(), std::runtime_error); + + auto conn3 = new PostgresDatabaseConnection( + "test-conn3", TEST_HOST, TEST_PORT, INVALID_DB, TEST_USER, TEST_PASSWORD); + EXPECT_THROW(conn3->connect(), std::runtime_error); +} + +TEST_F(PostgresDatabaseConnectionTest, ConcurrentConnection) { + const int num_threads = 100; + vector threads; + atomic count_threads{0}; + + auto worker = [&](int thread_id) { + try { + string thread_id_str = "conn-" + to_string(thread_id); + auto conn = new 
PostgresDatabaseConnection( + thread_id_str, TEST_HOST, TEST_PORT, TEST_DB, TEST_USER, TEST_PASSWORD); + + EXPECT_FALSE(conn->is_connected()); + + conn->setup(); + + EXPECT_FALSE(conn->is_connected()); + + conn->start(); + + EXPECT_TRUE(conn->is_connected()); + + conn->execute_query("SELECT 1"); + + count_threads++; + + conn->stop(); + + EXPECT_FALSE(conn->is_connected()); + } catch (const exception& e) { + cout << "Thread " << thread_id << " failed with error: " << e.what() << endl; + } + }; + + for (int i = 0; i < num_threads; ++i) threads.emplace_back(worker, i); + + for (auto& t : threads) t.join(); + + EXPECT_EQ(count_threads, num_threads); +} + +TEST_F(PostgresDatabaseConnectionTest, CheckData) { + auto conn = create_db_connection(); + + auto result = conn->execute_query( + "SELECT organism_id, genus, species, common_name FROM organism WHERE organism_id = 1"); + + ASSERT_EQ(result.size(), 1); + EXPECT_EQ(result[0]["organism_id"].as(), 1); + EXPECT_EQ(result[0]["genus"].as(), "Drosophila"); + EXPECT_EQ(result[0]["species"].as(), "melanogaster"); + EXPECT_EQ(result[0]["common_name"].as(), "fruit fly"); + + auto result2 = + conn->execute_query("SELECT feature_id, name, uniquename FROM feature WHERE feature_id = " + + to_string(WHITE_GENE_ID)); + + ASSERT_EQ(result2.size(), 1); + EXPECT_EQ(result2[0]["feature_id"].as(), WHITE_GENE_ID); + EXPECT_EQ(result2[0]["name"].as(), WHITE_GENE_NAME); + EXPECT_EQ(result2[0]["uniquename"].as(), WHITE_GENE_UNIQUENAME); + + auto result3 = conn->execute_query("SELECT COUNT(*) FROM organism"); + + EXPECT_EQ(result3[0][0].as(), TOTAL_ROWS_ORGANISMS); + + auto result4 = conn->execute_query("SELECT COUNT(*) FROM cvterm"); + + EXPECT_EQ(result4[0][0].as(), TOTAL_ROWS_CVTERMS); + + auto result5 = conn->execute_query("SELECT COUNT(*) FROM feature"); + + EXPECT_EQ(result5[0][0].as(), TOTAL_ROWS_FEATURES); +} + +TEST_F(PostgresWrapperTest, GetTable) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + auto 
tables = wrapper->list_tables(); + ASSERT_FALSE(tables.empty()); + + string target_name = tables[0].name; + Table t = wrapper->get_table(target_name); + + EXPECT_EQ(t.name, target_name); + EXPECT_EQ(t.primary_key, tables[0].primary_key); + + Table feature = wrapper->get_table("public.feature"); + + EXPECT_EQ(feature.name, "public.feature"); + EXPECT_EQ(feature.primary_key, "feature_id"); + + EXPECT_THROW(wrapper->get_table("fake_table_name"), std::runtime_error); +} + +TEST_F(PostgresWrapperTest, ListTables) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + auto tables = wrapper->list_tables(); + + EXPECT_GE(tables.size(), 3); + + bool found_organism = false; + bool found_cvterm = false; + bool found_feature = false; + + for (const auto& table : tables) { + if (table.name == ORGANISM_TABLE) found_organism = true; + if (table.name == CVTERM_TABLE) found_cvterm = true; + if (table.name == FEATURE_TABLE) found_feature = true; + } + + EXPECT_TRUE(found_organism); + EXPECT_TRUE(found_cvterm); + EXPECT_TRUE(found_feature); + + vector
tables_cached = wrapper->list_tables(); + + ASSERT_EQ(tables_cached.size(), tables.size()); + if (!tables_cached.empty()) { + EXPECT_EQ(tables_cached[0].name, tables[0].name); + } +} + +TEST_F(PostgresWrapperTest, TablesStructure) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + Table organism_table = wrapper->get_table(ORGANISM_TABLE); + + EXPECT_EQ(organism_table.name, ORGANISM_TABLE); + EXPECT_EQ(organism_table.primary_key, ORGANISM_PK); + EXPECT_TRUE(organism_table.foreign_keys.empty()); + + vector expected_cols = { + "organism_id", "genus", "species", "common_name", "abbreviation", "comment", "created_at"}; + for (const auto& expected : expected_cols) { + bool found = + find(organism_table.column_names.begin(), organism_table.column_names.end(), expected) != + organism_table.column_names.end(); + EXPECT_TRUE(found); + } + + Table cvterm_table = wrapper->get_table(CVTERM_TABLE); + + EXPECT_EQ(cvterm_table.name, CVTERM_TABLE); + EXPECT_EQ(cvterm_table.primary_key, CVTERM_PK); + EXPECT_TRUE(cvterm_table.foreign_keys.empty()); + + Table feature_table = wrapper->get_table(FEATURE_TABLE); + + EXPECT_EQ(feature_table.name, FEATURE_TABLE); + EXPECT_EQ(feature_table.primary_key, FEATURE_PK); + EXPECT_EQ(feature_table.foreign_keys.size(), 2); + + bool has_organism_fk = false; + bool has_type_fk = false; + for (const auto& fk : feature_table.foreign_keys) { + if (fk.find("organism_id") != string::npos && fk.find("public.organism") != string::npos) { + has_organism_fk = true; + } + if (fk.find("type_id") != string::npos && fk.find("public.cvterm") != string::npos) { + has_type_fk = true; + } + } + EXPECT_TRUE(has_organism_fk); + EXPECT_TRUE(has_type_fk); +} + +// map_table - SQL2ATOMS +TEST_F(PostgresWrapperTest, MapTablesFirstRowAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + Table organism_table = wrapper->get_table(ORGANISM_TABLE); + EXPECT_NO_THROW({ wrapper->map_table(organism_table, 
{"organism_id = 1"}, {}, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 34); + + Table feature_table = wrapper->get_table(FEATURE_TABLE); + EXPECT_NO_THROW({ wrapper->map_table(feature_table, {"feature_id = 1"}, {}, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 81); + + Table cvterm_table = wrapper->get_table(CVTERM_TABLE); + EXPECT_NO_THROW({ wrapper->map_table(cvterm_table, {"cvterm_id = 1"}, {}, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 101); +} + +TEST_F(PostgresWrapperTest, MapTableWithClausesAndSkipColumnsAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + Table table = wrapper->get_table(FEATURE_TABLE); + vector clauses = {"organism_id = " + to_string(DROSOPHILA_ORGANISM_ID), "feature_id <= 5"}; + vector skip_columns = {"residues", "md5checksum", "seqlen"}; + + EXPECT_NO_THROW({ wrapper->map_table(table, clauses, skip_columns, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 114); +} + +TEST_F(PostgresWrapperTest, MapTableZeroRowsAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + Table table = wrapper->get_table(FEATURE_TABLE); + vector clauses = {"feature_id = -999"}; + + EXPECT_NO_THROW({ wrapper->map_table(table, clauses, {}, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapTableWithNonExistentSkipColumnAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + Table table = wrapper->get_table(FEATURE_TABLE); + + vector clauses = {"feature_id < 10"}; + vector skip_columns = {"column_xyz"}; + + EXPECT_THROW({ wrapper->map_table(table, clauses, skip_columns, false); }, std::runtime_error); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapTableWithInvalidClauseAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + Table table = wrapper->get_table(FEATURE_TABLE); + + 
vector clauses = {"INVALID CLAUSE SYNTAX !!!"}; + + EXPECT_THROW({ wrapper->map_table(table, clauses, {}, false); }, std::runtime_error); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +// map_table - SQL2METTA +TEST_F(PostgresWrapperTest, MapTablesFirstRowMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + Table organism_table = wrapper->get_table(ORGANISM_TABLE); + EXPECT_NO_THROW({ wrapper->map_table(organism_table, {"organism_id = 1"}, {}, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 19); + + Table feature_table = wrapper->get_table(FEATURE_TABLE); + EXPECT_NO_THROW({ wrapper->map_table(feature_table, {"feature_id = 1"}, {}, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 51); + + Table cvterm_table = wrapper->get_table(CVTERM_TABLE); + EXPECT_NO_THROW({ wrapper->map_table(cvterm_table, {"cvterm_id = 1"}, {}, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 65); +} + +TEST_F(PostgresWrapperTest, MapTableWithClausesAndSkipColumnsMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + Table table = wrapper->get_table(FEATURE_TABLE); + vector clauses = {"organism_id = " + to_string(DROSOPHILA_ORGANISM_ID), "feature_id <= 5"}; + vector skip_columns = {"residues", "md5checksum", "seqlen"}; + + EXPECT_NO_THROW({ wrapper->map_table(table, clauses, skip_columns, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 86); +} + +TEST_F(PostgresWrapperTest, MapTableZeroRowsMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + Table table = wrapper->get_table(FEATURE_TABLE); + vector clauses = {"feature_id = -999"}; + + EXPECT_NO_THROW({ wrapper->map_table(table, clauses, {}, false); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapTableWithNonExistentSkipColumnMetta) { + auto conn = 
create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + Table table = wrapper->get_table(FEATURE_TABLE); + + vector clauses = {"feature_id < 10"}; + vector skip_columns = {"column_xyz"}; + + EXPECT_THROW({ wrapper->map_table(table, clauses, skip_columns, false); }, std::runtime_error); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapTableWithInvalidClauseMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + Table table = wrapper->get_table(FEATURE_TABLE); + + vector clauses = {"INVALID CLAUSE SYNTAX !!!"}; + + EXPECT_THROW({ wrapper->map_table(table, clauses, {}, false); }, std::runtime_error); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +// map_sql_query - SQL2ATOMS +TEST_F(PostgresWrapperTest, MapSqlQueryFirstRowAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + string query_organism = R"( + SELECT + o.organism_id AS public_organism__organism_id, + o.genus AS public_organism__genus, + o.species AS public_organism__species, + o.common_name AS public_organism__common_name, + o.abbreviation AS public_organism__abbreviation, + o.comment AS public_organism__comment + FROM organism AS o + WHERE o.organism_id = 1 + )"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_organism", query_organism); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 34); + + string query_feature = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.residues AS public_feature__residues, + f.md5checksum AS public_feature__md5checksum, + f.seqlen AS public_feature__seqlen, + f.is_analysis AS public_feature__is_analysis, + f.is_obsolete AS public_feature__is_obsolete + FROM feature AS f + WHERE f.feature_id = 1 + )"; + + 
EXPECT_NO_THROW({ wrapper->map_sql_query("test_feature", query_feature); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 81); + + string query_cvterm = R"( + SELECT + c.cvterm_id AS public_cvterm__cvterm_id, + c.name AS public_cvterm__name, + c.definition AS public_cvterm__definition, + c.is_obsolete AS public_cvterm__is_obsolete, + c.is_relationshiptype AS public_cvterm__is_relationshiptype + FROM cvterm AS c + WHERE c.cvterm_id = 1 + )"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_cvterm", query_cvterm); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 101); +} + +TEST_F(PostgresWrapperTest, MapSqlQueryWithClausesAndSkipColumnsAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + string query = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.is_analysis AS public_feature__is_analysis, + f.is_obsolete AS public_feature__is_obsolete + FROM feature AS f + WHERE f.organism_id = )" + + to_string(DROSOPHILA_ORGANISM_ID) + R"( AND f.feature_id <= 5)"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_feature_clause_and_skip", query); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 114); +} + +TEST_F(PostgresWrapperTest, MapSqlQueryZeroRowsAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + string query = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.residues AS public_feature__residues, + f.md5checksum AS public_feature__md5checksum, + f.seqlen AS public_feature__seqlen, + f.is_analysis AS public_feature__is_analysis, + f.is_obsolete AS public_feature__is_obsolete + FROM feature AS f + WHERE f.feature_id = -999 + 
)"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_feature_zero_rows", query); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapSqlQueryWithNonExistentSkipColumnAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + string query = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.column_xyz AS public_feature__column_xyz + FROM feature AS f + WHERE f.feature_id < 10 + )"; + + EXPECT_THROW({ wrapper->map_sql_query("test_feature_with_non_existent_column", query); }, + std::runtime_error); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapSqlQueryWithInvalidClauseAtoms) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + string query = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.residues AS public_feature__residues, + f.md5checksum AS public_feature__md5checksum, + f.seqlen AS public_feature__seqlen, + f.is_analysis AS public_feature__is_analysis, + f.is_obsolete AS public_feature__is_obsolete + FROM feature AS f + WHERE INVALID CLAUSE SYNTAX !!! 
+ )"; + + EXPECT_THROW({ wrapper->map_sql_query("test_feature_with_invalid_clause", query); }, + std::runtime_error); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +// map_sql_query - SQL2METTA +TEST_F(PostgresWrapperTest, MapSqlQueryFirstRowMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + string query_organism = R"( + SELECT + o.organism_id AS public_organism__organism_id, + o.genus AS public_organism__genus, + o.species AS public_organism__species, + o.common_name AS public_organism__common_name, + o.abbreviation AS public_organism__abbreviation, + o.comment AS public_organism__comment + FROM organism AS o + WHERE o.organism_id = 1 + )"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_organism", query_organism); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 19); + + string query_feature = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.residues AS public_feature__residues, + f.md5checksum AS public_feature__md5checksum, + f.seqlen AS public_feature__seqlen, + f.is_analysis AS public_feature__is_analysis, + f.is_obsolete AS public_feature__is_obsolete + FROM feature AS f + WHERE f.feature_id = 1 + )"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_feature", query_feature); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 51); + + string query_cvterm = R"( + SELECT + c.cvterm_id AS public_cvterm__cvterm_id, + c.name AS public_cvterm__name, + c.definition AS public_cvterm__definition, + c.is_obsolete AS public_cvterm__is_obsolete, + c.is_relationshiptype AS public_cvterm__is_relationshiptype + FROM cvterm AS c + WHERE c.cvterm_id = 1 + )"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_cvterm", query_cvterm); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 65); +} + 
+TEST_F(PostgresWrapperTest, MapSqlQueryWithClausesAndSkipColumnsMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + string query = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.is_analysis AS public_feature__is_analysis, + f.is_obsolete AS public_feature__is_obsolete + FROM feature AS f + WHERE f.organism_id = )" + + to_string(DROSOPHILA_ORGANISM_ID) + R"( AND f.feature_id <= 5)"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_feature_clause_and_skip", query); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 86); +} + +TEST_F(PostgresWrapperTest, MapSqlQueryZeroRowsMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + string query = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.residues AS public_feature__residues, + f.md5checksum AS public_feature__md5checksum, + f.seqlen AS public_feature__seqlen, + f.is_analysis AS public_feature__is_analysis, + f.is_obsolete AS public_feature__is_obsolete + FROM feature AS f + WHERE f.feature_id = -999 + )"; + + EXPECT_NO_THROW({ wrapper->map_sql_query("test_feature_zero_rows", query); }); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapSqlQueryWithNonExistentSkipColumnMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + string query = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS 
public_feature__uniquename, + f.column_xyz AS public_feature__column_xyz + FROM feature AS f + WHERE f.feature_id < 10 + )"; + + EXPECT_THROW({ wrapper->map_sql_query("test_feature_with_non_existent_column", query); }, + std::runtime_error); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapSqlQueryWithInvalidClauseMetta) { + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn, MAPPER_TYPE::SQL2METTA); + + string query = R"( + SELECT + f.feature_id AS public_feature__feature_id, + f.organism_id AS public_feature__organism_id, + f.type_id AS public_feature__type_id, + f.name AS public_feature__name, + f.uniquename AS public_feature__uniquename, + f.residues AS public_feature__residues, + f.md5checksum AS public_feature__md5checksum, + f.seqlen AS public_feature__seqlen, + f.is_analysis AS public_feature__is_analysis, + f.is_obsolete AS public_feature__is_obsolete + FROM feature AS f + WHERE INVALID CLAUSE SYNTAX !!! + )"; + + EXPECT_THROW({ wrapper->map_sql_query("test_feature_with_invalid_clause", query); }, + std::runtime_error); + EXPECT_EQ(wrapper->mapper_handle_trie_size(), 0); +} + +TEST_F(PostgresWrapperTest, MapTablesFirstRowAtomsWithContextFile) { + vector tables_mapping = ContextLoader::load_context_file("/tmp/context_1.json"); + + EXPECT_FALSE(tables_mapping.empty()); + + auto conn = create_db_connection(); + auto wrapper = create_wrapper(*conn); + + vector atoms_sizes; + + for (const auto& tm : tables_mapping) { + string table_name = tm.table_name; + vector skip_columns = tm.skip_columns.value_or(vector{}); + vector where_clauses = tm.where_clauses.value_or(vector{}); + + Table table = wrapper->get_table(table_name); + EXPECT_NO_THROW({ wrapper->map_table(table, where_clauses, skip_columns, false); }); + atoms_sizes.push_back(wrapper->mapper_handle_trie_size()); + } + EXPECT_EQ(atoms_sizes.size(), 3); + EXPECT_EQ(atoms_sizes[0], 34); + EXPECT_EQ(atoms_sizes[1], 81); + EXPECT_EQ(atoms_sizes[2], 
101); + + vector tables_mapping_2 = ContextLoader::load_context_file("/tmp/context_2.json"); + + EXPECT_TRUE(tables_mapping_2.empty()); +} + +// TEST_F(PostgresWrapperTest, PipelineProcessor) { +// string query_organism = R"( +// SELECT +// o.organism_id AS public_organism__organism_id, +// o.genus AS public_organism__genus, +// o.species AS public_organism__species, +// o.common_name AS public_organism__common_name, +// o.abbreviation AS public_organism__abbreviation, +// o.comment AS public_organism__comment +// FROM organism AS o +// WHERE o.organism_id = 1 +// )"; + +// auto queue = make_shared(); + +// DatabaseMappingJob db_job( +// TEST_HOST, TEST_PORT, TEST_DB, TEST_USER, TEST_PASSWORD, MAPPER_TYPE::SQL2ATOMS, queue); +// db_job.add_task_query("Test1", query_organism); +// auto producer = make_shared("producer", &db_job); + +// EXPECT_EQ(queue->size(), 0); +// producer->setup(); +// EXPECT_EQ(queue->size(), 0); +// producer->start(); +// EXPECT_EQ(queue->size(), 0); + +// while (!db_job.is_finished()) { +// Utils::sleep(); +// } +// producer->stop(); + +// EXPECT_EQ(queue->size(), 34); + +// AtomPersistenceJob atomdb_job(queue); +// auto consumer = make_shared("consumer", &atomdb_job); + +// EXPECT_EQ(queue->size(), 34); +// consumer->setup(); +// EXPECT_EQ(queue->size(), 34); +// consumer->start(); +// EXPECT_EQ(queue->size(), 34); + +// while (!atomdb_job.is_finished()) { +// Utils::sleep(); +// } +// consumer->stop(); + +// EXPECT_EQ(queue->size(), 0); +// } + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + ::testing::AddGlobalTestEnvironment(new PostgresWrapperTestEnvironment); + return RUN_ALL_TESTS(); +} diff --git a/src/tests/main/BUILD b/src/tests/main/BUILD index add05743f..b049cf58e 100644 --- a/src/tests/main/BUILD +++ b/src/tests/main/BUILD @@ -70,3 +70,12 @@ cc_library( "//service_bus:service_bus_lib", ], ) + +cc_library( + name = "database_adapter_main_lib", + srcs = ["database_adapter_main.cc"], + deps = [ + 
"//atomdb:atomdb_singleton", + "//db_adapter:db_adapter_lib", + ], +) diff --git a/src/tests/main/database_adapter_main.cc b/src/tests/main/database_adapter_main.cc new file mode 100644 index 000000000..2470f7e3b --- /dev/null +++ b/src/tests/main/database_adapter_main.cc @@ -0,0 +1,158 @@ +#include + +#include +#include + +#include "AtomDBSingleton.h" +#include "ContextLoader.h" +#include "DataTypes.h" +#include "DedicatedThread.h" +#include "Pipeline.h" +#include "Utils.h" + +#define LOG_LEVEL INFO_LEVEL +#include "Logger.h" + +using namespace std; +using namespace commons; +using namespace atomdb; +using namespace db_adapter; +using namespace processor; + +void ctrl_c_handler(int) { + std::cout << "Stopping database adapter..." << std::endl; + std::cout << "Done." << std::endl; + exit(0); +} + +void usage(const char* a) { + cerr << "Usage: " << a + << " [password] " + "[context_file_path] [query_SQL] [mapper_type] \n\n" + "Rules:\n" + "- You must provide at least context_file_path OR query_SQL.\n" + "- Use '.' to skip an optional argument.\n\n" + "Examples:\n" + " ./bin host port db user pwd ./context.json\n" + " ./bin host port db user pwd . \"SELECT * FROM table\"\n" + " ./bin host port db user pwd ./context.json . ATOMS\n" + " ./bin host port db user pwd . 
\"SELECT * FROM table\" ATOMS\n" + " ./bin host port db user pwd ./context.json \"SELECT * FROM table\" METTA\n"; + exit(1); +} + +bool is_empty_arg(const string& s) { return s.empty() || s == "."; } + +void run(string host, + int port, + string database, + string username, + string password, + vector tables_mapping, + vector queries_SQL, + MAPPER_TYPE mapper_type) { + LOG_DEBUG("Starting database adapter with the following parameters:"); + + auto queue = make_shared(); + + DatabaseMappingJob db_mapping_job(host, port, database, username, password, mapper_type, queue); + auto producer = make_shared("producer", &db_mapping_job); + if (!tables_mapping.empty()) { + for (const auto& table_mapping : tables_mapping) { + db_mapping_job.add_task_table(table_mapping); + } + } + LOG_DEBUG("Loaded " + to_string(tables_mapping.size()) + " table mappings from context file."); + if (!queries_SQL.empty()) { + for (size_t i = 0; i < queries_SQL.size(); i++) { + db_mapping_job.add_task_query("custom_query_" + to_string(i), queries_SQL[i]); + } + } + LOG_DEBUG("Loaded " + to_string(queries_SQL.size()) + " queries from query file."); + + AtomPersistenceJob atomdb_job(queue); + auto consumer = make_shared("consumer", &atomdb_job); + + consumer->setup(); + consumer->start(); + LOG_DEBUG("Consumer thread started."); + + producer->setup(); + producer->start(); + LOG_DEBUG("Producer thread started."); + + while (!db_mapping_job.is_finished()) { + Utils::sleep(); + } + + LOG_INFO("Mapping completed. 
Loading data into DAS."); + producer->stop(); + + atomdb_job.set_producer_finished(); + + while (!atomdb_job.is_finished()) { + LOG_DEBUG("Waiting for AtomPersistenceJob to finish..."); + Utils::sleep(); + } + + LOG_INFO("Loading completed!"); + consumer->stop(); +} + +int main(int argc, char* argv[]) { + if (argc < 7 || argc > 9) { + usage(argv[0]); + } + + signal(SIGINT, &ctrl_c_handler); + signal(SIGTERM, &ctrl_c_handler); + + string host = argv[1]; + int port = stoi(argv[2]); + string database = argv[3]; + string username = argv[4]; + string password = argv[5]; + + string context_file_path = argv[6]; + string query_file_path = (argc >= 8) ? argv[7] : "."; + string mapper_type_str = (argc >= 9) ? argv[8] : "."; + + if (is_empty_arg(context_file_path) && is_empty_arg(query_file_path)) { + cerr << "Error: You must provide at least context_file_path OR query_file_path.\n\n"; + usage(argv[0]); + } + + bool has_context = !is_empty_arg(context_file_path); + bool has_query = !is_empty_arg(query_file_path); + bool has_mapper = !is_empty_arg(mapper_type_str); + + vector tables_mapping; + if (has_context) { + tables_mapping = ContextLoader::load_context_file(context_file_path); + } + + vector queries_SQL; + if (has_query) { + LOG_DEBUG("Loading queries from file: " + query_file_path); + queries_SQL = ContextLoader::load_query_file(query_file_path); + } + + MAPPER_TYPE mapper_type = MAPPER_TYPE::SQL2ATOMS; + + if (has_mapper) { + if (mapper_type_str == "ATOMS") { + mapper_type = MAPPER_TYPE::SQL2ATOMS; + } else if (mapper_type_str == "METTA") { + mapper_type = MAPPER_TYPE::SQL2METTA; + } else { + cerr << "Error: Invalid mapper_type. Use 'ATOMS' or 'METTA'.\n\n"; + usage(argv[0]); + } + } + + AtomDBSingleton::init(); + + run(host, port, database, username, password, tables_mapping, queries_SQL, mapper_type); + + return 0; +} \ No newline at end of file