diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 6fb787af4..6eab244bc 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -160,7 +160,13 @@ pub async fn sync_streams_with_ingestors( pub async fn sync_users_with_roles_with_ingestors( username: &str, role: &HashSet, + operation: &str, ) -> Result<(), RBACError> { + match operation { + "add" | "remove" => {} + _ => return Err(RBACError::InvalidSyncOperation(operation.to_string())), + } + let role_data = to_vec(&role.clone()).map_err(|err| { error!("Fatal: failed to serialize role: {:?}", err); RBACError::SerdeError(err) @@ -168,12 +174,15 @@ pub async fn sync_users_with_roles_with_ingestors( let username = username.to_owned(); + let op = operation.to_string(); + for_each_live_ingestor(move |ingestor| { let url = format!( - "{}{}/user/{}/role/sync", + "{}{}/user/{}/role/sync/{}", ingestor.domain_name, base_path_without_preceding_slash(), - username + username, + op ); let role_data = role_data.clone(); diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 4c1e92f39..acb1ec645 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -59,7 +59,7 @@ pub struct CommonAttributes { pub trait RouteExt { fn authorize(self, action: Action) -> Self; - fn authorize_for_stream(self, action: Action) -> Self; + fn authorize_for_resource(self, action: Action) -> Self; fn authorize_for_user(self, action: Action) -> Self; } @@ -71,10 +71,10 @@ impl RouteExt for Route { }) } - fn authorize_for_stream(self, action: Action) -> Self { + fn authorize_for_resource(self, action: Action) -> Self { self.wrap(Auth { action, - method: auth_stream_context, + method: auth_resource_context, }) } @@ -182,18 +182,26 @@ pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result Result { let creds = extract_session_key(req); + let usergroup = req.match_info().get("usergroup"); + let llmid = req.match_info().get("llmid"); let mut stream = req.match_info().get("logstream"); - if stream.is_none() { + if let Some(usergroup) = usergroup { + creds.map(|key| Users.authorize(key, action, Some(usergroup), None)) + } else if let Some(llmid) = llmid { + creds.map(|key| Users.authorize(key, action, Some(llmid), None)) + } else if let Some(stream) = stream { + creds.map(|key| Users.authorize(key, action, Some(stream), None)) + } else { if let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) { stream = Some(stream_name.to_str().unwrap()); } + creds.map(|key| Users.authorize(key, action, stream, None)) } - creds.map(|key| Users.authorize(key, action, stream, None)) } pub fn auth_user_context( diff --git a/src/handlers/http/modal/ingest/ingestor_rbac.rs b/src/handlers/http/modal/ingest/ingestor_rbac.rs index f25abe688..ee50c0fce 100644 --- a/src/handlers/http/modal/ingest/ingestor_rbac.rs +++ b/src/handlers/http/modal/ingest/ingestor_rbac.rs @@ -24,6 +24,7 @@ use tokio::sync::Mutex; use crate::{ handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError}, rbac::{ + map::roles, user::{self, User as ParseableUser}, Users, }, @@ -48,7 +49,7 @@ pub async fn post_user( let _ = storage::put_staging_metadata(&metadata); let created_role = user.roles.clone(); Users.put_user(user.clone()); - Users.put_role(&username, created_role.clone()); + Users.add_roles(&username, created_role.clone()); } Ok(generated_password) @@ -73,18 +74,85 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user -// Put roles for given user -pub async fn put_role( +// Handler PATCH /user/{username}/role/sync/add => Add roles to a user +pub async fn add_roles_to_user( username: web::Path, - role: web::Json>, + roles_to_add: web::Json>, ) -> Result { let username = username.into_inner(); - let role = role.into_inner(); + let roles_to_add = roles_to_add.into_inner(); if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + + // check if all roles exist + let mut non_existent_roles = Vec::new(); + roles_to_add.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existent_roles.push(r.clone()); + } + }); + + if !non_existent_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existent_roles)); + } + + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + user.roles.extend(roles_to_add.clone()); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + let _ = storage::put_staging_metadata(&metadata); + // update in mem table + Users.add_roles(&username.clone(), roles_to_add.clone()); + + Ok(format!("Roles updated successfully for {username}")) +} + +// Handler PATCH /user/{username}/role/sync/add => Add roles to a user +pub async fn remove_roles_from_user( + username: web::Path, + roles_to_remove: web::Json>, +) -> Result { + let username = username.into_inner(); + let roles_to_remove = roles_to_remove.into_inner(); + + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + + // check if all roles exist + let mut non_existent_roles = Vec::new(); + roles_to_remove.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existent_roles.push(r.clone()); + } + }); + + if !non_existent_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existent_roles)); + } + + // check that user actually has these roles + let user_roles: HashSet = HashSet::from_iter(Users.get_role(&username)); + let roles_not_with_user: HashSet = + HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned()); + + if !roles_not_with_user.is_empty() { + return Err(RBACError::RolesNotAssigned(Vec::from_iter( + roles_not_with_user, + ))); + } + // update parseable.json first let mut metadata = get_metadata().await?; if let Some(user) = metadata @@ -92,7 +160,9 @@ pub async fn put_role( .iter_mut() .find(|user| user.username() == username) { - user.roles.clone_from(&role); + let diff: HashSet = + HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); + user.roles = diff; } else { // should be unreachable given state is always consistent return Err(RBACError::UserDoesNotExist); @@ -100,7 +170,7 @@ pub async fn put_role( let _ = storage::put_staging_metadata(&metadata); // update in mem table - Users.put_role(&username.clone(), role.clone()); + Users.remove_roles(&username.clone(), roles_to_remove.clone()); Ok(format!("Roles updated successfully for {username}")) } diff --git a/src/handlers/http/modal/ingest/ingestor_role.rs b/src/handlers/http/modal/ingest/ingestor_role.rs index d48b9efdf..b2656c979 100644 --- a/src/handlers/http/modal/ingest/ingestor_role.rs +++ b/src/handlers/http/modal/ingest/ingestor_role.rs @@ -16,6 +16,8 @@ * */ +use std::collections::HashSet; + use actix_web::{ web::{self, Json}, HttpResponse, Responder, @@ -23,7 +25,10 @@ use actix_web::{ use crate::{ handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError}, - rbac::{map::mut_roles, role::model::DefaultPrivilege}, + rbac::{ + map::{mut_roles, mut_sessions, read_user_groups, users}, + role::model::DefaultPrivilege, + }, storage, }; @@ -40,5 +45,25 @@ pub async fn put( let _ = storage::put_staging_metadata(&metadata); mut_roles().insert(name.clone(), privileges); + // refresh the sessions of all users using this role + // for this, iterate over all user_groups and users and create a hashset of users + let mut session_refresh_users: HashSet = HashSet::new(); + for user_group in read_user_groups().values().cloned() { + if user_group.roles.contains(&name) { + session_refresh_users.extend(user_group.users); + } + } + + // iterate over all users to see if they have this role + for user in users().values().cloned() { + if user.roles.contains(&name) { + session_refresh_users.insert(user.username().to_string()); + } + } + + for username in session_refresh_users { + mut_sessions().remove_user(&username); + } + Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index c0564449c..1ecc5dff8 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -198,11 +198,21 @@ impl IngestServer { .wrap(DisAllowRootUser), ) .service( - web::resource("/{username}/role/sync") - // PUT /user/{username}/roles => Put roles for user + web::resource("/{username}/role/sync/add") + // PATCH /user/{username}/role/sync/add => Add roles to a user .route( - web::put() - .to(ingestor_rbac::put_role) + web::patch() + .to(ingestor_rbac::add_roles_to_user) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), + ), + ) + .service( + web::resource("/{username}/role/sync/remove") + // PATCH /user/{username}/role/sync/remove => Remove roles from a user + .route( + web::patch() + .to(ingestor_rbac::remove_roles_from_user) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), ), @@ -227,7 +237,7 @@ impl IngestServer { .route( web::post() .to(ingest::post_event) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .wrap(from_fn( resource_check::check_resource_utilization_middleware, @@ -245,7 +255,7 @@ impl IngestServer { .route( web::put() .to(ingestor_logstream::put_stream) - .authorize_for_stream(Action::CreateStream), + .authorize_for_resource(Action::CreateStream), ), ) .service( @@ -253,7 +263,7 @@ impl IngestServer { web::resource("/info").route( web::get() .to(logstream::get_stream_info) - .authorize_for_stream(Action::GetStreamInfo), + .authorize_for_resource(Action::GetStreamInfo), ), ) .service( @@ -261,7 +271,7 @@ impl IngestServer { web::resource("/stats").route( web::get() .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), + .authorize_for_resource(Action::GetStats), ), ) .service( @@ -269,7 +279,7 @@ impl IngestServer { web::resource("/cleanup").route( web::post() .to(ingestor_logstream::retention_cleanup) - .authorize_for_stream(Action::PutRetention), + .authorize_for_resource(Action::PutRetention), ), ), ), diff --git a/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs index ae2af1c2d..8b0f81551 100644 --- a/src/handlers/http/modal/query/querier_rbac.rs +++ b/src/handlers/http/modal/query/querier_rbac.rs @@ -30,11 +30,14 @@ use crate::{ modal::utils::rbac_utils::{get_metadata, put_metadata}, rbac::RBACError, }, - rbac::{user, Users}, + rbac::{ + map::{roles, write_user_groups}, + user, Users, + }, validator, }; -// async aware lock for updating storage metadata and user map atomicically +// async aware lock for updating storage metadata and user map atomically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); // Handler for POST /api/v1/user/{username} @@ -48,14 +51,27 @@ pub async fn post_user( let mut metadata = get_metadata().await?; validator::user_name(&username)?; - let roles: HashSet = if let Some(body) = body { + let user_roles: HashSet = if let Some(body) = body { serde_json::from_value(body.into_inner())? } else { return Err(RBACError::RoleValidationError); }; - if roles.is_empty() { + if user_roles.is_empty() { return Err(RBACError::RoleValidationError); + } else { + let mut non_existant_roles = Vec::new(); + user_roles + .iter() + .map(|r| { + if !roles().contains_key(r) { + non_existant_roles.push(r.clone()); + } + }) + .for_each(drop); + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } } let _ = UPDATE_LOCK.lock().await; if Users.contains(&username) @@ -72,12 +88,12 @@ pub async fn post_user( metadata.users.push(user.clone()); put_metadata(&metadata).await?; - let created_role = roles.clone(); + let created_role = user_roles.clone(); Users.put_user(user.clone()); - sync_user_creation_with_ingestors(user, &Some(roles)).await?; + sync_user_creation_with_ingestors(user, &Some(user_roles)).await?; - put_role( + add_roles_to_user( web::Path::::from(username.clone()), web::Json(created_role), ) @@ -98,6 +114,24 @@ pub async fn delete_user(username: web::Path) -> Result) -> Result Put roles for user -// Put roles for given user -pub async fn put_role( +// Handler PATCH /user/{username}/role/add => Add roles to a user +pub async fn add_roles_to_user( username: web::Path, - role: web::Json>, + roles_to_add: web::Json>, ) -> Result { let username = username.into_inner(); - let role = role.into_inner(); + let roles_to_add = roles_to_add.into_inner(); if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + + let mut non_existant_roles = Vec::new(); + + // check if the role exists + roles_to_add.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existant_roles.push(r.clone()); + } + }); + + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } + + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + user.roles.extend(roles_to_add.clone()); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + put_metadata(&metadata).await?; + // update in mem table + Users.add_roles(&username.clone(), roles_to_add.clone()); + + sync_users_with_roles_with_ingestors(&username, &roles_to_add, "add").await?; + + Ok(format!("Roles updated successfully for {username}")) +} + +// Handler PATCH /user/{username}/role/remove => Remove roles from a user +pub async fn remove_roles_from_user( + username: web::Path, + roles_to_remove: web::Json>, +) -> Result { + let username = username.into_inner(); + let roles_to_remove = roles_to_remove.into_inner(); + + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + + let mut non_existant_roles = Vec::new(); + + // check if the role exists + roles_to_remove.iter().for_each(|r| { + if roles().get(r).is_none() { + non_existant_roles.push(r.clone()); + } + }); + + if !non_existant_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existant_roles)); + } + + // check for role not present with user + let user_roles: HashSet = HashSet::from_iter(Users.get_role(&username)); + let roles_not_with_user: HashSet = + HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned()); + if !roles_not_with_user.is_empty() { + return Err(RBACError::RolesNotAssigned(Vec::from_iter( + roles_not_with_user, + ))); + } + // update parseable.json first let mut metadata = get_metadata().await?; if let Some(user) = metadata @@ -126,7 +230,9 @@ pub async fn put_role( .iter_mut() .find(|user| user.username() == username) { - user.roles.clone_from(&role); + let diff: HashSet = + HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); + user.roles = diff; } else { // should be unreachable given state is always consistent return Err(RBACError::UserDoesNotExist); @@ -134,9 +240,9 @@ pub async fn put_role( put_metadata(&metadata).await?; // update in mem table - Users.put_role(&username.clone(), role.clone()); + Users.remove_roles(&username.clone(), roles_to_remove.clone()); - sync_users_with_roles_with_ingestors(&username, &role).await?; + sync_users_with_roles_with_ingestors(&username, &roles_to_remove, "remove").await?; Ok(format!("Roles updated successfully for {username}")) } diff --git a/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs index b8b6f4639..40a7024b4 100644 --- a/src/handlers/http/modal/query/querier_role.rs +++ b/src/handlers/http/modal/query/querier_role.rs @@ -16,6 +16,8 @@ * */ +use std::collections::HashSet; + use actix_web::{ web::{self, Json}, HttpResponse, Responder, @@ -27,7 +29,10 @@ use crate::{ modal::utils::rbac_utils::{get_metadata, put_metadata}, role::RoleError, }, - rbac::{map::mut_roles, role::model::DefaultPrivilege}, + rbac::{ + map::{mut_roles, mut_sessions, read_user_groups, users}, + role::model::DefaultPrivilege, + }, }; // Handler for PUT /api/v1/role/{name} @@ -43,6 +48,26 @@ pub async fn put( put_metadata(&metadata).await?; mut_roles().insert(name.clone(), privileges.clone()); + // refresh the sessions of all users using this role + // for this, iterate over all user_groups and users and create a hashset of users + let mut session_refresh_users: HashSet = HashSet::new(); + for user_group in read_user_groups().values().cloned() { + if user_group.roles.contains(&name) { + session_refresh_users.extend(user_group.users); + } + } + + // iterate over all users to see if they have this role + for user in users().values().cloned() { + if user.roles.contains(&name) { + session_refresh_users.insert(user.username().to_string()); + } + } + + for username in session_refresh_users { + mut_sessions().remove_user(&username); + } + sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?; Ok(HttpResponse::Ok().finish()) diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 595c52833..335e29896 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -94,9 +94,8 @@ impl ParseableServer for QueryServer { )); } - let parseable_json = PARSEABLE.validate_storage().await?; - migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?; - + let mut parseable_json = PARSEABLE.validate_storage().await?; + migration::run_metadata_migration(&PARSEABLE, &mut parseable_json).await?; Ok(parseable_json) } @@ -209,18 +208,30 @@ impl QueryServer { .wrap(DisAllowRootUser), ) .service( - web::resource("/{username}/role") - // PUT /user/{username}/roles => Put roles for user + web::resource("/{username}/role").route( + web::get() + .to(rbac::get_role) + .authorize_for_user(Action::GetUserRoles), + ), + ) + .service( + web::resource("/{username}/role/add") + // PATCH /user/{username}/role/add => Add roles to a user .route( - web::put() - .to(querier_rbac::put_role) + web::patch() + .to(rbac::add_roles_to_user) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), - ) + ), + ) + .service( + web::resource("/{username}/role/remove") + // PATCH /user/{username}/role/remove => Remove roles from a user .route( - web::get() - .to(rbac::get_role) - .authorize_for_user(Action::GetUserRoles), + web::patch() + .to(rbac::remove_roles_from_user) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), ), ) .service( @@ -262,19 +273,19 @@ impl QueryServer { .route( web::put() .to(querier_logstream::put_stream) - .authorize_for_stream(Action::CreateStream), + .authorize_for_resource(Action::CreateStream), ) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route( web::post() .to(querier_ingest::post_event) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) // DELETE "/logstream/{logstream}" ==> Delete log stream .route( web::delete() .to(querier_logstream::delete) - .authorize_for_stream(Action::DeleteStream), + .authorize_for_resource(Action::DeleteStream), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -283,7 +294,7 @@ impl QueryServer { web::resource("/info").route( web::get() .to(logstream::get_stream_info) - .authorize_for_stream(Action::GetStreamInfo), + .authorize_for_resource(Action::GetStreamInfo), ), ) .service( @@ -291,7 +302,7 @@ impl QueryServer { web::resource("/schema").route( web::get() .to(logstream::get_schema) - .authorize_for_stream(Action::GetSchema), + .authorize_for_resource(Action::GetSchema), ), ) .service( @@ -299,7 +310,7 @@ impl QueryServer { web::resource("/stats").route( web::get() .to(querier_logstream::get_stats) - .authorize_for_stream(Action::GetStats), + .authorize_for_resource(Action::GetStats), ), ) .service( @@ -308,13 +319,13 @@ impl QueryServer { .route( web::put() .to(logstream::put_retention) - .authorize_for_stream(Action::PutRetention), + .authorize_for_resource(Action::PutRetention), ) // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream .route( web::get() .to(logstream::get_retention) - .authorize_for_stream(Action::GetRetention), + .authorize_for_resource(Action::GetRetention), ), ) .service( @@ -323,17 +334,17 @@ impl QueryServer { .route( web::put() .to(logstream::put_stream_hot_tier) - .authorize_for_stream(Action::PutHotTierEnabled), + .authorize_for_resource(Action::PutHotTierEnabled), ) .route( web::get() .to(logstream::get_stream_hot_tier) - .authorize_for_stream(Action::GetHotTierEnabled), + .authorize_for_resource(Action::GetHotTierEnabled), ) .route( web::delete() .to(logstream::delete_stream_hot_tier) - .authorize_for_stream(Action::DeleteHotTierEnabled), + .authorize_for_resource(Action::DeleteHotTierEnabled), ), ), ) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 575f2925d..615af1d8e 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -114,8 +114,8 @@ impl ParseableServer for Server { async fn load_metadata(&self) -> anyhow::Result> { //TODO: removed file migration //deprecated support for deployments < v1.0.0 - let parseable_json = PARSEABLE.validate_storage().await?; - migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?; + let mut parseable_json = PARSEABLE.validate_storage().await?; + migration::run_metadata_migration(&PARSEABLE, &mut parseable_json).await?; Ok(parseable_json) } @@ -182,9 +182,9 @@ impl Server { web::resource("/info").route( web::get() .to(http::prism_logstream::get_info) - .authorize_for_stream(Action::GetStreamInfo) - .authorize_for_stream(Action::GetStats) - .authorize_for_stream(Action::GetRetention), + .authorize_for_resource(Action::GetStreamInfo) + .authorize_for_resource(Action::GetStats) + .authorize_for_resource(Action::GetRetention), ), ), ) @@ -195,9 +195,9 @@ impl Server { "", web::post() .to(http::prism_logstream::post_datasets) - .authorize_for_stream(Action::GetStreamInfo) - .authorize_for_stream(Action::GetStats) - .authorize_for_stream(Action::GetRetention), + .authorize_for_resource(Action::GetStreamInfo) + .authorize_for_resource(Action::GetStats) + .authorize_for_resource(Action::GetRetention), ) } @@ -408,13 +408,13 @@ impl Server { .route( web::put() .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream), + .authorize_for_resource(Action::CreateStream), ) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route( web::post() .to(ingest::post_event) - .authorize_for_stream(Action::Ingest) + .authorize_for_resource(Action::Ingest) .wrap(from_fn( resource_check::check_resource_utilization_middleware, )), @@ -423,7 +423,7 @@ impl Server { .route( web::delete() .to(logstream::delete) - .authorize_for_stream(Action::DeleteStream), + .authorize_for_resource(Action::DeleteStream), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -432,7 +432,7 @@ impl Server { web::resource("/info").route( web::get() .to(logstream::get_stream_info) - .authorize_for_stream(Action::GetStreamInfo), + .authorize_for_resource(Action::GetStreamInfo), ), ) .service( @@ -440,7 +440,7 @@ impl Server { web::resource("/schema").route( web::get() .to(logstream::get_schema) - .authorize_for_stream(Action::GetSchema), + .authorize_for_resource(Action::GetSchema), ), ) .service( @@ -448,7 +448,7 @@ impl Server { web::resource("/stats").route( web::get() .to(logstream::get_stats) - .authorize_for_stream(Action::GetStats), + .authorize_for_resource(Action::GetStats), ), ) .service( @@ -457,13 +457,13 @@ impl Server { .route( web::put() .to(logstream::put_retention) - .authorize_for_stream(Action::PutRetention), + .authorize_for_resource(Action::PutRetention), ) // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream .route( web::get() .to(logstream::get_retention) - .authorize_for_stream(Action::GetRetention), + .authorize_for_resource(Action::GetRetention), ), ) .service( @@ -472,17 +472,17 @@ impl Server { .route( web::put() .to(logstream::put_stream_hot_tier) - .authorize_for_stream(Action::PutHotTierEnabled), + .authorize_for_resource(Action::PutHotTierEnabled), ) .route( web::get() .to(logstream::get_stream_hot_tier) - .authorize_for_stream(Action::GetHotTierEnabled), + .authorize_for_resource(Action::GetHotTierEnabled), ) .route( web::delete() .to(logstream::delete_stream_hot_tier) - .authorize_for_stream(Action::DeleteHotTierEnabled), + .authorize_for_resource(Action::DeleteHotTierEnabled), ), ), ) @@ -494,7 +494,7 @@ impl Server { .route( web::post() .to(ingest::ingest) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) } @@ -507,7 +507,7 @@ impl Server { .route( web::post() .to(ingest::handle_otel_logs_ingestion) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -516,7 +516,7 @@ impl Server { .route( web::post() .to(ingest::handle_otel_metrics_ingestion) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -525,7 +525,7 @@ impl Server { .route( web::post() .to(ingest::handle_otel_traces_ingestion) - .authorize_for_stream(Action::Ingest), + .authorize_for_resource(Action::Ingest), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), ) @@ -622,18 +622,30 @@ impl Server { .wrap(DisAllowRootUser), ) .service( - web::resource("/{username}/role") - // PUT /user/{username}/roles => Put roles for user + web::resource("/{username}/role").route( + web::get() + .to(http::rbac::get_role) + .authorize_for_user(Action::GetUserRoles), + ), + ) + .service( + web::resource("/{username}/role/add") + // PATCH /user/{username}/role/add => Add roles to a user .route( - web::put() - .to(http::rbac::put_role) + web::patch() + .to(http::rbac::add_roles_to_user) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), - ) + ), + ) + .service( + web::resource("/{username}/role/remove") + // PATCH /user/{username}/role/remove => Remove roles from a user .route( - web::get() - .to(http::rbac::get_role) - .authorize_for_user(Action::GetUserRoles), + web::patch() + .to(http::rbac::remove_roles_from_user) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), ), ) .service( diff --git a/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs index 13e65a4e1..54baec3ac 100644 --- a/src/handlers/http/oidc.rs +++ b/src/handlers/http/oidc.rs @@ -344,7 +344,7 @@ async fn update_user_if_changed( group: HashSet, user_info: user::UserInfo, ) -> Result { - let User { ty, roles } = &mut user; + let User { ty, roles, .. } = &mut user; let UserType::OAuth(oauth_user) = ty else { unreachable!() }; diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index 3870c88a9..acaabf3a6 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -19,7 +19,14 @@ use std::collections::{HashMap, HashSet}; use crate::{ - rbac::{self, map::roles, role::model::DefaultPrivilege, user, utils::to_prism_user, Users}, + rbac::{ + self, + map::{read_user_groups, roles}, + role::model::DefaultPrivilege, + user, + utils::to_prism_user, + Users, + }, storage::ObjectStorageError, validator::{self, error::UsernameValidationError}, }; @@ -30,6 +37,8 @@ use actix_web::{ }; use http::StatusCode; use itertools::Itertools; +use serde::Serialize; +use serde_json::json; use tokio::sync::Mutex; use super::modal::utils::rbac_utils::{get_metadata, put_metadata}; @@ -97,14 +106,24 @@ pub async fn post_user( let mut metadata = get_metadata().await?; validator::user_name(&username)?; - let roles: HashSet = if let Some(body) = body { + let user_roles: HashSet = if let Some(body) = body { serde_json::from_value(body.into_inner())? } else { return Err(RBACError::RoleValidationError); }; - if roles.is_empty() { + if user_roles.is_empty() { return Err(RBACError::RoleValidationError); + } else { + let mut non_existent_roles = Vec::new(); + for role in &user_roles { + if !roles().contains_key(role) { + non_existent_roles.push(role.clone()); + } + } + if !non_existent_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existent_roles)); + } } let _ = UPDATE_LOCK.lock().await; if Users.contains(&username) @@ -121,10 +140,10 @@ pub async fn post_user( metadata.users.push(user.clone()); put_metadata(&metadata).await?; - let created_role = roles.clone(); + let created_role = user_roles.clone(); Users.put_user(user.clone()); - put_role( + add_roles_to_user( web::Path::::from(username.clone()), web::Json(created_role), ) @@ -170,7 +189,7 @@ pub async fn get_role(username: web::Path) -> Result> = Users + let direct_roles: HashMap> = Users .get_role(&username) .iter() .filter_map(|role_name| { @@ -180,21 +199,48 @@ pub async fn get_role(username: web::Path) -> Result>> = HashMap::new(); + // user might be part of some user groups, fetch the roles from there as well + for user_group in Users.get_user_groups(&username) { + if let Some(group) = read_user_groups().get(&user_group) { + let ug_roles: HashMap> = group + .roles + .iter() + .filter_map(|role_name| { + roles() + .get(role_name) + .map(|role| (role_name.to_owned(), role.clone())) + }) + .collect(); + group_roles.insert(group.name.clone(), ug_roles); + } + } + let res = RolesResponse { + direct_roles, + group_roles, + }; Ok(web::Json(res)) } // Handler for DELETE /api/v1/user/delete/{username} pub async fn delete_user(username: web::Path) -> Result { let username = username.into_inner(); - let _ = UPDATE_LOCK.lock().await; + + // if user is a part of any groups then don't allow deletion + if !Users.get_user_groups(&username).is_empty() { + return Err(RBACError::InvalidDeletionRequest(format!( + "User: {username} should not be a part of any groups" + ))); + } // fail this request if the user does not exists if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + let _ = UPDATE_LOCK.lock().await; + // delete from parseable.json first let mut metadata = get_metadata().await?; metadata.users.retain(|user| user.username() != username); - put_metadata(&metadata).await?; // update in mem table @@ -202,18 +248,31 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user -// Put roles for given user -pub async fn put_role( +// Handler PATCH /user/{username}/role/add => Add roles to a user +pub async fn add_roles_to_user( username: web::Path, - role: web::Json>, + roles_to_add: web::Json>, ) -> Result { let username = username.into_inner(); - let role = role.into_inner(); + let roles_to_add = roles_to_add.into_inner(); if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; + + let mut non_existent_roles = Vec::new(); + + // check if the role exists + for role in &roles_to_add { + if !roles().contains_key(role) { + non_existent_roles.push(role.clone()); + } + } + + if !non_existent_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existent_roles)); + } + // update parseable.json first let mut metadata = get_metadata().await?; if let Some(user) = metadata @@ -221,7 +280,7 @@ pub async fn put_role( .iter_mut() .find(|user| user.username() == username) { - user.roles.clone_from(&role); + user.roles.extend(roles_to_add.clone()); } else { // should be unreachable given state is always consistent return Err(RBACError::UserDoesNotExist); @@ -229,11 +288,79 @@ pub async fn put_role( put_metadata(&metadata).await?; // update in mem table - Users.put_role(&username.clone(), role.clone()); + Users.add_roles(&username.clone(), roles_to_add); Ok(format!("Roles updated successfully for {username}")) } +// Handler PATCH /user/{username}/role/remove => Remove roles from a user +pub async fn remove_roles_from_user( + username: web::Path, + roles_to_remove: web::Json>, +) -> Result { + let username = username.into_inner(); + let roles_to_remove = roles_to_remove.into_inner(); + + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + + let mut non_existent_roles = Vec::new(); + + // check if the role exists + for role in &roles_to_remove { + if !roles().contains_key(role) { + non_existent_roles.push(role.clone()); + } + } + + if !non_existent_roles.is_empty() { + return Err(RBACError::RolesDoNotExist(non_existent_roles)); + } + + // check for role not present with user + let user_roles: HashSet = HashSet::from_iter(Users.get_role(&username)); + let roles_not_with_user: HashSet = + HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned()); + if !roles_not_with_user.is_empty() { + return Err(RBACError::RolesNotAssigned(Vec::from_iter( + roles_not_with_user, + ))); + } + + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + let diff: HashSet = + HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); + user.roles = diff; + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + put_metadata(&metadata).await?; + // update in mem table + Users.remove_roles(&username.clone(), roles_to_remove); + + Ok(format!("Roles updated successfully for {username}")) +} + +#[derive(Debug, Serialize)] +#[serde(rename = "camelCase")] +pub struct InvalidUserGroupError { + pub valid_name: bool, + pub non_existent_roles: Vec, + pub non_existent_users: Vec, + pub roles_not_in_group: Vec, + pub users_not_in_group: Vec, + pub comments: String, +} + #[derive(Debug, thiserror::Error)] pub enum RBACError { #[error("User exists already")] @@ -252,6 +379,24 @@ pub enum RBACError { Anyhow(#[from] anyhow::Error), #[error("User cannot be created without a role")] RoleValidationError, + #[error("User group `{0}` already exists")] + UserGroupExists(String), + #[error("UserGroup `{0}` does not exist")] + UserGroupDoesNotExist(String), + #[error("Invalid Roles: {0:?}")] + RolesDoNotExist(Vec), + #[error("Roles have not been assigned: {0:?}")] + RolesNotAssigned(Vec), + #[error("{0:?}")] + InvalidUserGroupRequest(Box), + #[error("{0}")] + InvalidSyncOperation(String), + #[error("User group `{0}` is still being used")] + UserGroupNotEmpty(String), + #[error("Resource `{0}` is still in use")] + ResourceInUse(String), + #[error("{0}")] + InvalidDeletionRequest(String), } impl actix_web::ResponseError for RBACError { @@ -265,12 +410,46 @@ impl actix_web::ResponseError for RBACError { Self::Network(_) => StatusCode::BAD_GATEWAY, Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::RoleValidationError => StatusCode::BAD_REQUEST, + Self::UserGroupExists(_) => StatusCode::BAD_REQUEST, + Self::UserGroupDoesNotExist(_) => StatusCode::BAD_REQUEST, + Self::RolesDoNotExist(_) => StatusCode::BAD_REQUEST, + Self::RolesNotAssigned(_) => StatusCode::BAD_REQUEST, + Self::InvalidUserGroupRequest(_) => StatusCode::BAD_REQUEST, + Self::InvalidSyncOperation(_) => StatusCode::BAD_REQUEST, + Self::UserGroupNotEmpty(_) => StatusCode::BAD_REQUEST, + Self::ResourceInUse(_) => StatusCode::BAD_REQUEST, + Self::InvalidDeletionRequest(_) => StatusCode::BAD_REQUEST, } } fn error_response(&self) -> actix_web::HttpResponse { - actix_web::HttpResponse::build(self.status_code()) - .insert_header(ContentType::plaintext()) - .body(self.to_string()) + match self { + RBACError::RolesNotAssigned(obj) => actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .json(json!({ + "roles_not_assigned": obj + })), + RBACError::RolesDoNotExist(obj) => actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .json(json!({ + "non_existent_roles": obj + })), + RBACError::InvalidUserGroupRequest(obj) => { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .json(obj) + } + _ => actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()), + } } } + +#[derive(Serialize)] +#[serde(rename = "camelCase")] +pub struct RolesResponse { + #[serde(rename = "roles")] + pub direct_roles: HashMap>, + pub group_roles: HashMap>>, +} diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index c649ac248..e37ab61c4 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -16,6 +16,8 @@ * */ +use std::collections::HashSet; + use actix_web::{ http::header::ContentType, web::{self, Json}, @@ -26,7 +28,7 @@ use http::StatusCode; use crate::{ parseable::PARSEABLE, rbac::{ - map::{mut_roles, DEFAULT_ROLE}, + map::{mut_roles, mut_sessions, read_user_groups, users, DEFAULT_ROLE}, role::model::DefaultPrivilege, }, storage::{self, ObjectStorageError, StorageMetadata}, @@ -45,6 +47,26 @@ pub async fn put( put_metadata(&metadata).await?; mut_roles().insert(name.clone(), privileges.clone()); + // refresh the sessions of all users using this role + // for this, iterate over all user_groups and users and create a hashset of users + let mut session_refresh_users: HashSet = HashSet::new(); + for user_group in read_user_groups().values().cloned() { + if user_group.roles.contains(&name) { + session_refresh_users.extend(user_group.users); + } + } + + // iterate over all users to see if they have this role + for user in users().values().cloned() { + if user.roles.contains(&name) { + session_refresh_users.insert(user.username().to_string()); + } + } + + for username in session_refresh_users { + mut_sessions().remove_user(&username); + } + Ok(HttpResponse::Ok().finish()) } @@ -73,17 +95,26 @@ pub async fn list_roles() -> Result { Ok(web::Json(roles)) } -// Handler for DELETE /api/v1/role/{username} +// Handler for DELETE /api/v1/role/{name} // Delete existing role pub async fn delete(name: web::Path) -> Result { let name = name.into_inner(); + // check if the role is being used by any user or group let mut metadata = get_metadata().await?; if metadata.users.iter().any(|user| user.roles.contains(&name)) { return Err(RoleError::RoleInUse); } + if metadata + .user_groups + .iter() + .any(|user_group| user_group.roles.contains(&name)) + { + return Err(RoleError::RoleInUse); + } metadata.roles.remove(&name); put_metadata(&metadata).await?; mut_roles().remove(&name); + Ok(HttpResponse::Ok().finish()) } diff --git a/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs index c42ebe94c..c0c2c42af 100644 --- a/src/migration/metadata_migration.rs +++ b/src/migration/metadata_migration.rs @@ -161,6 +161,43 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue { storage_metadata } +pub fn v5_v6(mut storage_metadata: JsonValue) -> JsonValue { + let metadata = storage_metadata.as_object_mut().unwrap(); + metadata.remove_entry("version"); + metadata.insert("version".to_string(), JsonValue::String("v6".to_string())); + + // If user_groups is missing, add an empty array or your default structure + if !metadata.contains_key("user_groups") { + metadata.insert("user_groups".to_string(), JsonValue::Array(vec![])); + } + + // introduce user groups entry for all users + let users = metadata.get_mut("users").unwrap().as_array_mut().unwrap(); + for user in users.iter_mut() { + if !user.as_object_mut().unwrap().contains_key("user_groups") { + user.as_object_mut() + .unwrap() + .insert("user_groups".to_string(), JsonValue::Array(vec![])); + } + } + + if let Some(JsonValue::Object(roles)) = metadata.get_mut("roles") { + for (_, role_permissions) in roles.iter_mut() { + if let JsonValue::Array(permissions) = role_permissions { + for permission in permissions.iter_mut() { + if let JsonValue::Object(perm_obj) = permission { + if let Some(JsonValue::Object(resource)) = perm_obj.get_mut("resource") { + resource.remove("tag"); + } + } + } + } + } + } + + storage_metadata +} + /// Remove the querier endpoint and auth token from the storage metadata pub fn remove_querier_metadata(mut storage_metadata: JsonValue) -> JsonValue { let metadata = storage_metadata.as_object_mut().unwrap(); diff --git a/src/migration/mod.rs b/src/migration/mod.rs index d11ae3f79..4216f1076 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -41,11 +41,18 @@ use crate::{ }, }; +fn get_version(metadata: &serde_json::Value) -> Option<&str> { + metadata + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()) +} + /// Migrate the metdata from v1 or v2 to v3 /// This is a one time migration pub async fn run_metadata_migration( config: &Parseable, - parseable_json: &Option, + parseable_json: &mut Option, ) -> anyhow::Result<()> { let object_store = config.storage.get_object_store(); let mut storage_metadata: Option = None; @@ -55,52 +62,53 @@ pub async fn run_metadata_migration( } let staging_metadata = get_staging_metadata(config)?; - fn get_version(metadata: &serde_json::Value) -> Option<&str> { - metadata - .as_object() - .and_then(|meta| meta.get("version")) - .and_then(|version| version.as_str()) - } - // if storage metadata is none do nothing if let Some(storage_metadata) = storage_metadata { match get_version(&storage_metadata) { Some("v1") => { - //migrate to latest version - //remove querier endpooint and token from storage metadata let mut metadata = metadata_migration::v1_v3(storage_metadata); metadata = metadata_migration::v3_v4(metadata); metadata = metadata_migration::v4_v5(metadata); + metadata = metadata_migration::v5_v6(metadata); metadata = metadata_migration::remove_querier_metadata(metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); put_remote_metadata(&*object_store, &metadata).await?; } Some("v2") => { - //migrate to latest version - //remove querier endpooint and token from storage metadata let mut metadata = metadata_migration::v2_v3(storage_metadata); metadata = metadata_migration::v3_v4(metadata); metadata = metadata_migration::v4_v5(metadata); + metadata = metadata_migration::v5_v6(metadata); metadata = metadata_migration::remove_querier_metadata(metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); put_remote_metadata(&*object_store, &metadata).await?; } Some("v3") => { - //migrate to latest version - //remove querier endpooint and token from storage metadata let mut metadata = metadata_migration::v3_v4(storage_metadata); metadata = metadata_migration::v4_v5(metadata); + metadata = metadata_migration::v5_v6(metadata); metadata = metadata_migration::remove_querier_metadata(metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); put_remote_metadata(&*object_store, &metadata).await?; } Some("v4") => { - //migrate to latest version - //remove querier endpooint and token from storage metadata let mut metadata = metadata_migration::v4_v5(storage_metadata); - metadata = metadata_migration::v4_v5(metadata); + metadata = metadata_migration::v5_v6(metadata); metadata = metadata_migration::remove_querier_metadata(metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); + put_remote_metadata(&*object_store, &metadata).await?; + } + Some("v5") => { + let metadata = metadata_migration::v5_v6(storage_metadata); + let _metadata: Bytes = serde_json::to_vec(&metadata)?.into(); + *parseable_json = Some(_metadata); put_remote_metadata(&*object_store, &metadata).await?; } _ => { - //remove querier endpooint and token from storage metadata let metadata = metadata_migration::remove_querier_metadata(storage_metadata); put_remote_metadata(&*object_store, &metadata).await?; } @@ -109,28 +117,42 @@ pub async fn run_metadata_migration( // if staging metadata is none do nothing if let Some(staging_metadata) = staging_metadata { - match get_version(&staging_metadata) { - Some("v1") => { - let mut metadata = metadata_migration::v1_v3(staging_metadata); - metadata = metadata_migration::v3_v4(metadata); - put_staging_metadata(config, &metadata)?; - } - Some("v2") => { - let mut metadata = metadata_migration::v2_v3(staging_metadata); - metadata = metadata_migration::v3_v4(metadata); - put_staging_metadata(config, &metadata)?; - } - Some("v3") => { - let metadata = metadata_migration::v3_v4(staging_metadata); - put_staging_metadata(config, &metadata)?; - } - _ => (), - } + migrate_staging(config, staging_metadata)?; } Ok(()) } +fn migrate_staging(config: &Parseable, staging_metadata: Value) -> anyhow::Result<()> { + match get_version(&staging_metadata) { + Some("v1") => { + let mut metadata = metadata_migration::v1_v3(staging_metadata); + metadata = metadata_migration::v3_v4(metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v2") => { + let mut metadata = metadata_migration::v2_v3(staging_metadata); + metadata = metadata_migration::v3_v4(metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v3") => { + let metadata = metadata_migration::v3_v4(staging_metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v4") => { + let metadata = metadata_migration::v4_v5(staging_metadata); + let metadata = metadata_migration::v5_v6(metadata); + put_staging_metadata(config, &metadata)?; + } + Some("v5") => { + let metadata = metadata_migration::v5_v6(staging_metadata); + put_staging_metadata(config, &metadata)?; + } + _ => (), + } + Ok(()) +} + /// run the migration for all streams concurrently pub async fn run_migration(config: &Parseable) -> anyhow::Result<()> { let storage = config.storage.get_object_store(); diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 02adc3463..8da51db7a 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -16,8 +16,10 @@ * */ -use crate::rbac::user::User; +use crate::rbac::role::ParseableResourceType; +use crate::rbac::user::{User, UserGroup}; use crate::{parseable::PARSEABLE, storage::StorageMetadata}; +use std::collections::HashSet; use std::{collections::HashMap, sync::Mutex}; use super::Response; @@ -35,6 +37,23 @@ pub static USERS: OnceCell> = OnceCell::new(); pub static ROLES: OnceCell> = OnceCell::new(); pub static DEFAULT_ROLE: Lazy>> = Lazy::new(|| Mutex::new(None)); pub static SESSIONS: OnceCell> = OnceCell::new(); +pub static USER_GROUPS: OnceCell> = OnceCell::new(); + +pub fn read_user_groups() -> RwLockReadGuard<'static, UserGroups> { + USER_GROUPS + .get() + .expect("UserGroups map not created") + .read() + .expect("UserGroups map is poisoned") +} + +pub fn write_user_groups() -> RwLockWriteGuard<'static, UserGroups> { + USER_GROUPS + .get() + .expect("UserGroups map not created") + .write() + .expect("UserGroups map is poisoned") +} pub fn users() -> RwLockReadGuard<'static, Users> { USERS @@ -90,6 +109,7 @@ pub fn mut_sessions() -> RwLockWriteGuard<'static, Sessions> { // as users authenticate pub fn init(metadata: &StorageMetadata) { let users = metadata.users.clone(); + let user_groups = metadata.user_groups.clone(); let mut roles = metadata.roles.clone(); DEFAULT_ROLE @@ -122,6 +142,9 @@ pub fn init(metadata: &StorageMetadata) { SESSIONS .set(RwLock::new(sessions)) .expect("map is only set once"); + USER_GROUPS + .set(RwLock::new(UserGroups::from(user_groups))) + .expect("Unable to create UserGroups map from storage"); } // A session is loosly active mapping to permissions @@ -199,23 +222,35 @@ impl Sessions { &self, key: &SessionKey, required_action: Action, - context_stream: Option<&str>, + context_resource: Option<&str>, context_user: Option<&str>, ) -> Option { self.active_sessions.get(key).map(|(username, perms)| { + let mut perms: HashSet = HashSet::from_iter(perms.clone()); + perms.extend(aggregate_group_permissions(username)); + if perms.iter().any(|user_perm| { match *user_perm { // if any action is ALL then we we authorize Permission::Unit(action) => action == required_action || action == Action::All, - Permission::Stream(action, ref stream) - | Permission::StreamWithTag(action, ref stream, _) => { - let ok_stream = if let Some(context_stream) = context_stream { - stream == context_stream || stream == "*" - } else { - // if no stream to match then stream check is not needed - true - }; - (action == required_action || action == Action::All) && ok_stream + Permission::Resource(action, ref resource_type) => { + match resource_type { + ParseableResourceType::Stream(resource_id) + | ParseableResourceType::Llm(resource_id) => { + let ok_resource = + if let Some(context_resource_id) = context_resource { + resource_id == context_resource_id || resource_id == "*" + } else { + // if no resource to match then resource check is not needed + // WHEN IS THIS VALID?? + true + }; + (action == required_action || action == Action::All) && ok_resource + } + ParseableResourceType::All => { + action == required_action || action == Action::All + } + } } Permission::SelfUser if required_action == Action::GetUserRoles => { context_user.map(|x| x == username).unwrap_or_default() @@ -257,3 +292,55 @@ impl From> for Users { map } } + +fn aggregate_group_permissions(username: &str) -> HashSet { + let mut group_perms = HashSet::new(); + + let Some(user) = users().get(username).cloned() else { + return group_perms; + }; + + if user.user_groups.is_empty() { + return group_perms; + } + + for group_name in &user.user_groups { + let Some(group) = read_user_groups().get(group_name).cloned() else { + continue; + }; + + for role_name in &group.roles { + let Some(privileges) = roles().get(role_name).cloned() else { + continue; + }; + + for privilege in privileges { + group_perms.extend(RoleBuilder::from(&privilege).build()); + } + } + } + + group_perms +} +// Map of [user group ID --> UserGroup] +// This map is populated at startup with the list of user groups from parseable.json file +#[derive(Debug, Default, Clone, derive_more::Deref, derive_more::DerefMut)] +pub struct UserGroups(HashMap); + +impl UserGroups { + pub fn insert(&mut self, user_group: UserGroup) { + self.0.insert(user_group.name.clone(), user_group); + } +} + +impl From> for UserGroups { + fn from(user_groups: Vec) -> Self { + let mut map = Self::default(); + map.extend( + user_groups + .into_iter() + .map(|group| (group.name.to_owned(), group)), + ); + map + } +} diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 28ead768b..307066726 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -29,7 +29,7 @@ use role::model::DefaultPrivilege; use serde::Serialize; use url::Url; -use crate::rbac::map::{mut_sessions, mut_users, sessions, users}; +use crate::rbac::map::{mut_sessions, mut_users, read_user_groups, roles, sessions, users}; use crate::rbac::role::Action; use crate::rbac::user::User; @@ -54,6 +54,13 @@ impl Users { mut_users().insert(user); } + pub fn get_user_groups(&self, username: &str) -> HashSet { + users() + .get(username) + .map(|user| user.user_groups.clone()) + .unwrap_or_default() + } + pub fn get_user(&self, username: &str) -> Option { users().get(username).cloned() } @@ -90,9 +97,17 @@ impl Users { }; } - pub fn put_role(&self, username: &str, roles: HashSet) { + pub fn add_roles(&self, username: &str, roles: HashSet) { if let Some(user) = mut_users().get_mut(username) { - user.roles = roles; + user.roles.extend(roles); + mut_sessions().remove_user(username) + }; + } + + pub fn remove_roles(&self, username: &str, roles: HashSet) { + if let Some(user) = mut_users().get_mut(username) { + let diff = HashSet::from_iter(user.roles.difference(&roles).cloned()); + user.roles = diff; mut_sessions().remove_user(username) }; } @@ -102,7 +117,26 @@ impl Users { } pub fn get_permissions(&self, session: &SessionKey) -> Vec { - sessions().get(session).cloned().unwrap_or_default() + let mut permissions = sessions().get(session).cloned().unwrap_or_default(); + + let Some(username) = self.get_username_from_session(session) else { + return permissions.into_iter().collect_vec(); + }; + + let user_groups = self.get_user_groups(&username); + for group in user_groups { + if let Some(group) = read_user_groups().get(&group) { + let group_roles = &group.roles; + for role in group_roles { + if let Some(privelege_list) = roles().get(role) { + for privelege in privelege_list { + permissions.extend(RoleBuilder::from(privelege).build()); + } + } + } + } + } + permissions.into_iter().collect_vec() } pub fn session_exists(&self, session: &SessionKey) -> bool { @@ -174,6 +208,7 @@ impl Users { /// /// TODO: rename this after deprecating the older struct #[derive(Debug, Serialize, Clone)] +#[serde(rename = "camelCase")] pub struct UsersPrism { // username pub id: String, @@ -183,8 +218,12 @@ pub struct UsersPrism { pub email: Option, // picture only if oauth pub picture: Option, - // roles for the user + // roles given directly to the user pub roles: HashMap>, + // roles inherited by the user from their usergroups + pub group_roles: HashMap>>, + // user groups + pub user_groups: HashSet, } fn roles_to_permission(roles: Vec) -> Vec { diff --git a/src/rbac/role.rs b/src/rbac/role.rs index f0546aaaa..9e54bb96a 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -20,6 +20,10 @@ // Represents actions that corresponds to an api #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum Action { + CreateUserGroup, + GetUserGroup, + ModifyUserGroup, + DeleteUserGroup, Ingest, Query, CreateStream, @@ -47,10 +51,10 @@ pub enum Action { DeleteRole, ListRole, GetAbout, - QueryLLM, AddLLM, DeleteLLM, GetLLM, + QueryLLM, ListLLM, ListCluster, ListClusterMetrics, @@ -73,11 +77,20 @@ pub enum Action { PutCorrelation, } +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub enum ParseableResourceType { + #[serde(rename = "stream")] + Stream(String), + #[serde(rename = "llmKey")] + Llm(String), + #[serde(rename = "all")] + All, +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Permission { Unit(Action), - Stream(Action, String), - StreamWithTag(Action, String, Option), + Resource(Action, ParseableResourceType), SelfUser, } @@ -85,19 +98,13 @@ pub enum Permission { #[derive(Debug, Default)] pub struct RoleBuilder { actions: Vec, - stream: Option, - tag: Option, + resource_type: Option, } // R x P impl RoleBuilder { - pub fn with_stream(mut self, stream: String) -> Self { - self.stream = Some(stream); - self - } - - pub fn with_tag(mut self, tag: String) -> Self { - self.tag = Some(tag); + pub fn with_resource(mut self, resource_type: ParseableResourceType) -> Self { + self.resource_type = Some(resource_type); self } @@ -105,11 +112,6 @@ impl RoleBuilder { let mut perms = Vec::new(); for action in self.actions { let perm = match action { - Action::Query => Permission::StreamWithTag( - action, - self.stream.clone().unwrap(), - self.tag.clone(), - ), Action::Login | Action::Metrics | Action::PutUser @@ -118,11 +120,6 @@ impl RoleBuilder { | Action::GetUserRoles | Action::DeleteUser | Action::GetAbout - | Action::QueryLLM - | Action::AddLLM - | Action::DeleteLLM - | Action::GetLLM - | Action::ListLLM | Action::PutRole | Action::GetRole | Action::DeleteRole @@ -151,15 +148,25 @@ impl RoleBuilder { | Action::PutAlert | Action::GetAlert | Action::DeleteAlert + | Action::CreateUserGroup + | Action::GetUserGroup + | Action::DeleteUserGroup + | Action::ModifyUserGroup | Action::GetAnalytics => Permission::Unit(action), - Action::Ingest + Action::Query + | Action::QueryLLM + | Action::AddLLM + | Action::DeleteLLM + | Action::GetLLM + | Action::ListLLM + | Action::Ingest | Action::ListStream | Action::GetSchema | Action::DetectSchema | Action::GetStats | Action::GetRetention | Action::PutRetention - | Action::All => Permission::Stream(action, self.stream.clone().unwrap()), + | Action::All => Permission::Resource(action, self.resource_type.clone().unwrap()), }; perms.push(perm); } @@ -172,16 +179,18 @@ impl RoleBuilder { // we can put same model in the backend // user -> Vec pub mod model { + use crate::rbac::role::ParseableResourceType; + use super::{Action, RoleBuilder}; #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Hash)] - #[serde(tag = "privilege", content = "resource", rename_all = "lowercase")] + #[serde(tag = "privilege", rename_all = "lowercase")] pub enum DefaultPrivilege { Admin, Editor, - Writer { stream: String }, - Ingestor { stream: String }, - Reader { stream: String, tag: Option }, + Writer { resource: ParseableResourceType }, + Ingestor { resource: ParseableResourceType }, + Reader { resource: ParseableResourceType }, } impl From<&DefaultPrivilege> for RoleBuilder { @@ -189,18 +198,14 @@ pub mod model { match value { DefaultPrivilege::Admin => admin_perm_builder(), DefaultPrivilege::Editor => editor_perm_builder(), - DefaultPrivilege::Writer { stream } => { - writer_perm_builder().with_stream(stream.to_owned()) + DefaultPrivilege::Writer { resource } => { + writer_perm_builder().with_resource(resource.to_owned()) } - DefaultPrivilege::Reader { stream, tag } => { - let mut reader = reader_perm_builder().with_stream(stream.to_owned()); - if let Some(tag) = tag { - reader = reader.with_tag(tag.to_owned()) - } - reader + DefaultPrivilege::Reader { resource } => { + reader_perm_builder().with_resource(resource.to_owned()) } - DefaultPrivilege::Ingestor { stream } => { - ingest_perm_builder().with_stream(stream.to_owned()) + DefaultPrivilege::Ingestor { resource } => { + ingest_perm_builder().with_resource(resource.to_owned()) } } } @@ -209,8 +214,7 @@ pub mod model { fn admin_perm_builder() -> RoleBuilder { RoleBuilder { actions: vec![Action::All], - stream: Some("*".to_string()), - tag: None, + resource_type: Some(ParseableResourceType::All), } } @@ -241,11 +245,11 @@ pub mod model { Action::PutAlert, Action::GetAlert, Action::DeleteAlert, - Action::QueryLLM, - Action::GetLLM, - Action::ListLLM, Action::AddLLM, Action::DeleteLLM, + Action::GetLLM, + Action::QueryLLM, + Action::ListLLM, Action::CreateFilter, Action::ListFilter, Action::GetFilter, @@ -256,8 +260,7 @@ pub mod model { Action::DeleteDashboard, Action::GetUserRoles, ], - stream: Some("*".to_string()), - tag: None, + resource_type: Some(ParseableResourceType::All), } } @@ -287,11 +290,9 @@ pub mod model { Action::CreateDashboard, Action::DeleteDashboard, Action::Ingest, - Action::QueryLLM, Action::GetLLM, + Action::QueryLLM, Action::ListLLM, - Action::AddLLM, - Action::DeleteLLM, Action::GetStreamInfo, Action::GetFilter, Action::ListFilter, @@ -299,8 +300,7 @@ pub mod model { Action::DeleteFilter, Action::GetUserRoles, ], - stream: None, - tag: None, + resource_type: None, } } @@ -313,11 +313,9 @@ pub mod model { Action::ListStream, Action::GetSchema, Action::GetStats, - Action::QueryLLM, Action::GetLLM, + Action::QueryLLM, Action::ListLLM, - Action::AddLLM, - Action::DeleteLLM, Action::ListFilter, Action::GetFilter, Action::CreateFilter, @@ -335,16 +333,14 @@ pub mod model { Action::GetUserRoles, Action::GetAlert, ], - stream: None, - tag: None, + resource_type: None, } } fn ingest_perm_builder() -> RoleBuilder { RoleBuilder { actions: vec![Action::Ingest], - stream: None, - tag: None, + resource_type: None, } } } diff --git a/src/rbac/user.rs b/src/rbac/user.rs index 45fc50d79..5fba8975a 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -25,7 +25,14 @@ use argon2::{ use rand::distributions::{Alphanumeric, DistString}; -use crate::parseable::PARSEABLE; +use crate::{ + handlers::http::{ + modal::utils::rbac_utils::{get_metadata, put_metadata}, + rbac::{InvalidUserGroupError, RBACError}, + }, + parseable::PARSEABLE, + rbac::map::{mut_sessions, read_user_groups, roles, users}, +}; #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(untagged)] @@ -39,6 +46,7 @@ pub struct User { #[serde(flatten)] pub ty: UserType, pub roles: HashSet, + pub user_groups: HashSet, } impl User { @@ -52,6 +60,7 @@ impl User { password_hash: hash, }), roles: HashSet::new(), + user_groups: HashSet::new(), }, password, ) @@ -64,6 +73,7 @@ impl User { user_info, }), roles, + user_groups: HashSet::new(), } } @@ -147,6 +157,7 @@ pub fn get_admin_user() -> User { password_hash: hashcode, }), roles: ["admin".to_string()].into(), + user_groups: HashSet::new(), } } @@ -185,3 +196,129 @@ impl From for UserInfo { } } } + +/// Logically speaking, UserGroup is a collection of roles and is applied to a collection of users. +/// +/// The users present in a group inherit all the roles present in the group for as long as they are a part of the group. +#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct UserGroup { + pub name: String, + // #[serde(default = "crate::utils::uid::gen")] + // pub id: Ulid, + pub roles: HashSet, + pub users: HashSet, +} + +fn is_valid_group_name(name: &str) -> bool { + let re = regex::Regex::new(r"^[A-Za-z0-9_-]+$").unwrap(); + re.is_match(name) +} + +impl UserGroup { + pub fn validate(&self) -> Result<(), RBACError> { + let valid_name = is_valid_group_name(&self.name); + + if read_user_groups().contains_key(&self.name) { + return Err(RBACError::UserGroupExists(self.name.clone())); + } + let mut non_existent_roles = Vec::new(); + if !self.roles.is_empty() { + // validate that the roles exist + for role in &self.roles { + if !roles().contains_key(role) { + non_existent_roles.push(role.clone()); + } + } + } + let mut non_existent_users = Vec::new(); + if !self.users.is_empty() { + // validate that the users exist + for user in &self.users { + if !users().contains_key(user) { + non_existent_users.push(user.clone()); + } + } + } + + if !non_existent_roles.is_empty() || !non_existent_users.is_empty() || !valid_name { + let comments = if !valid_name { + "The name should follow this regex- `^[A-Za-z0-9_-]+$`".to_string() + } else { + "".to_string() + }; + Err(RBACError::InvalidUserGroupRequest(Box::new( + InvalidUserGroupError { + valid_name, + non_existent_roles, + non_existent_users, + roles_not_in_group: vec![], + users_not_in_group: vec![], + comments, + }, + ))) + } else { + Ok(()) + } + } + pub fn new(name: String, roles: HashSet, users: HashSet) -> Self { + UserGroup { name, roles, users } + } + + pub fn add_roles(&mut self, roles: HashSet) -> Result<(), RBACError> { + self.roles.extend(roles); + // also refresh all user sessions + for username in &self.users { + mut_sessions().remove_user(username); + } + Ok(()) + } + + pub fn add_users(&mut self, users: HashSet) -> Result<(), RBACError> { + self.users.extend(users.clone()); + // also refresh all user sessions + for username in &users { + mut_sessions().remove_user(username); + } + Ok(()) + } + + pub fn remove_roles(&mut self, roles: HashSet) -> Result<(), RBACError> { + let old_roles = &self.roles; + let new_roles = HashSet::from_iter(self.roles.difference(&roles).cloned()); + + if old_roles.eq(&new_roles) { + return Ok(()); + } + self.roles.clone_from(&new_roles); + + // also refresh all user sessions + for username in &self.users { + mut_sessions().remove_user(username); + } + Ok(()) + } + + pub fn remove_users(&mut self, users: HashSet) -> Result<(), RBACError> { + let old_users = &self.users; + let new_users = HashSet::from_iter(self.users.difference(&users).cloned()); + + if old_users.eq(&new_users) { + return Ok(()); + } + // also refresh all user sessions + for username in &users { + mut_sessions().remove_user(username); + } + self.users.clone_from(&new_users); + + Ok(()) + } + + pub async fn update_in_metadata(&self) -> Result<(), RBACError> { + let mut metadata = get_metadata().await?; + metadata.user_groups.retain(|x| x.name != self.name); + metadata.user_groups.push(self.clone()); + put_metadata(&metadata).await?; + Ok(()) + } +} diff --git a/src/rbac/utils.rs b/src/rbac/utils.rs index a52d03868..df9dd52bf 100644 --- a/src/rbac/utils.rs +++ b/src/rbac/utils.rs @@ -15,11 +15,11 @@ * along with this program. If not, see . * */ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use url::Url; -use crate::parseable::PARSEABLE; +use crate::{parseable::PARSEABLE, rbac::map::read_user_groups}; use super::{ map::roles, @@ -38,7 +38,7 @@ pub fn to_prism_user(user: &User) -> UsersPrism { oauth.user_info.picture.clone(), ), }; - let roles: HashMap> = Users + let direct_roles: HashMap> = Users .get_role(id) .iter() .filter_map(|role_name| { @@ -48,12 +48,33 @@ pub fn to_prism_user(user: &User) -> UsersPrism { }) .collect(); + let mut group_roles: HashMap>> = HashMap::new(); + let mut user_groups = HashSet::new(); + // user might be part of some user groups, fetch the roles from there as well + for user_group in Users.get_user_groups(user.username()) { + if let Some(group) = read_user_groups().get(&user_group) { + let ug_roles: HashMap> = group + .roles + .iter() + .filter_map(|role_name| { + roles() + .get(role_name) + .map(|role| (role_name.to_owned(), role.clone())) + }) + .collect(); + group_roles.insert(group.name.clone(), ug_roles); + user_groups.insert(user_group); + } + } + UsersPrism { id: id.into(), method: method.into(), email: mask_pii_string(email), picture: mask_pii_url(picture), - roles, + roles: direct_roles, + group_roles, + user_groups, } } diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index 93a3ca76c..31f5283b4 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -30,7 +30,10 @@ use std::io; use crate::{ option::Mode, parseable::{JOIN_COMMUNITY, PARSEABLE}, - rbac::{role::model::DefaultPrivilege, user::User}, + rbac::{ + role::model::DefaultPrivilege, + user::{User, UserGroup}, + }, storage::ObjectStorageError, utils::uid, }; @@ -39,7 +42,7 @@ use super::PARSEABLE_METADATA_FILE_NAME; // Expose some static variables for internal usage pub static STORAGE_METADATA: OnceCell = OnceCell::new(); -pub const CURRENT_STORAGE_METADATA_VERSION: &str = "v4"; +pub const CURRENT_STORAGE_METADATA_VERSION: &str = "v6"; // For use in global static #[derive(Debug, PartialEq, Eq)] pub struct StaticStorageMetadata { @@ -57,6 +60,7 @@ pub struct StorageMetadata { #[serde(default = "crate::utils::uid::gen")] pub deployment_id: uid::Uid, pub users: Vec, + pub user_groups: Vec, pub streams: Vec, pub server_mode: Mode, #[serde(default)] @@ -75,6 +79,7 @@ impl Default for StorageMetadata { deployment_id: uid::gen(), server_mode: PARSEABLE.options.mode, users: Vec::new(), + user_groups: Vec::new(), streams: Vec::new(), roles: HashMap::default(), default_role: None, @@ -86,7 +91,7 @@ impl StorageMetadata { pub fn global() -> &'static StaticStorageMetadata { STORAGE_METADATA .get() - .expect("gloabal static is initialized") + .expect("global static is initialized") } pub fn set_global(self) { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 71c07f5dc..e4edb0692 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -93,9 +93,8 @@ pub async fn user_auth_for_query( session_key: &SessionKey, query: &str, ) -> Result<(), actix_web::error::Error> { - let tables = get_tables_from_query(query).await?; + let tables = get_tables_from_query(query).await?.into_inner(); let permissions = Users.get_permissions(session_key); - let tables = tables.into_inner(); user_auth_for_datasets(&permissions, &tables) } @@ -110,14 +109,17 @@ pub fn user_auth_for_datasets( // also while iterating add any filter tags for this stream for permission in permissions.iter() { match permission { - Permission::Stream(Action::All, _) => { + Permission::Resource(Action::All, _) => { authorized = true; break; } - Permission::StreamWithTag(Action::Query, ref stream, _) - if stream == table_name || stream == "*" => - { - authorized = true; + Permission::Resource( + Action::Query, + crate::rbac::role::ParseableResourceType::Stream(stream), + ) => { + if stream == table_name || stream == "*" { + authorized = true; + } } _ => (), }