From f8e182a259f8af4549aa708a0cb709dcf01d6e13 Mon Sep 17 00:00:00 2001 From: GyulyVGC Date: Wed, 17 Sep 2025 15:53:10 +0200 Subject: [PATCH 1/2] use datastore request builders from libdatastore --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/db/datastore_wrapper.rs | 832 ++++++++++++++---------------------- 3 files changed, 327 insertions(+), 511 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c27cdf8..be950b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1523,9 +1523,9 @@ dependencies = [ [[package]] name = "nullnet-libdatastore" -version = "0.4.9" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd053d151ab5ed00156ab93f3f833b5ab20e6e6a9a1d135a0efc62c05c5339a2" +checksum = "7568fdbc6447444a532a15d0b903e8636f4aa79c5a74d0c7f95296942cbe3288" dependencies = [ "log", "nullnet-liberror", diff --git a/Cargo.toml b/Cargo.toml index 9550bb7..95ee9c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ lto = true strip = true [dependencies] -nullnet-libdatastore = "0.4.8" +nullnet-libdatastore = "0.5.0" #nullnet-liblogging = "0.3.0" nullnet-liberror = "0.1.1" nullnet-libipinfo = "0.2.0" diff --git a/src/db/datastore_wrapper.rs b/src/db/datastore_wrapper.rs index 321d449..43eb135 100644 --- a/src/db/datastore_wrapper.rs +++ b/src/db/datastore_wrapper.rs @@ -8,12 +8,12 @@ use crate::proto::appguard::{AppGuardIpInfo, Log}; use chrono::Utc; use ipnetwork::IpNetwork; use nullnet_libdatastore::{ - AdvanceFilter, BatchCreateBody, BatchCreateRequest, BatchDeleteBody, BatchDeleteRequest, - BatchUpdateBody, BatchUpdateRequest, CreateBody, CreateParams, CreateRequest, DeleteQuery, - DeleteRequest, EntityFieldFrom, EntityFieldTo, FieldRelation, GetByFilterBody, - GetByFilterRequest, GetByIdRequest, Join, LoginBody, LoginData, LoginParams, LoginRequest, - MultipleSort, Params, Query, RegisterDeviceParams, RegisterDeviceRequest, Response, - ResponseData, UpdateRequest, UpsertBody, UpsertRequest, + AdvanceFilter, AdvanceFilterBuilder, BatchCreateRequestBuilder, BatchDeleteBody, + BatchDeleteRequest, BatchUpdateRequestBuilder, CreateRequestBuilder, DeleteRequestBuilder, + EntityFieldFrom, EntityFieldTo, FieldRelation, GetByFilterRequestBuilder, + GetByIdRequestBuilder, Join, LoginRequestBuilder, MultipleSort, Params, Query, + RegisterDeviceRequestBuilder, Response, ResponseData, UpdateRequestBuilder, UpsertBody, + UpsertRequest, }; use nullnet_libdatastore::{DatastoreClient, DatastoreConfig}; use nullnet_liberror::{location, Error, ErrorHandler, Location}; @@ -44,16 +44,12 @@ impl DatastoreWrapper { let record = entry.to_json()?; let table = entry.table().to_str(); - let request = CreateRequest { - params: Some(CreateParams { - table: table.into(), - }), - query: Some(Query { - pluck: String::from("id"), - durability: String::from("soft"), - }), - body: Some(CreateBody { record }), - }; + let request = CreateRequestBuilder::new() + .table(table) + .durability("soft") + .pluck(["id"]) + .record(record) + .build(); log::trace!("Before create to {table}"); let result = self.inner.create(request, token).await?; @@ -95,6 +91,7 @@ impl DatastoreWrapper { let record = entry.to_json()?; let table = entry.table().to_str(); + // TODO: create builder for upsert requests let request = UpsertRequest { params: Some(Params { id: String::new(), @@ -162,32 +159,20 @@ impl DatastoreWrapper { ) -> Result, Error> { let table = DbTable::IpInfo.to_str(); - let request = GetByFilterRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::new(), - }), - body: Some(GetByFilterBody { - pluck: vec!["*".to_string()], - advance_filters: vec![AdvanceFilter { - r#type: "criteria".to_string(), - field: "ip".to_string(), - operator: "equal".to_string(), - entity: table.to_string(), - values: format!("[\"{ip}\"]"), - }], - order_by: String::new(), - limit: 1, - offset: 0, - order_direction: String::new(), - joins: vec![], - multiple_sort: vec![], - pluck_object: HashMap::default(), - date_format: String::new(), - is_case_sensitive_sorting: false, - }), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("ip") + .operator("equal") + .entity(table) + .values(format!("[\"{ip}\"]")) + .build(); + + let request = GetByFilterRequestBuilder::new() + .table(table) + .pluck("*") + .limit(1) + .advance_filter(filter) + .build(); log::trace!("Before get by filter to {table}"); let result_json = self @@ -209,31 +194,22 @@ impl DatastoreWrapper { token: &str, ) -> Result, Error> { let table = table.to_str(); - let request = GetByFilterRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::from("root"), - }), - body: Some(GetByFilterBody { - pluck: vec!["timestamp".to_string()], - advance_filters: vec![], - order_by: String::new(), - limit: 1, - offset: 0, - order_direction: String::new(), - joins: vec![], - multiple_sort: vec![MultipleSort { - by_field: format!("{table}.timestamp"), - by_direction: "asc".to_string(), - is_case_sensitive_sorting: false, - }], - pluck_object: HashMap::default(), - date_format: String::new(), - is_case_sensitive_sorting: false, - }), + + // TODO: create builder for MultipleSort + let sort = MultipleSort { + by_field: format!("{table}.timestamp"), + by_direction: "asc".to_string(), + is_case_sensitive_sorting: false, }; + let request = GetByFilterRequestBuilder::new() + .table(table) + .performed_by_root(true) + .pluck("timestamp") + .limit(1) + .multiple_sort(sort) + .build(); + log::trace!("Before get oldest timestamp to {table}"); let result_json = self.inner.get_by_filter(request, token).await?.data; let result_vec: Option> = serde_json::from_str(&result_json).ok(); @@ -250,6 +226,8 @@ impl DatastoreWrapper { token: &str, ) -> Result { let table = table.to_str(); + + // TODO: builder for BatchDeleteRequest let request = BatchDeleteRequest { params: Some(Params { id: String::new(), @@ -279,34 +257,21 @@ impl DatastoreWrapper { ) -> Result, Error> { let table = DbTable::Firewall.to_str(); - let filter = AdvanceFilter { - r#type: String::from("criteria"), - field: String::from("active"), - operator: String::from("equal"), - entity: table.to_string(), - values: "[true]".to_string(), - }; - - let request = GetByFilterRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::from("root"), - }), - body: Some(GetByFilterBody { - pluck: vec!["app_id".to_string(), "firewall".to_string()], - advance_filters: vec![filter], - order_by: String::new(), - limit: i32::MAX, - offset: 0, - order_direction: String::new(), - joins: vec![], - multiple_sort: vec![], - pluck_object: HashMap::default(), - date_format: String::new(), - is_case_sensitive_sorting: false, - }), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("active") + .operator("equal") + .entity(table) + .values("[true]") + .build(); + + let request = GetByFilterRequestBuilder::new() + .table(table) + .performed_by_root(true) + .plucks(["app_id", "firewall"]) + .advance_filter(filter) + .limit(i32::MAX) + .build(); log::trace!("Before get by filter to {table}"); let result = self.inner.get_by_filter(request, &token).await?.data; @@ -352,39 +317,26 @@ impl DatastoreWrapper { pub(crate) async fn get_configs(&mut self, token: String) -> Result { let table = DbTable::Config.to_str(); - let filter = AdvanceFilter { - r#type: String::from("criteria"), - field: String::from("active"), - operator: String::from("equal"), - entity: table.to_string(), - values: "[true]".to_string(), - }; - - let request = GetByFilterRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::from("root"), - }), - body: Some(GetByFilterBody { - pluck: vec![ - "log_request".to_string(), - "log_response".to_string(), - "retention_sec".to_string(), - "ip_info_cache_size".to_string(), - ], - advance_filters: vec![filter], - order_by: String::new(), - limit: 1, - offset: 0, - order_direction: String::new(), - joins: vec![], - multiple_sort: vec![], - pluck_object: HashMap::default(), - date_format: String::new(), - is_case_sensitive_sorting: false, - }), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("active") + .operator("equal") + .entity(table) + .values("[true]") + .build(); + + let request = GetByFilterRequestBuilder::new() + .table(table) + .performed_by_root(true) + .plucks(vec![ + "log_request", + "log_response", + "retention_sec", + "ip_info_cache_size", + ]) + .advance_filter(filter) + .limit(1) + .build(); log::trace!("Before get by filter to {table}"); let result = self.inner.get_by_filter(request, &token).await?.data; @@ -440,22 +392,11 @@ impl DatastoreWrapper { account_secret: String, is_root: bool, ) -> Result { - let request = LoginRequest { - params: Some(LoginParams { - is_root: if is_root { - String::from("true") - } else { - String::from("false") - }, - t: String::new(), - }), - body: Some(LoginBody { - data: Some(LoginData { - account_id: account_id.to_owned(), - account_secret: account_secret.to_owned(), - }), - }), - }; + let request = LoginRequestBuilder::new() + .set_root(is_root) + .account_id(account_id) + .account_secret(account_secret) + .build(); log::trace!("Before login"); let response = self.inner.clone().login(request).await?; @@ -513,25 +454,21 @@ impl DatastoreWrapper { device_id: String, device_address: String, ) -> Result { - let request = UpdateRequest { - params: Some(Params { - table: String::from("devices"), - id: device_id, - r#type: String::new(), - }), - query: Some(Query { - pluck: String::from("id,code"), - durability: String::from("soft"), - }), - body: json!({ - "device_version": "", - "system_id": "", - "ip_address": device_address, - "is_connection_established": true, - "status": "Active" - }) - .to_string(), - }; + let request = UpdateRequestBuilder::new() + .id(device_id) + .table("devices") + .query("id,code", "soft") + .body( + json!({ + "device_version": "", + "system_id": "", + "ip_address": device_address, + "is_connection_established": true, + "status": "Active" + }) + .to_string(), + ) + .build(); log::trace!("Before device setup"); let response = self.inner.clone().update(request, token).await?; @@ -555,16 +492,12 @@ impl DatastoreWrapper { async fn logs_insert_single(&mut self, log: Log, token: &str) -> Result { let record = serde_json::to_string(&log).handle_err(location!())?; - let request = CreateRequest { - params: Some(CreateParams { - table: String::from("appguard_logs"), - }), - query: Some(Query { - pluck: String::from("id"), - durability: String::from("soft"), - }), - body: Some(CreateBody { record }), - }; + let request = CreateRequestBuilder::new() + .table("appguard_logs") + .pluck(["id"]) + .durability("soft") + .record(record) + .build(); println!("Before single log insert"); let res = self.inner.create(request, token).await?; @@ -580,16 +513,11 @@ impl DatastoreWrapper { ) -> Result { let records = serde_json::to_string(&logs).handle_err(location!())?; - let request = BatchCreateRequest { - params: Some(CreateParams { - table: String::from("appguard_logs"), - }), - query: Some(Query { - pluck: String::new(), - durability: String::from("soft"), - }), - body: Some(BatchCreateBody { records }), - }; + let request = BatchCreateRequestBuilder::new() + .table("appguard_logs") + .durability("soft") + .records(records) + .build(); println!("Before batch log insert"); let res = self.inner.batch_create(request, token).await?; @@ -604,40 +532,26 @@ impl DatastoreWrapper { device_id: &str, performed_by_root: bool, ) -> Result, Error> { - let r#type = if performed_by_root { - String::from("root") - } else { - String::new() - }; - - let request = GetByIdRequest { - params: Some(Params { - id: String::from(device_id), - table: String::from("devices"), - r#type, - }), - query: Some(Query { - pluck: vec![ - "id", - "device_uuid", - "is_traffic_monitoring_enabled", - "is_config_monitoring_enabled", - "is_telemetry_monitoring_enabled", - "is_device_authorized", - "device_category", - "device_type", - "device_os", - "device_name", - "is_device_online", - "organization_id", - ] - .into_iter() - .map(Into::::into) - .collect::>() - .join(","), - durability: String::from("soft"), - }), - }; + let request = GetByIdRequestBuilder::new() + .id(device_id) + .table("devices") + .performed_by_root(performed_by_root) + .pluck(vec![ + "id", + "device_uuid", + "is_traffic_monitoring_enabled", + "is_config_monitoring_enabled", + "is_telemetry_monitoring_enabled", + "is_device_authorized", + "device_category", + "device_type", + "device_os", + "device_name", + "is_device_online", + "organization_id", + ]) + .durability("soft") + .build(); let response = self.inner.clone().get_by_id(request, token).await?; if response.count == 0 { @@ -663,20 +577,17 @@ impl DatastoreWrapper { account_secret: &str, device: &Device, ) -> Result { - let request = RegisterDeviceRequest { - device: Some(RegisterDeviceParams { - organization_id: device.organization.clone(), - account_id: String::from(account_id), - account_secret: String::from(account_secret), - is_new_user: true, - is_invited: false, - role_id: String::new(), - account_organization_status: "Active".to_string(), - account_organization_categories: vec![String::from("Device")], - device_categories: vec![String::from("Device")], - device_id: device.id.clone(), - }), - }; + let request = RegisterDeviceRequestBuilder::new() + .organization_id(&device.organization) + .account_id(account_id) + .account_secret(account_secret) + .set_is_new_user(true) + .set_is_invited(false) + .account_organization_status("Active") + .add_account_organization_category("Device") + .add_device_category("Device") + .device_id(&device.id) + .build(); let response = self.inner.clone().register_device(request, token).await?; @@ -689,18 +600,12 @@ impl DatastoreWrapper { device_id: &str, device: &Device, ) -> Result { - let request = UpdateRequest { - params: Some(Params { - id: String::from(device_id), - table: String::from("devices"), - r#type: String::new(), - }), - query: Some(Query { - pluck: String::new(), - durability: String::from("soft"), - }), - body: json!(device).to_string(), - }; + let request = UpdateRequestBuilder::new() + .id(device_id) + .table("devices") + .query("", "soft") + .body(json!(device).to_string()) + .build(); let data = self.inner.clone().update(request, token).await?; @@ -718,25 +623,19 @@ impl DatastoreWrapper { }) .to_string(); - let filter = AdvanceFilter { - r#type: String::from("criteria"), - field: String::from("device_uuid"), - operator: String::from("equal"), - entity: String::from("devices"), - values: format!("[\"{device_uuid}\"]"), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("device_uuid") + .operator("equal") + .entity("devices") + .values(format!("[\"{device_uuid}\"]")) + .build(); - let request = BatchUpdateRequest { - params: Some(Params { - id: String::new(), - table: String::from("devices"), - r#type: String::new(), - }), - body: Some(BatchUpdateBody { - advance_filters: vec![filter], - updates, - }), - }; + let request = BatchUpdateRequestBuilder::new() + .table("devices") + .updates(updates) + .advance_filter(filter) + .build(); let _ = self.inner.clone().batch_update(request, token).await; @@ -745,38 +644,27 @@ impl DatastoreWrapper { pub async fn create_device(&self, token: &str, device: &Device) -> Result { let mut json = json!(device); - json.as_object_mut().unwrap().remove("id"); - let request = CreateRequest { - params: Some(CreateParams { - table: String::from("devices"), - }), - query: Some(Query { - pluck: vec![ - "id", - "device_uuid", - "is_traffic_monitoring_enabled", - "is_config_monitoring_enabled", - "is_telemetry_monitoring_enabled", - "is_device_authorized", - "device_category", - "device_type", - "device_os", - "device_name", - "is_device_online", - "organization_id", - ] - .into_iter() - .map(Into::::into) - .collect::>() - .join(","), - durability: String::from("soft"), - }), - body: Some(CreateBody { - record: json.to_string(), - }), - }; + let request = CreateRequestBuilder::new() + .table("devices") + .pluck(vec![ + "id", + "device_uuid", + "is_traffic_monitoring_enabled", + "is_config_monitoring_enabled", + "is_telemetry_monitoring_enabled", + "is_device_authorized", + "device_category", + "device_type", + "device_os", + "device_name", + "is_device_online", + "organization_id", + ]) + .durability("soft") + .record(json.to_string()) + .build(); let response = self.inner.clone().create(request, token).await?; @@ -804,18 +692,11 @@ impl DatastoreWrapper { .handle_err(location!())? .remove("id"); - let request = CreateRequest { - params: Some(CreateParams { - table: "device_instances".into(), - }), - query: Some(Query { - pluck: "id,device_id".into(), - durability: String::new(), - }), - body: Some(CreateBody { - record: json.to_string(), - }), - }; + let request = CreateRequestBuilder::new() + .table("device_instances") + .pluck(vec!["id", "device_id"]) + .record(json.to_string()) + .build(); let response = self.inner.clone().create(request, token).await?; @@ -837,16 +718,10 @@ impl DatastoreWrapper { token: &str, instance_id: &str, ) -> Result<(), Error> { - let request = DeleteRequest { - params: Some(Params { - id: instance_id.into(), - table: "device_instances".into(), - r#type: String::new(), - }), - query: Some(DeleteQuery { - is_permanent: String::new(), - }), - }; + let request = DeleteRequestBuilder::new() + .id(instance_id) + .table("device_instances") + .build(); let _ = self.inner.clone().delete(request, token).await?; @@ -858,47 +733,37 @@ impl DatastoreWrapper { token: &str, device_uuid: &str, ) -> Result, Error> { - let filter = AdvanceFilter { - r#type: String::from("criteria"), - field: String::from("device_uuid"), - operator: String::from("equal"), - entity: String::from("devices"), - values: format!("[\"{device_uuid}\"]"), - }; - - let request = GetByFilterRequest { - body: Some(GetByFilterBody { - pluck: vec![ - "id".into(), - "device_uuid".into(), - "is_traffic_monitoring_enabled".into(), - "is_config_monitoring_enabled".into(), - "is_telemetry_monitoring_enabled".into(), - "is_device_authorized".into(), - "device_category".into(), - "device_type".into(), - "device_os".into(), - "device_name".into(), - "is_device_online".into(), - "organization_id".into(), - ], - advance_filters: vec![filter], - order_by: "timestamp".to_string(), - limit: 1, - offset: 0, - order_direction: "desc".to_string(), - joins: vec![], - multiple_sort: vec![], - pluck_object: HashMap::new(), - date_format: String::new(), - is_case_sensitive_sorting: true, - }), - params: Some(Params { - id: String::new(), - table: "devices".to_string(), - r#type: String::from("root"), - }), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("device_uuid") + .operator("equal") + .entity("devices") + .values(format!("[\"{device_uuid}\"]")) + .build(); + + let request = GetByFilterRequestBuilder::new() + .plucks(vec![ + "id", + "device_uuid", + "is_traffic_monitoring_enabled", + "is_config_monitoring_enabled", + "is_telemetry_monitoring_enabled", + "is_device_authorized", + "device_category", + "device_type", + "device_os", + "device_name", + "is_device_online", + "organization_id", + ]) + .advance_filter(filter) + .order_by("timestamp") + .limit(1) + .order_direction("desc") + .case_sensitive_sorting(true) + .table("devices") + .performed_by_root(true) + .build(); let response = self.inner.clone().get_by_filter(request, token).await?; @@ -923,40 +788,30 @@ impl DatastoreWrapper { code: &str, token: &str, ) -> Result, Error> { - let filter = AdvanceFilter { - r#type: String::from("criteria"), - field: String::from("code"), - operator: String::from("equal"), - entity: String::from("installation_codes"), - values: format!("[\"{code}\"]"), - }; - - let request = GetByFilterRequest { - body: Some(GetByFilterBody { - pluck: vec![ - "id".into(), - "redeemed".into(), - "device_id".into(), - "device_code".into(), - "organization_id".into(), - ], - advance_filters: vec![filter], - order_by: "timestamp".to_string(), - limit: 1, - offset: 0, - order_direction: "desc".to_string(), - joins: vec![], - multiple_sort: vec![], - pluck_object: HashMap::new(), - date_format: String::new(), - is_case_sensitive_sorting: true, - }), - params: Some(Params { - id: String::new(), - table: "installation_codes".to_string(), - r#type: String::from("root"), - }), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("code") + .operator("equal") + .entity("installation_codes") + .values(format!("[\"{code}\"]")) + .build(); + + let request = GetByFilterRequestBuilder::new() + .plucks(vec![ + "id", + "redeemed", + "device_id", + "device_code", + "organization_id", + ]) + .advance_filter(filter) + .order_by("timestamp") + .limit(1) + .order_direction("desc") + .case_sensitive_sorting(true) + .table("installation_codes") + .performed_by_root(true) + .build(); let response = self.inner.clone().get_by_filter(request, token).await?; @@ -982,18 +837,13 @@ impl DatastoreWrapper { code: &InstallationCode, token: &str, ) -> Result<(), Error> { - let request = UpdateRequest { - params: Some(Params { - table: String::from("installation_codes"), - id: code.id.clone(), - r#type: String::from("root"), - }), - query: Some(Query { - pluck: String::from("id,code"), - durability: String::from("soft"), - }), - body: json!({"redeemed": true}).to_string(), - }; + let request = UpdateRequestBuilder::new() + .table("installation_codes") + .id(&code.id) + .performed_by_root(true) + .query("id,code", "soft") + .body(json!({"redeemed": true}).to_string()) + .build(); let _ = self.inner.clone().update(request, token).await; @@ -1003,27 +853,22 @@ impl DatastoreWrapper { pub(crate) async fn deactivate_old_configs(&mut self, token: &str) -> Result { let table = DbTable::Config.to_str(); - let filter = AdvanceFilter { - r#type: "criteria".to_string(), - field: "active".to_string(), - operator: "equal".to_string(), - entity: table.to_string(), - values: "[true]".to_string(), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("active") + .operator("equal") + .entity(table) + .values("[true]") + .build(); let updates = json!({"active": false}).to_string(); - let request = BatchUpdateRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::from("root"), - }), - body: Some(BatchUpdateBody { - advance_filters: vec![filter], - updates, - }), - }; + let request = BatchUpdateRequestBuilder::new() + .table(table) + .performed_by_root(true) + .advance_filter(filter) + .updates(updates) + .build(); log::trace!("Before batch update to {table}"); let count = self.inner.batch_update(request, token).await?.count; @@ -1038,27 +883,21 @@ impl DatastoreWrapper { ) -> Result { let table = DbTable::Firewall.to_str(); - let filter = AdvanceFilter { - r#type: "criteria".to_string(), - field: "app_id".to_string(), - operator: "equal".to_string(), - entity: table.to_string(), - values: format!("[\"{device_id}\"]"), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("app_id") + .operator("equal") + .entity(table) + .values(format!("[\"{device_id}\"]")) + .build(); let updates = json!({"active": false}).to_string(); - let request = BatchUpdateRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::from(""), - }), - body: Some(BatchUpdateBody { - advance_filters: vec![filter], - updates, - }), - }; + let request = BatchUpdateRequestBuilder::new() + .table(table) + .advance_filter(filter) + .updates(updates) + .build(); log::trace!("Before batch update to {table}"); let count = self.inner.batch_update(request, token).await?.count; @@ -1074,14 +913,15 @@ impl DatastoreWrapper { let table_aliases = DbTable::Alias.to_str(); let table_ip_aliases = DbTable::IpAlias.to_str(); - let filter = AdvanceFilter { - r#type: String::from("criteria"), - field: String::from("name"), - operator: String::from("equal"), - entity: table_aliases.to_string(), - values: format!("[\"{name}\"]"), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("name") + .operator("equal") + .entity(table_aliases) + .values(format!("[\"{name}\"]")) + .build(); + // TODO: builder for Join let join = Join { r#type: "left".to_string(), field_relation: Some(FieldRelation { @@ -1100,32 +940,20 @@ impl DatastoreWrapper { }), }; - let request = GetByFilterRequest { - params: Some(Params { - id: String::new(), - table: table_aliases.into(), - r#type: String::from("root"), - }), - body: Some(GetByFilterBody { - pluck: vec![], - advance_filters: vec![filter], - order_by: String::new(), - limit: i32::MAX, - offset: 0, - order_direction: String::new(), - joins: vec![join], - multiple_sort: vec![], - pluck_object: HashMap::from([ - ( - table_ip_aliases.to_string(), - String::from("[\"ip\", \"prefix\"]"), - ), - (table_aliases.to_string(), String::from("[\"id\"]")), - ]), - date_format: String::new(), - is_case_sensitive_sorting: false, - }), - }; + let request = GetByFilterRequestBuilder::new() + .table(table_aliases) + .performed_by_root(true) + .advance_filter(filter) + .limit(i32::MAX) + .join(join) + .pluck_objects(HashMap::from([ + ( + table_ip_aliases.to_string(), + String::from("[\"ip\", \"prefix\"]"), + ), + (table_aliases.to_string(), String::from("[\"id\"]")), + ])) + .build(); log::trace!("Before get by filter to {table_aliases} and {table_ip_aliases}"); let result = self.inner.get_by_filter(request, &token).await?.data; @@ -1185,6 +1013,7 @@ impl DatastoreWrapper { .to_string(); let table = "aliases"; + // TODO: Upsert builder let request = UpsertRequest { params: Some(Params { id: String::new(), @@ -1242,55 +1071,42 @@ impl DatastoreWrapper { period: usize, ) -> Result, Error> { let table = DbTable::HttpRequest.to_str(); - let filter_1 = AdvanceFilter { - r#type: String::from("criteria"), - field: String::from("ip"), - operator: String::from("equal"), - entity: String::from(table), - values: format!("[\"{ip}\"]"), - }; - let filter_2 = AdvanceFilter { - r#type: String::from("operator"), - field: String::new(), - operator: String::from("and"), - entity: String::new(), - values: String::new(), - }; + let filter_1 = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("ip") + .operator("equal") + .entity(table) + .values(format!("[\"{ip}\"]")) + .build(); + + let filter_2 = AdvanceFilterBuilder::new() + .r#type("operator") + .operator("and") + .build(); let timestamp = Utc::now() .sub(Duration::from_secs( u64::try_from(period).handle_err(location!())?, )) .to_rfc3339(); - let filter_3 = AdvanceFilter { - r#type: String::from("criteria"), - field: String::from("timestamp"), - operator: String::from("greater_than_or_equal"), - entity: String::from(table), - values: format!("[\"{timestamp}\"]"), - }; - let request = GetByFilterRequest { - body: Some(GetByFilterBody { - pluck: vec!["id".into(), "original_url".into()], - advance_filters: vec![filter_1, filter_2, filter_3], - order_by: String::new(), - limit: i32::MAX, - offset: 0, - order_direction: String::new(), - joins: vec![], - multiple_sort: vec![], - pluck_object: HashMap::new(), - date_format: String::new(), - is_case_sensitive_sorting: true, - }), - params: Some(Params { - id: String::new(), - table: table.to_string(), - r#type: String::from("root"), - }), - }; + let filter_3 = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("timestamp") + .operator("greater_than_or_equal") + .entity(table) + .values(format!("[\"{timestamp}\"]")) + .build(); + + let request = GetByFilterRequestBuilder::new() + .plucks(vec!["id", "original_url"]) + .advance_filters(vec![filter_1, filter_2, filter_3]) + .limit(i32::MAX) + .case_sensitive_sorting(true) + .table(table) + .performed_by_root(true) + .build(); let result = self.inner.clone().get_by_filter(request, token).await?.data; From b4eeef75036151bde872e0e64c39555439022a2b Mon Sep 17 00:00:00 2001 From: GyulyVGC Date: Thu, 18 Sep 2025 17:51:50 +0200 Subject: [PATCH 2/2] update appguard to libdatastore 0.5.1 --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/db/datastore_wrapper.rs | 123 +++++++++++++----------------------- 3 files changed, 46 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be950b6..816620f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1523,9 +1523,9 @@ dependencies = [ [[package]] name = "nullnet-libdatastore" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7568fdbc6447444a532a15d0b903e8636f4aa79c5a74d0c7f95296942cbe3288" +checksum = "900c9bdef95a8cafd0b3822c8fa12593791738425af05218ad276a7dbae644d2" dependencies = [ "log", "nullnet-liberror", diff --git a/Cargo.toml b/Cargo.toml index 95ee9c7..6f91b82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ lto = true strip = true [dependencies] -nullnet-libdatastore = "0.5.0" +nullnet-libdatastore = "0.5.1" #nullnet-liblogging = "0.3.0" nullnet-liberror = "0.1.1" nullnet-libipinfo = "0.2.0" diff --git a/src/db/datastore_wrapper.rs b/src/db/datastore_wrapper.rs index 43eb135..1eb8449 100644 --- a/src/db/datastore_wrapper.rs +++ b/src/db/datastore_wrapper.rs @@ -8,12 +8,11 @@ use crate::proto::appguard::{AppGuardIpInfo, Log}; use chrono::Utc; use ipnetwork::IpNetwork; use nullnet_libdatastore::{ - AdvanceFilter, AdvanceFilterBuilder, BatchCreateRequestBuilder, BatchDeleteBody, - BatchDeleteRequest, BatchUpdateRequestBuilder, CreateRequestBuilder, DeleteRequestBuilder, - EntityFieldFrom, EntityFieldTo, FieldRelation, GetByFilterRequestBuilder, - GetByIdRequestBuilder, Join, LoginRequestBuilder, MultipleSort, Params, Query, - RegisterDeviceRequestBuilder, Response, ResponseData, UpdateRequestBuilder, UpsertBody, - UpsertRequest, + AdvanceFilterBuilder, BatchCreateRequestBuilder, BatchDeleteRequestBuilder, + BatchUpdateRequestBuilder, CreateRequestBuilder, DeleteRequestBuilder, + GetByFilterRequestBuilder, GetByIdRequestBuilder, JoinBuilder, LoginRequestBuilder, + MultipleSortBuilder, RegisterDeviceRequestBuilder, ResponseData, UpdateRequestBuilder, + UpsertRequestBuilder, }; use nullnet_libdatastore::{DatastoreClient, DatastoreConfig}; use nullnet_liberror::{location, Error, ErrorHandler, Location}; @@ -91,22 +90,12 @@ impl DatastoreWrapper { let record = entry.to_json()?; let table = entry.table().to_str(); - // TODO: create builder for upsert requests - let request = UpsertRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::new(), - }), - query: Some(Query { - pluck: String::from("id"), - durability: String::from("soft"), - }), - body: Some(UpsertBody { - data: record, - conflict_columns, - }), - }; + let request = UpsertRequestBuilder::new() + .table(table) + .query("id", "soft") + .data(record) + .conflict_columns(conflict_columns) + .build(); log::trace!("Before upsert to {table}"); let result = self.inner.upsert(request, token).await?; @@ -195,12 +184,10 @@ impl DatastoreWrapper { ) -> Result, Error> { let table = table.to_str(); - // TODO: create builder for MultipleSort - let sort = MultipleSort { - by_field: format!("{table}.timestamp"), - by_direction: "asc".to_string(), - is_case_sensitive_sorting: false, - }; + let sort = MultipleSortBuilder::new() + .by_field(format!("{table}.timestamp")) + .by_direction("asc") + .build(); let request = GetByFilterRequestBuilder::new() .table(table) @@ -227,23 +214,19 @@ impl DatastoreWrapper { ) -> Result { let table = table.to_str(); - // TODO: builder for BatchDeleteRequest - let request = BatchDeleteRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::from("root"), - }), - body: Some(BatchDeleteBody { - advance_filters: vec![AdvanceFilter { - r#type: "criteria".to_string(), - field: "timestamp".to_string(), - operator: "less_than_or_equal".to_string(), - entity: table.to_string(), - values: format!("[\"{timestamp}\"]"), - }], - }), - }; + let filter = AdvanceFilterBuilder::new() + .r#type("criteria") + .field("timestamp") + .operator("less_than_or_equal") + .entity(table) + .values(format!("[\"{timestamp}\"]")) + .build(); + + let request = BatchDeleteRequestBuilder::new() + .table(table) + .performed_by_root(true) + .advance_filter(filter) + .build(); log::trace!("Before delete old entries to {table}"); let count = self.inner.batch_delete(request, token).await?.count; @@ -576,7 +559,7 @@ impl DatastoreWrapper { account_id: &str, account_secret: &str, device: &Device, - ) -> Result { + ) -> Result { let request = RegisterDeviceRequestBuilder::new() .organization_id(&device.organization) .account_id(account_id) @@ -921,24 +904,14 @@ impl DatastoreWrapper { .values(format!("[\"{name}\"]")) .build(); - // TODO: builder for Join - let join = Join { - r#type: "left".to_string(), - field_relation: Some(FieldRelation { - to: Some(EntityFieldTo { - entity: table_ip_aliases.to_string(), - field: String::from("alias_id"), - alias: String::from(""), - limit: i32::MAX, - order_by: String::new(), - filters: Vec::new(), - }), - from: Some(EntityFieldFrom { - entity: table_aliases.to_string(), - field: String::from("id"), - }), - }), - }; + let join = JoinBuilder::new() + .r#type("left") + .to_entity(table_ip_aliases) + .to_field("alias_id") + .to_limit(i32::MAX) + .from_entity(table_aliases) + .from_field("id") + .build(); let request = GetByFilterRequestBuilder::new() .table(table_aliases) @@ -1013,22 +986,12 @@ impl DatastoreWrapper { .to_string(); let table = "aliases"; - // TODO: Upsert builder - let request = UpsertRequest { - params: Some(Params { - id: String::new(), - table: table.into(), - r#type: String::new(), - }), - query: Some(Query { - pluck: String::from("id"), - durability: String::from("soft"), - }), - body: Some(UpsertBody { - data: record, - conflict_columns: vec!["name".to_string()], - }), - }; + let request = UpsertRequestBuilder::new() + .table(table) + .query("id", "soft") + .data(record) + .conflict_column("name") + .build(); log::trace!("Before upsert to {table}"); let result = self.inner.upsert(request, token).await?;