Skip to content

Commit

Permalink
feat: add tag filtering to external JWT authentication (#4425)
Browse files Browse the repository at this point in the history
* feat: tag filtering jwt ext auth

* move tags to scopes

* fix symlink

* update ee ref
  • Loading branch information
HugoCasa committed Sep 25, 2024
1 parent 8a277a0 commit 590321f
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 16 deletions.
2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e093d51a219ce4ff0562a02db24ec402554a1f05
3d37b6c31155265d8d026ae9d6ced0b433078f87
2 changes: 1 addition & 1 deletion backend/windmill-api/src/concurrency_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async fn get_concurrent_intervals(
Query(iq): Query<ExtendedJobsParams>,
Query(lq): Query<ListCompletedQuery>,
) -> JsonResult<ExtendedJobs> {
check_scopes(&authed, || format!("listjobs"))?;
check_scopes(&authed, || format!("jobs:listjobs"))?;

if lq.success.is_some() && lq.running.is_some_and(|x| x) {
return Err(error::Error::BadRequest(
Expand Down
37 changes: 28 additions & 9 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::add_webhook_allowed_origin;
use crate::concurrency_groups::join_concurrency_key;
use crate::db::ApiAuthed;

use crate::users::get_scope_tags;
use crate::utils::content_plain;
use crate::{
db::DB,
Expand Down Expand Up @@ -1248,13 +1249,18 @@ pub fn list_queue_jobs_query(
lq: &ListQueueQuery,
fields: &[&str],
join_outstanding_wait_times: bool,
tags: Option<Vec<&str>>,
) -> SqlBuilder {
let sqlb = SqlBuilder::select_from("queue")
let mut sqlb = SqlBuilder::select_from("queue")
.fields(fields)
.order_by("created_at", lq.order_desc.unwrap_or(true))
.limit(1000)
.clone();

if let Some(tags) = tags {
sqlb.and_where_in("tag", &tags.iter().map(|x| quote(x)).collect::<Vec<_>>());
}

filter_list_queue_query(sqlb, lq, w_id, join_outstanding_wait_times)
}

Expand Down Expand Up @@ -1310,6 +1316,7 @@ async fn list_queue_jobs(
"workspace_id",
],
false,
get_scope_tags(&authed),
)
.sql()?;
let mut tx = user_db.begin(&authed).await?;
Expand Down Expand Up @@ -1498,6 +1505,10 @@ async fn list_filtered_uuids(

sqlb.and_where_is_null("schedule_path");

if let Some(tags) = get_scope_tags(&authed) {
sqlb.and_where_in("tag", &tags.iter().map(|x| quote(x)).collect::<Vec<_>>());
}

sqlb = filter_list_queue_query(sqlb, &lq, w_id.as_str(), false);

let sql = sqlb.query()?;
Expand Down Expand Up @@ -1557,7 +1568,7 @@ async fn list_jobs(
Query(lq): Query<ListCompletedQuery>,
Extension(_api_list_jobs_query_duration): Extension<Option<Histo>>,
) -> error::JsonResult<Vec<Job>> {
check_scopes(&authed, || format!("listjobs"))?;
check_scopes(&authed, || format!("jobs:listjobs"))?;

let (per_page, offset) = paginate(pagination);
let lqc = lq.clone();
Expand All @@ -1575,6 +1586,7 @@ async fn list_jobs(
&ListCompletedQuery { order_desc: Some(true), ..lqc },
UnifiedJob::completed_job_fields(),
true,
get_scope_tags(&authed),
))
} else {
None
Expand All @@ -1590,6 +1602,7 @@ async fn list_jobs(
&ListQueueQuery { order_desc: Some(true), ..lq.into() },
UnifiedJob::queued_job_fields(),
true,
get_scope_tags(&authed),
);

if let Some(sqlc) = sqlc {
Expand Down Expand Up @@ -1640,7 +1653,7 @@ pub async fn resume_suspended_flow_as_owner(
Path((_w_id, flow_id)): Path<(String, Uuid)>,
QueryOrBody(value): QueryOrBody<serde_json::Value>,
) -> error::Result<StatusCode> {
check_scopes(&authed, || format!("resumeflow"))?;
check_scopes(&authed, || format!("jobs:resumeflow"))?;
let value = value.unwrap_or(serde_json::Value::Null);
let mut tx = db.begin().await?;

Expand Down Expand Up @@ -3847,7 +3860,7 @@ async fn run_preview_script(
#[cfg(feature = "enterprise")]
check_license_key_valid().await?;

check_scopes(&authed, || format!("runscript"))?;
check_scopes(&authed, || format!("jobs:runscript"))?;
if authed.is_operator {
return Err(error::Error::NotAuthorized(
"Operators cannot run preview jobs for security reasons".to_string(),
Expand Down Expand Up @@ -3917,7 +3930,7 @@ async fn run_bundle_preview_script(

check_license_key_valid().await?;

check_scopes(&authed, || format!("runscript"))?;
check_scopes(&authed, || format!("jobs:runscript"))?;
if authed.is_operator {
return Err(error::Error::NotAuthorized(
"Operators cannot run preview jobs for security reasons".to_string(),
Expand Down Expand Up @@ -4403,7 +4416,7 @@ async fn run_preview_flow_job(
Query(run_query): Query<RunJobQuery>,
Json(raw_flow): Json<PreviewFlow>,
) -> error::Result<(StatusCode, String)> {
check_scopes(&authed, || format!("runflow"))?;
check_scopes(&authed, || format!("jobs:runflow"))?;
if authed.is_operator {
return Err(error::Error::NotAuthorized(
"Operators cannot run preview jobs for security reasons".to_string(),
Expand Down Expand Up @@ -4843,14 +4856,19 @@ pub fn list_completed_jobs_query(
lq: &ListCompletedQuery,
fields: &[&str],
join_outstanding_wait_times: bool,
tags: Option<Vec<&str>>,
) -> SqlBuilder {
let sqlb = SqlBuilder::select_from("completed_job")
let mut sqlb = SqlBuilder::select_from("completed_job")
.fields(fields)
.order_by("created_at", lq.order_desc.unwrap_or(true))
.offset(offset)
.limit(per_page)
.clone();

if let Some(tags) = tags {
sqlb.and_where_in("tag", &tags.iter().map(|x| quote(x)).collect::<Vec<_>>());
}

filter_list_completed_query(sqlb, lq, w_id, join_outstanding_wait_times)
}
#[derive(Deserialize, Clone)]
Expand Down Expand Up @@ -4895,7 +4913,7 @@ async fn list_completed_jobs(
Query(pagination): Query<Pagination>,
Query(lq): Query<ListCompletedQuery>,
) -> error::JsonResult<Vec<ListableCompletedJob>> {
check_scopes(&authed, || format!("listjobs"))?;
check_scopes(&authed, || format!("jobs:listjobs"))?;

let (per_page, offset) = paginate(pagination);

Expand Down Expand Up @@ -4937,6 +4955,7 @@ async fn list_completed_jobs(
"'CompletedJob' as type",
],
false,
get_scope_tags(&authed),
)
.sql()?;
let mut tx = user_db.begin(&authed).await?;
Expand Down Expand Up @@ -5161,7 +5180,7 @@ async fn delete_completed_job<'a>(
Extension(user_db): Extension<UserDB>,
Path((w_id, id)): Path<(String, Uuid)>,
) -> error::Result<Response> {
check_scopes(&authed, || format!("deletejob"))?;
check_scopes(&authed, || format!("jobs:deletejob"))?;

let mut tx = user_db.begin(&authed).await?;

Expand Down
28 changes: 23 additions & 5 deletions backend/windmill-api/src/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,12 @@ where
{
if let Some(authed) = cache.get_authed(workspace_id.clone(), &token).await {
parts.extensions.insert(authed.clone());
if authed.scopes.is_some()
&& (path_vec.len() < 3
|| (path_vec[4] != "jobs" && path_vec[4] != "jobs_u"))
if authed.scopes.as_ref().is_some_and(|scopes| {
scopes
.iter()
.any(|s| s.starts_with("jobs:") || s.starts_with("run:"))
}) && (path_vec.len() < 3
|| (path_vec[4] != "jobs" && path_vec[4] != "jobs_u"))
{
BRUTE_FORCE_COUNTER.increment().await;
return Err((
Expand Down Expand Up @@ -681,15 +684,30 @@ pub fn check_scopes<F>(authed: &ApiAuthed, required: F) -> error::Result<()>
where
F: FnOnce() -> String,
{
if let Some(scopes) = &authed.scopes {
if authed.scopes.as_ref().is_some_and(|scopes| {
scopes
.iter()
.any(|s| s.starts_with("jobs:") || s.starts_with("run:"))
}) {
let req = &required();
if !scopes.contains(req) {
if !authed.scopes.as_ref().unwrap().contains(req) {
return Err(Error::BadRequest(format!("missing required scope: {req}")));
}
}
Ok(())
}

pub fn get_scope_tags(authed: &ApiAuthed) -> Option<Vec<&str>> {
authed
.scopes
.as_ref()?
.iter()
.find_map(|s| match s.split(":").collect::<Vec<_>>().as_slice() {
["if_jobs", "filter_tags", tags] => Some(tags.split(",").collect::<Vec<_>>()),
_ => None,
})
}

#[derive(Clone, Debug)]
pub struct OptAuthed(pub Option<ApiAuthed>);

Expand Down
1 change: 1 addition & 0 deletions backend/windmill-common/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct JWTAuthClaims {
pub workspace_id: String,
pub exp: usize,
pub job_id: Option<String>,
pub scopes: Option<Vec<String>>,
}

#[derive(Deserialize)]
Expand Down
1 change: 1 addition & 0 deletions backend/windmill-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ pub async fn create_token_for_owner(
exp: (chrono::Utc::now() + chrono::Duration::seconds(expires_in as i64)).timestamp()
as usize,
job_id: Some(job_id.to_string()),
scopes: None,
};

let token = jsonwebtoken::encode(
Expand Down

0 comments on commit 590321f

Please sign in to comment.