Skip to content

Commit 89c9b48

Browse files
committed
Updates: coderabbit suggestions
1 parent 7d8c259 commit 89c9b48

File tree

7 files changed

+149
-208
lines changed

7 files changed

+149
-208
lines changed

src/handlers/http/modal/ingest/ingestor_rbac.rs

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use tokio::sync::Mutex;
2424
use crate::{
2525
handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError},
2626
rbac::{
27+
map::roles,
2728
user::{self, User as ParseableUser},
2829
Users,
2930
},
@@ -73,38 +74,6 @@ pub async fn delete_user(username: web::Path<String>) -> Result<impl Responder,
7374
Ok(format!("deleted user: {username}"))
7475
}
7576

76-
// // Handler PUT /user/{username}/roles => Put roles for user
77-
// // Put roles for given user
78-
// pub async fn put_role(
79-
// username: web::Path<String>,
80-
// role: web::Json<HashSet<String>>,
81-
// ) -> Result<String, RBACError> {
82-
// let username = username.into_inner();
83-
// let role = role.into_inner();
84-
85-
// if !Users.contains(&username) {
86-
// return Err(RBACError::UserDoesNotExist);
87-
// };
88-
// // update parseable.json first
89-
// let mut metadata = get_metadata().await?;
90-
// if let Some(user) = metadata
91-
// .users
92-
// .iter_mut()
93-
// .find(|user| user.username() == username)
94-
// {
95-
// user.roles.clone_from(&role);
96-
// } else {
97-
// // should be unreachable given state is always consistent
98-
// return Err(RBACError::UserDoesNotExist);
99-
// }
100-
101-
// let _ = storage::put_staging_metadata(&metadata);
102-
// // update in mem table
103-
// Users.add_roles(&username.clone(), role.clone());
104-
105-
// Ok(format!("Roles updated successfully for {username}"))
106-
// }
107-
10877
// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
10978
pub async fn add_roles_to_user(
11079
username: web::Path<String>,
@@ -116,6 +85,19 @@ pub async fn add_roles_to_user(
11685
if !Users.contains(&username) {
11786
return Err(RBACError::UserDoesNotExist);
11887
};
88+
89+
// check if all roles exist
90+
let mut non_existent_roles = Vec::new();
91+
roles_to_add.iter().for_each(|r| {
92+
if roles().get(r).is_none() {
93+
non_existent_roles.push(r.clone());
94+
}
95+
});
96+
97+
if !non_existent_roles.is_empty() {
98+
return Err(RBACError::RolesDoNotExist(non_existent_roles));
99+
}
100+
119101
// update parseable.json first
120102
let mut metadata = get_metadata().await?;
121103
if let Some(user) = metadata
@@ -147,6 +129,30 @@ pub async fn remove_roles_from_user(
147129
if !Users.contains(&username) {
148130
return Err(RBACError::UserDoesNotExist);
149131
};
132+
133+
// check if all roles exist
134+
let mut non_existent_roles = Vec::new();
135+
roles_to_remove.iter().for_each(|r| {
136+
if roles().get(r).is_none() {
137+
non_existent_roles.push(r.clone());
138+
}
139+
});
140+
141+
if !non_existent_roles.is_empty() {
142+
return Err(RBACError::RolesDoNotExist(non_existent_roles));
143+
}
144+
145+
// check that user actually has these roles
146+
let user_roles: HashSet<String> = HashSet::from_iter(Users.get_role(&username));
147+
let roles_not_with_user: HashSet<String> =
148+
HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned());
149+
150+
if !roles_not_with_user.is_empty() {
151+
return Err(RBACError::RolesNotAssigned(Vec::from_iter(
152+
roles_not_with_user,
153+
)));
154+
}
155+
150156
// update parseable.json first
151157
let mut metadata = get_metadata().await?;
152158
if let Some(user) = metadata

src/handlers/http/rbac.rs

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -237,24 +237,23 @@ pub async fn delete_user(username: web::Path<String>) -> Result<impl Responder,
237237
let mut metadata = get_metadata().await?;
238238
metadata.users.retain(|user| user.username() != username);
239239

240-
// also delete from user groups
240+
// Remove user from all groups
241241
let user_groups = Users.get_user_groups(&username);
242-
let mut groups_to_update = Vec::new();
243-
for user_group in user_groups {
244-
if let Some(ug) = write_user_groups().get_mut(&user_group) {
245-
ug.remove_users(HashSet::from_iter([username.clone()]))?;
246-
groups_to_update.push(ug.clone());
247-
// ug.update_in_metadata().await?;
248-
} else {
249-
continue;
250-
};
242+
{
243+
let mut groups = write_user_groups();
244+
for group_name in &user_groups {
245+
if let Some(group) = groups.get_mut(group_name) {
246+
group.remove_users(HashSet::from_iter([username.clone()]))?;
247+
}
248+
}
251249
}
252250

253-
// update in metadata user groups
254-
metadata
255-
.user_groups
256-
.retain(|x| !groups_to_update.contains(x));
257-
metadata.user_groups.extend(groups_to_update);
251+
// Update metadata with modified groups
252+
for group in metadata.user_groups.iter_mut() {
253+
if user_groups.contains(&group.name) {
254+
group.users.retain(|u| u != &username);
255+
}
256+
}
258257
put_metadata(&metadata).await?;
259258

260259
// update in mem table
@@ -375,24 +374,6 @@ pub struct InvalidUserGroupError {
375374
pub comments: String,
376375
}
377376

378-
// impl Display for InvalidUserGroupRequestStruct {
379-
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380-
// if !self.invalid_name {
381-
// write!(
382-
// f,
383-
// "Invalid user group request- {{invalidName: {}\nnonExistantRoles: {:?}\nnonExistantUsers: {:?}\nThe name should follow this regex- ^[A-Za-z0-9_-]+$}}",
384-
// self.invalid_name, self.non_existant_roles, self.non_existant_users
385-
// )
386-
// } else {
387-
// write!(
388-
// f,
389-
// "Invalid user group request- {{nonExistantRoles: {:?}\nnonExistantUsers: {:?}}}",
390-
// self.non_existant_roles, self.non_existant_users
391-
// )
392-
// }
393-
// }
394-
// }
395-
396377
#[derive(Debug, thiserror::Error)]
397378
pub enum RBACError {
398379
#[error("User exists already")]

src/handlers/http/role.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*
1717
*/
1818

19-
use std::collections::HashSet;
20-
2119
use actix_web::{
2220
http::header::ContentType,
2321
web::{self, Json},
@@ -103,23 +101,21 @@ pub async fn delete(name: web::Path<String>) -> Result<impl Responder, RoleError
103101
};
104102
}
105103

104+
// remove role from all user groups that have it
106105
let mut groups_to_update = Vec::new();
107-
for user_group in group_names {
108-
if let Some(ug) = write_user_groups().get_mut(&user_group) {
109-
ug.remove_roles(HashSet::from_iter([name.clone()]))
110-
.map_err(|e| RoleError::Anyhow(anyhow::Error::msg(e.to_string())))?;
111-
groups_to_update.push(ug.clone());
112-
// ug.update_in_metadata().await?;
113-
} else {
114-
continue;
115-
};
106+
for group in write_user_groups().values_mut() {
107+
if group.roles.remove(&name) {
108+
groups_to_update.push(group.clone());
109+
}
116110
}
117111

118-
// update in metadata
119-
metadata
120-
.user_groups
121-
.retain(|x| !groups_to_update.contains(x));
122-
metadata.user_groups.extend(groups_to_update);
112+
// update metadata only if there are changes
113+
if !groups_to_update.is_empty() {
114+
metadata
115+
.user_groups
116+
.retain(|x| !groups_to_update.contains(x));
117+
metadata.user_groups.extend(groups_to_update);
118+
}
123119
put_metadata(&metadata).await?;
124120

125121
Ok(HttpResponse::Ok().finish())

src/migration/mod.rs

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ use crate::{
4141
},
4242
};
4343

44+
fn get_version(metadata: &serde_json::Value) -> Option<&str> {
45+
metadata
46+
.as_object()
47+
.and_then(|meta| meta.get("version"))
48+
.and_then(|version| version.as_str())
49+
}
50+
4451
/// Migrate the metdata from v1 or v2 to v3
4552
/// This is a one time migration
4653
pub async fn run_metadata_migration(
@@ -55,13 +62,6 @@ pub async fn run_metadata_migration(
5562
}
5663
let staging_metadata = get_staging_metadata(config)?;
5764

58-
fn get_version(metadata: &serde_json::Value) -> Option<&str> {
59-
metadata
60-
.as_object()
61-
.and_then(|meta| meta.get("version"))
62-
.and_then(|version| version.as_str())
63-
}
64-
6565
// if storage metadata is none do nothing
6666
if let Some(storage_metadata) = storage_metadata {
6767
match get_version(&storage_metadata) {
@@ -117,37 +117,42 @@ pub async fn run_metadata_migration(
117117

118118
// if staging metadata is none do nothing
119119
if let Some(staging_metadata) = staging_metadata {
120-
match get_version(&staging_metadata) {
121-
Some("v1") => {
122-
let mut metadata = metadata_migration::v1_v3(staging_metadata);
123-
metadata = metadata_migration::v3_v4(metadata);
124-
put_staging_metadata(config, &metadata)?;
125-
}
126-
Some("v2") => {
127-
let mut metadata = metadata_migration::v2_v3(staging_metadata);
128-
metadata = metadata_migration::v3_v4(metadata);
129-
put_staging_metadata(config, &metadata)?;
130-
}
131-
Some("v3") => {
132-
let metadata = metadata_migration::v3_v4(staging_metadata);
133-
put_staging_metadata(config, &metadata)?;
134-
}
135-
Some("v4") => {
136-
let metadata = metadata_migration::v4_v5(staging_metadata);
137-
let metadata = metadata_migration::v5_v6(metadata);
138-
put_staging_metadata(config, &metadata)?;
139-
}
140-
Some("v5") => {
141-
let metadata = metadata_migration::v5_v6(staging_metadata);
142-
put_staging_metadata(config, &metadata)?;
143-
}
144-
_ => (),
145-
}
120+
migrate_staging(config, staging_metadata)?;
146121
}
147122

148123
Ok(())
149124
}
150125

126+
fn migrate_staging(config: &Parseable, staging_metadata: Value) -> anyhow::Result<()> {
127+
match get_version(&staging_metadata) {
128+
Some("v1") => {
129+
let mut metadata = metadata_migration::v1_v3(staging_metadata);
130+
metadata = metadata_migration::v3_v4(metadata);
131+
put_staging_metadata(config, &metadata)?;
132+
}
133+
Some("v2") => {
134+
let mut metadata = metadata_migration::v2_v3(staging_metadata);
135+
metadata = metadata_migration::v3_v4(metadata);
136+
put_staging_metadata(config, &metadata)?;
137+
}
138+
Some("v3") => {
139+
let metadata = metadata_migration::v3_v4(staging_metadata);
140+
put_staging_metadata(config, &metadata)?;
141+
}
142+
Some("v4") => {
143+
let metadata = metadata_migration::v4_v5(staging_metadata);
144+
let metadata = metadata_migration::v5_v6(metadata);
145+
put_staging_metadata(config, &metadata)?;
146+
}
147+
Some("v5") => {
148+
let metadata = metadata_migration::v5_v6(staging_metadata);
149+
put_staging_metadata(config, &metadata)?;
150+
}
151+
_ => (),
152+
}
153+
Ok(())
154+
}
155+
151156
/// run the migration for all streams concurrently
152157
pub async fn run_migration(config: &Parseable) -> anyhow::Result<()> {
153158
let storage = config.storage.get_object_store();

src/rbac/map.rs

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -226,33 +226,9 @@ impl Sessions {
226226
context_user: Option<&str>,
227227
) -> Option<Response> {
228228
self.active_sessions.get(key).map(|(username, perms)| {
229-
// if user is a part of any user groups, then add permissions
230-
let perms: HashSet<Permission> = if let Some(user) = users().0.get(username) {
231-
let all_groups_roles = user
232-
.user_groups
233-
.iter()
234-
.filter(|id| (read_user_groups().0.contains_key(*id)))
235-
.map(|id| read_user_groups().0.get(id).unwrap().roles.clone())
236-
.reduce(|mut acc, e| {
237-
acc.extend(e);
238-
acc
239-
})
240-
.unwrap_or_default();
241-
242-
let mut privilege_list = Vec::new();
243-
all_groups_roles
244-
.iter()
245-
.filter_map(|role| roles().get(role).cloned())
246-
.for_each(|privileges| privilege_list.extend(privileges));
247-
248-
let mut perms = HashSet::from_iter(perms.clone());
249-
for privs in privilege_list {
250-
perms.extend(RoleBuilder::from(&privs).build())
251-
}
252-
perms
253-
} else {
254-
HashSet::from_iter(perms.clone())
255-
};
229+
let mut perms: HashSet<Permission> = HashSet::from_iter(perms.clone());
230+
perms.extend(aggregate_group_permissions(username));
231+
256232
if perms.iter().any(|user_perm| {
257233
match *user_perm {
258234
// if any action is ALL then we we authorize
@@ -317,6 +293,35 @@ impl From<Vec<User>> for Users {
317293
}
318294
}
319295

296+
fn aggregate_group_permissions(username: &str) -> HashSet<Permission> {
297+
let mut group_perms = HashSet::new();
298+
299+
let Some(user) = users().get(username).cloned() else {
300+
return group_perms;
301+
};
302+
303+
if user.user_groups.is_empty() {
304+
return group_perms;
305+
}
306+
307+
for group_name in &user.user_groups {
308+
let Some(group) = read_user_groups().get(group_name).cloned() else {
309+
continue;
310+
};
311+
312+
for role_name in &group.roles {
313+
let Some(privileges) = roles().get(role_name).cloned() else {
314+
continue;
315+
};
316+
317+
for privilege in privileges {
318+
group_perms.extend(RoleBuilder::from(&privilege).build());
319+
}
320+
}
321+
}
322+
323+
group_perms
324+
}
320325
// Map of [user group ID --> UserGroup]
321326
// This map is populated at startup with the list of user groups from parseable.json file
322327
#[derive(Debug, Default, Clone, derive_more::Deref, derive_more::DerefMut)]

src/rbac/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,10 @@ impl Users {
119119
pub fn get_permissions(&self, session: &SessionKey) -> Vec<Permission> {
120120
let mut permissions = sessions().get(session).cloned().unwrap_or_default();
121121

122-
let username = self.get_username_from_session(session).unwrap();
122+
let Some(username) = self.get_username_from_session(session) else {
123+
return permissions.into_iter().collect_vec();
124+
};
125+
123126
let user_groups = self.get_user_groups(&username);
124127
for group in user_groups {
125128
if let Some(group) = read_user_groups().get(&group) {

0 commit comments

Comments
 (0)