Skip to content

Commit c4533be

Browse files
feat: Introduce User Groups to Parseable (#1366)
- Added support for user groups - Migrated `PUT /user/{username}/role` to `PATCH /user/{username}/role/add` and `PATCH /user/{username}/role/remove` - Introduce resource types (stream, llm, all) for privileges - roles don't need any migration - auth flow modified to account for resource type - update: migrate from v4 to v6 - user and role deletion while still being used - renamed resource types - user sessions get removed upon modifying group's roles - session refresh in case of role modification --------- Co-authored-by: Nikhil Sinha <[email protected]>
1 parent d4a22e9 commit c4533be

File tree

21 files changed

+1071
-239
lines changed

21 files changed

+1071
-239
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,20 +160,29 @@ pub async fn sync_streams_with_ingestors(
160160
pub async fn sync_users_with_roles_with_ingestors(
161161
username: &str,
162162
role: &HashSet<String>,
163+
operation: &str,
163164
) -> Result<(), RBACError> {
165+
match operation {
166+
"add" | "remove" => {}
167+
_ => return Err(RBACError::InvalidSyncOperation(operation.to_string())),
168+
}
169+
164170
let role_data = to_vec(&role.clone()).map_err(|err| {
165171
error!("Fatal: failed to serialize role: {:?}", err);
166172
RBACError::SerdeError(err)
167173
})?;
168174

169175
let username = username.to_owned();
170176

177+
let op = operation.to_string();
178+
171179
for_each_live_ingestor(move |ingestor| {
172180
let url = format!(
173-
"{}{}/user/{}/role/sync",
181+
"{}{}/user/{}/role/sync/{}",
174182
ingestor.domain_name,
175183
base_path_without_preceding_slash(),
176-
username
184+
username,
185+
op
177186
);
178187

179188
let role_data = role_data.clone();

src/handlers/http/middleware.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct CommonAttributes {
5959

6060
pub trait RouteExt {
6161
fn authorize(self, action: Action) -> Self;
62-
fn authorize_for_stream(self, action: Action) -> Self;
62+
fn authorize_for_resource(self, action: Action) -> Self;
6363
fn authorize_for_user(self, action: Action) -> Self;
6464
}
6565

@@ -71,10 +71,10 @@ impl RouteExt for Route {
7171
})
7272
}
7373

74-
fn authorize_for_stream(self, action: Action) -> Self {
74+
fn authorize_for_resource(self, action: Action) -> Self {
7575
self.wrap(Auth {
7676
action,
77-
method: auth_stream_context,
77+
method: auth_resource_context,
7878
})
7979
}
8080

@@ -182,18 +182,26 @@ pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result<rbac:
182182
creds.map(|key| Users.authorize(key, action, None, None))
183183
}
184184

185-
pub fn auth_stream_context(
185+
pub fn auth_resource_context(
186186
req: &mut ServiceRequest,
187187
action: Action,
188188
) -> Result<rbac::Response, Error> {
189189
let creds = extract_session_key(req);
190+
let usergroup = req.match_info().get("usergroup");
191+
let llmid = req.match_info().get("llmid");
190192
let mut stream = req.match_info().get("logstream");
191-
if stream.is_none() {
193+
if let Some(usergroup) = usergroup {
194+
creds.map(|key| Users.authorize(key, action, Some(usergroup), None))
195+
} else if let Some(llmid) = llmid {
196+
creds.map(|key| Users.authorize(key, action, Some(llmid), None))
197+
} else if let Some(stream) = stream {
198+
creds.map(|key| Users.authorize(key, action, Some(stream), None))
199+
} else {
192200
if let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) {
193201
stream = Some(stream_name.to_str().unwrap());
194202
}
203+
creds.map(|key| Users.authorize(key, action, stream, None))
195204
}
196-
creds.map(|key| Users.authorize(key, action, stream, None))
197205
}
198206

199207
pub fn auth_user_context(

src/handlers/http/modal/ingest/ingestor_rbac.rs

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use tokio::sync::Mutex;
2424
use crate::{
2525
handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError},
2626
rbac::{
27+
map::roles,
2728
user::{self, User as ParseableUser},
2829
Users,
2930
},
@@ -48,7 +49,7 @@ pub async fn post_user(
4849
let _ = storage::put_staging_metadata(&metadata);
4950
let created_role = user.roles.clone();
5051
Users.put_user(user.clone());
51-
Users.put_role(&username, created_role.clone());
52+
Users.add_roles(&username, created_role.clone());
5253
}
5354

5455
Ok(generated_password)
@@ -73,34 +74,103 @@ pub async fn delete_user(username: web::Path<String>) -> Result<impl Responder,
7374
Ok(format!("deleted user: {username}"))
7475
}
7576

76-
// Handler PUT /user/{username}/roles => Put roles for user
77-
// Put roles for given user
78-
pub async fn put_role(
77+
// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
78+
pub async fn add_roles_to_user(
7979
username: web::Path<String>,
80-
role: web::Json<HashSet<String>>,
80+
roles_to_add: web::Json<HashSet<String>>,
8181
) -> Result<String, RBACError> {
8282
let username = username.into_inner();
83-
let role = role.into_inner();
83+
let roles_to_add = roles_to_add.into_inner();
8484

8585
if !Users.contains(&username) {
8686
return Err(RBACError::UserDoesNotExist);
8787
};
88+
89+
// check if all roles exist
90+
let mut non_existent_roles = Vec::new();
91+
roles_to_add.iter().for_each(|r| {
92+
if roles().get(r).is_none() {
93+
non_existent_roles.push(r.clone());
94+
}
95+
});
96+
97+
if !non_existent_roles.is_empty() {
98+
return Err(RBACError::RolesDoNotExist(non_existent_roles));
99+
}
100+
101+
// update parseable.json first
102+
let mut metadata = get_metadata().await?;
103+
if let Some(user) = metadata
104+
.users
105+
.iter_mut()
106+
.find(|user| user.username() == username)
107+
{
108+
user.roles.extend(roles_to_add.clone());
109+
} else {
110+
// should be unreachable given state is always consistent
111+
return Err(RBACError::UserDoesNotExist);
112+
}
113+
114+
let _ = storage::put_staging_metadata(&metadata);
115+
// update in mem table
116+
Users.add_roles(&username.clone(), roles_to_add.clone());
117+
118+
Ok(format!("Roles updated successfully for {username}"))
119+
}
120+
121+
// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
122+
pub async fn remove_roles_from_user(
123+
username: web::Path<String>,
124+
roles_to_remove: web::Json<HashSet<String>>,
125+
) -> Result<String, RBACError> {
126+
let username = username.into_inner();
127+
let roles_to_remove = roles_to_remove.into_inner();
128+
129+
if !Users.contains(&username) {
130+
return Err(RBACError::UserDoesNotExist);
131+
};
132+
133+
// check if all roles exist
134+
let mut non_existent_roles = Vec::new();
135+
roles_to_remove.iter().for_each(|r| {
136+
if roles().get(r).is_none() {
137+
non_existent_roles.push(r.clone());
138+
}
139+
});
140+
141+
if !non_existent_roles.is_empty() {
142+
return Err(RBACError::RolesDoNotExist(non_existent_roles));
143+
}
144+
145+
// check that user actually has these roles
146+
let user_roles: HashSet<String> = HashSet::from_iter(Users.get_role(&username));
147+
let roles_not_with_user: HashSet<String> =
148+
HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned());
149+
150+
if !roles_not_with_user.is_empty() {
151+
return Err(RBACError::RolesNotAssigned(Vec::from_iter(
152+
roles_not_with_user,
153+
)));
154+
}
155+
88156
// update parseable.json first
89157
let mut metadata = get_metadata().await?;
90158
if let Some(user) = metadata
91159
.users
92160
.iter_mut()
93161
.find(|user| user.username() == username)
94162
{
95-
user.roles.clone_from(&role);
163+
let diff: HashSet<String> =
164+
HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned());
165+
user.roles = diff;
96166
} else {
97167
// should be unreachable given state is always consistent
98168
return Err(RBACError::UserDoesNotExist);
99169
}
100170

101171
let _ = storage::put_staging_metadata(&metadata);
102172
// update in mem table
103-
Users.put_role(&username.clone(), role.clone());
173+
Users.remove_roles(&username.clone(), roles_to_remove.clone());
104174

105175
Ok(format!("Roles updated successfully for {username}"))
106176
}

src/handlers/http/modal/ingest/ingestor_role.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616
*
1717
*/
1818

19+
use std::collections::HashSet;
20+
1921
use actix_web::{
2022
web::{self, Json},
2123
HttpResponse, Responder,
2224
};
2325

2426
use crate::{
2527
handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError},
26-
rbac::{map::mut_roles, role::model::DefaultPrivilege},
28+
rbac::{
29+
map::{mut_roles, mut_sessions, read_user_groups, users},
30+
role::model::DefaultPrivilege,
31+
},
2732
storage,
2833
};
2934

@@ -40,5 +45,25 @@ pub async fn put(
4045
let _ = storage::put_staging_metadata(&metadata);
4146
mut_roles().insert(name.clone(), privileges);
4247

48+
// refresh the sessions of all users using this role
49+
// for this, iterate over all user_groups and users and create a hashset of users
50+
let mut session_refresh_users: HashSet<String> = HashSet::new();
51+
for user_group in read_user_groups().values().cloned() {
52+
if user_group.roles.contains(&name) {
53+
session_refresh_users.extend(user_group.users);
54+
}
55+
}
56+
57+
// iterate over all users to see if they have this role
58+
for user in users().values().cloned() {
59+
if user.roles.contains(&name) {
60+
session_refresh_users.insert(user.username().to_string());
61+
}
62+
}
63+
64+
for username in session_refresh_users {
65+
mut_sessions().remove_user(&username);
66+
}
67+
4368
Ok(HttpResponse::Ok().finish())
4469
}

src/handlers/http/modal/ingest_server.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,21 @@ impl IngestServer {
198198
.wrap(DisAllowRootUser),
199199
)
200200
.service(
201-
web::resource("/{username}/role/sync")
202-
// PUT /user/{username}/roles => Put roles for user
201+
web::resource("/{username}/role/sync/add")
202+
// PATCH /user/{username}/role/sync/add => Add roles to a user
203203
.route(
204-
web::put()
205-
.to(ingestor_rbac::put_role)
204+
web::patch()
205+
.to(ingestor_rbac::add_roles_to_user)
206+
.authorize(Action::PutUserRoles)
207+
.wrap(DisAllowRootUser),
208+
),
209+
)
210+
.service(
211+
web::resource("/{username}/role/sync/remove")
212+
// PATCH /user/{username}/role/sync/remove => Remove roles from a user
213+
.route(
214+
web::patch()
215+
.to(ingestor_rbac::remove_roles_from_user)
206216
.authorize(Action::PutUserRoles)
207217
.wrap(DisAllowRootUser),
208218
),
@@ -227,7 +237,7 @@ impl IngestServer {
227237
.route(
228238
web::post()
229239
.to(ingest::post_event)
230-
.authorize_for_stream(Action::Ingest),
240+
.authorize_for_resource(Action::Ingest),
231241
)
232242
.wrap(from_fn(
233243
resource_check::check_resource_utilization_middleware,
@@ -245,31 +255,31 @@ impl IngestServer {
245255
.route(
246256
web::put()
247257
.to(ingestor_logstream::put_stream)
248-
.authorize_for_stream(Action::CreateStream),
258+
.authorize_for_resource(Action::CreateStream),
249259
),
250260
)
251261
.service(
252262
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
253263
web::resource("/info").route(
254264
web::get()
255265
.to(logstream::get_stream_info)
256-
.authorize_for_stream(Action::GetStreamInfo),
266+
.authorize_for_resource(Action::GetStreamInfo),
257267
),
258268
)
259269
.service(
260270
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
261271
web::resource("/stats").route(
262272
web::get()
263273
.to(logstream::get_stats)
264-
.authorize_for_stream(Action::GetStats),
274+
.authorize_for_resource(Action::GetStats),
265275
),
266276
)
267277
.service(
268278
web::scope("/retention").service(
269279
web::resource("/cleanup").route(
270280
web::post()
271281
.to(ingestor_logstream::retention_cleanup)
272-
.authorize_for_stream(Action::PutRetention),
282+
.authorize_for_resource(Action::PutRetention),
273283
),
274284
),
275285
),

0 commit comments

Comments
 (0)