Skip to content

feat: Introduce User Groups to Parseable #1366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,29 @@ pub async fn sync_streams_with_ingestors(
pub async fn sync_users_with_roles_with_ingestors(
username: &str,
role: &HashSet<String>,
operation: &str,
) -> Result<(), RBACError> {
match operation {
"add" | "remove" => {}
_ => return Err(RBACError::InvalidSyncOperation(operation.to_string())),
}

let role_data = to_vec(&role.clone()).map_err(|err| {
error!("Fatal: failed to serialize role: {:?}", err);
RBACError::SerdeError(err)
})?;

let username = username.to_owned();

let op = operation.to_string();

for_each_live_ingestor(move |ingestor| {
let url = format!(
"{}{}/user/{}/role/sync",
"{}{}/user/{}/role/sync/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
username,
op
);

let role_data = role_data.clone();
Expand Down
20 changes: 14 additions & 6 deletions src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct CommonAttributes {

pub trait RouteExt {
fn authorize(self, action: Action) -> Self;
fn authorize_for_stream(self, action: Action) -> Self;
fn authorize_for_resource(self, action: Action) -> Self;
fn authorize_for_user(self, action: Action) -> Self;
}

Expand All @@ -71,10 +71,10 @@ impl RouteExt for Route {
})
}

fn authorize_for_stream(self, action: Action) -> Self {
fn authorize_for_resource(self, action: Action) -> Self {
self.wrap(Auth {
action,
method: auth_stream_context,
method: auth_resource_context,
})
}

Expand Down Expand Up @@ -182,18 +182,26 @@ pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result<rbac:
creds.map(|key| Users.authorize(key, action, None, None))
}

pub fn auth_stream_context(
pub fn auth_resource_context(
req: &mut ServiceRequest,
action: Action,
) -> Result<rbac::Response, Error> {
let creds = extract_session_key(req);
let usergroup = req.match_info().get("usergroup");
let llmid = req.match_info().get("llmid");
let mut stream = req.match_info().get("logstream");
if stream.is_none() {
if let Some(usergroup) = usergroup {
creds.map(|key| Users.authorize(key, action, Some(usergroup), None))
} else if let Some(llmid) = llmid {
creds.map(|key| Users.authorize(key, action, Some(llmid), None))
} else if let Some(stream) = stream {
creds.map(|key| Users.authorize(key, action, Some(stream), None))
} else {
if let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) {
stream = Some(stream_name.to_str().unwrap());
}
creds.map(|key| Users.authorize(key, action, stream, None))
}
creds.map(|key| Users.authorize(key, action, stream, None))
}

pub fn auth_user_context(
Expand Down
86 changes: 78 additions & 8 deletions src/handlers/http/modal/ingest/ingestor_rbac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::sync::Mutex;
use crate::{
handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError},
rbac::{
map::roles,
user::{self, User as ParseableUser},
Users,
},
Expand All @@ -48,7 +49,7 @@ pub async fn post_user(
let _ = storage::put_staging_metadata(&metadata);
let created_role = user.roles.clone();
Users.put_user(user.clone());
Users.put_role(&username, created_role.clone());
Users.add_roles(&username, created_role.clone());
}

Ok(generated_password)
Expand All @@ -73,34 +74,103 @@ pub async fn delete_user(username: web::Path<String>) -> Result<impl Responder,
Ok(format!("deleted user: {username}"))
}

// Handler PUT /user/{username}/roles => Put roles for user
// Put roles for given user
pub async fn put_role(
// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
pub async fn add_roles_to_user(
username: web::Path<String>,
role: web::Json<HashSet<String>>,
roles_to_add: web::Json<HashSet<String>>,
) -> Result<String, RBACError> {
let username = username.into_inner();
let role = role.into_inner();
let roles_to_add = roles_to_add.into_inner();

if !Users.contains(&username) {
return Err(RBACError::UserDoesNotExist);
};

// check if all roles exist
let mut non_existent_roles = Vec::new();
roles_to_add.iter().for_each(|r| {
if roles().get(r).is_none() {
non_existent_roles.push(r.clone());
}
});

if !non_existent_roles.is_empty() {
return Err(RBACError::RolesDoNotExist(non_existent_roles));
}

// update parseable.json first
let mut metadata = get_metadata().await?;
if let Some(user) = metadata
.users
.iter_mut()
.find(|user| user.username() == username)
{
user.roles.extend(roles_to_add.clone());
} else {
// should be unreachable given state is always consistent
return Err(RBACError::UserDoesNotExist);
}

let _ = storage::put_staging_metadata(&metadata);
// update in mem table
Users.add_roles(&username.clone(), roles_to_add.clone());

Ok(format!("Roles updated successfully for {username}"))
}

// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
pub async fn remove_roles_from_user(
username: web::Path<String>,
roles_to_remove: web::Json<HashSet<String>>,
) -> Result<String, RBACError> {
let username = username.into_inner();
let roles_to_remove = roles_to_remove.into_inner();

if !Users.contains(&username) {
return Err(RBACError::UserDoesNotExist);
};

// check if all roles exist
let mut non_existent_roles = Vec::new();
roles_to_remove.iter().for_each(|r| {
if roles().get(r).is_none() {
non_existent_roles.push(r.clone());
}
});

if !non_existent_roles.is_empty() {
return Err(RBACError::RolesDoNotExist(non_existent_roles));
}

// check that user actually has these roles
let user_roles: HashSet<String> = HashSet::from_iter(Users.get_role(&username));
let roles_not_with_user: HashSet<String> =
HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned());

if !roles_not_with_user.is_empty() {
return Err(RBACError::RolesNotAssigned(Vec::from_iter(
roles_not_with_user,
)));
}

// update parseable.json first
let mut metadata = get_metadata().await?;
if let Some(user) = metadata
.users
.iter_mut()
.find(|user| user.username() == username)
{
user.roles.clone_from(&role);
let diff: HashSet<String> =
HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned());
user.roles = diff;
} else {
// should be unreachable given state is always consistent
return Err(RBACError::UserDoesNotExist);
}

let _ = storage::put_staging_metadata(&metadata);
// update in mem table
Users.put_role(&username.clone(), role.clone());
Users.remove_roles(&username.clone(), roles_to_remove.clone());

Ok(format!("Roles updated successfully for {username}"))
}
Expand Down
27 changes: 26 additions & 1 deletion src/handlers/http/modal/ingest/ingestor_role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
*
*/

use std::collections::HashSet;

use actix_web::{
web::{self, Json},
HttpResponse, Responder,
};

use crate::{
handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError},
rbac::{map::mut_roles, role::model::DefaultPrivilege},
rbac::{
map::{mut_roles, mut_sessions, read_user_groups, users},
role::model::DefaultPrivilege,
},
storage,
};

Expand All @@ -40,5 +45,25 @@ pub async fn put(
let _ = storage::put_staging_metadata(&metadata);
mut_roles().insert(name.clone(), privileges);

// refresh the sessions of all users using this role
// for this, iterate over all user_groups and users and create a hashset of users
let mut session_refresh_users: HashSet<String> = HashSet::new();
for user_group in read_user_groups().values().cloned() {
if user_group.roles.contains(&name) {
session_refresh_users.extend(user_group.users);
}
}

// iterate over all users to see if they have this role
for user in users().values().cloned() {
if user.roles.contains(&name) {
session_refresh_users.insert(user.username().to_string());
}
}

for username in session_refresh_users {
mut_sessions().remove_user(&username);
}

Ok(HttpResponse::Ok().finish())
}
28 changes: 19 additions & 9 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,21 @@ impl IngestServer {
.wrap(DisAllowRootUser),
)
.service(
web::resource("/{username}/role/sync")
// PUT /user/{username}/roles => Put roles for user
web::resource("/{username}/role/sync/add")
// PATCH /user/{username}/role/sync/add => Add roles to a user
.route(
web::put()
.to(ingestor_rbac::put_role)
web::patch()
.to(ingestor_rbac::add_roles_to_user)
.authorize(Action::PutUserRoles)
.wrap(DisAllowRootUser),
),
)
.service(
web::resource("/{username}/role/sync/remove")
// PATCH /user/{username}/role/sync/remove => Remove roles from a user
.route(
web::patch()
.to(ingestor_rbac::remove_roles_from_user)
.authorize(Action::PutUserRoles)
.wrap(DisAllowRootUser),
),
Expand All @@ -227,7 +237,7 @@ impl IngestServer {
.route(
web::post()
.to(ingest::post_event)
.authorize_for_stream(Action::Ingest),
.authorize_for_resource(Action::Ingest),
)
.wrap(from_fn(
resource_check::check_resource_utilization_middleware,
Expand All @@ -245,31 +255,31 @@ impl IngestServer {
.route(
web::put()
.to(ingestor_logstream::put_stream)
.authorize_for_stream(Action::CreateStream),
.authorize_for_resource(Action::CreateStream),
),
)
.service(
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
web::resource("/info").route(
web::get()
.to(logstream::get_stream_info)
.authorize_for_stream(Action::GetStreamInfo),
.authorize_for_resource(Action::GetStreamInfo),
),
)
.service(
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
web::resource("/stats").route(
web::get()
.to(logstream::get_stats)
.authorize_for_stream(Action::GetStats),
.authorize_for_resource(Action::GetStats),
),
)
.service(
web::scope("/retention").service(
web::resource("/cleanup").route(
web::post()
.to(ingestor_logstream::retention_cleanup)
.authorize_for_stream(Action::PutRetention),
.authorize_for_resource(Action::PutRetention),
),
),
),
Expand Down
Loading
Loading