From a4856327cf64d93792d423190475802d424c852b Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Sun, 10 Sep 2023 11:58:50 +0100 Subject: [PATCH] Fix authentication issues with `LIVE SELECT` statements (#2661) --- lib/src/doc/lives.rs | 140 +++++++++++++++--- lib/src/iam/signin.rs | 8 + lib/src/iam/signup.rs | 2 + lib/src/kvs/tests/cluster_init.rs | 4 + lib/src/sql/statements/live.rs | 64 ++++---- lib/src/sql/value/serde/ser/statement/live.rs | 5 + 6 files changed, 174 insertions(+), 49 deletions(-) diff --git a/lib/src/doc/lives.rs b/lib/src/doc/lives.rs index 8c8682e7..5f104b07 100644 --- a/lib/src/doc/lives.rs +++ b/lib/src/doc/lives.rs @@ -3,15 +3,21 @@ use crate::dbs::Notification; use crate::dbs::Options; use crate::dbs::Statement; use crate::dbs::{Action, Transaction}; +use crate::doc::CursorDoc; use crate::doc::Document; use crate::err::Error; +use crate::sql::paths::SC; +use crate::sql::paths::SD; +use crate::sql::paths::TK; +use crate::sql::permission::Permission; use crate::sql::Value; +use std::ops::Deref; use std::sync::Arc; impl<'a> Document<'a> { pub async fn lives( &self, - ctx: &Context<'_>, + _ctx: &Context<'_>, opt: &Options, txn: &Transaction, stm: &Statement<'_>, @@ -30,26 +36,66 @@ impl<'a> Document<'a> { for lv in self.lv(opt, txn).await?.iter() { // Create a new statement let lq = Statement::from(lv); - // Check LIVE SELECT where condition - if let Some(cond) = lq.conds() { - // Check if this is a delete statement - let doc = match stm.is_delete() { - true => &self.initial, - false => &self.current, - }; - // Check if the expression is truthy - if !cond.compute(ctx, opt, txn, Some(doc)).await?.is_truthy() { - continue; - } + // Get the event action + let met = if stm.is_delete() { + Value::from("DELETE") + } else if self.is_new() { + Value::from("CREATE") + } else { + Value::from("UPDATE") + }; + // Check if this is a delete statement + let doc = match stm.is_delete() { + true => &self.initial, + false => &self.current, + }; + // Ensure that a session exists on the LIVE query + let sess = lv.session.as_ref().ok_or(Error::UnknownAuth)?; + // Ensure that auth info exists on the LIVE query + let auth = lv.auth.clone().ok_or(Error::UnknownAuth)?; + // We need to create a new context which we will + // use for processing this LIVE query statement. + // This ensures that we are using the session + // of the user who created the LIVE query. + let mut lqctx = Context::background(); + lqctx.add_value("auth", sess.pick(SD.as_ref())); + lqctx.add_value("scope", sess.pick(SC.as_ref())); + lqctx.add_value("token", sess.pick(TK.as_ref())); + lqctx.add_value("session", sess); + // We need to create a new options which we will + // use for processing this LIVE query statement. + // This ensures that we are using the auth data + // of the user who created the LIVE query. + let lqopt = Options::new_with_perms(opt, true) + .with_auth_enabled(true) + .with_auth(Arc::from(auth)); + // Add $before, $after, $value, and $event params + // to this LIVE query so that user can use these + // within field projections and WHERE clauses. + lqctx.add_value("event", met); + lqctx.add_value("value", self.current.doc.deref()); + lqctx.add_value("after", self.current.doc.deref()); + lqctx.add_value("before", self.initial.doc.deref()); + // First of all, let's check to see if the WHERE + // clause of the LIVE query is matched by this + // document. If it is then we can continue. + match self.lq_check(&lqctx, &lqopt, txn, &lq, doc).await { + Err(Error::Ignore) => continue, + Err(e) => return Err(e), + Ok(_) => (), } - // Check authorization - trace!("Checking live query auth: {:?}", lv); - let lq_options = Options::new_with_perms(opt, true) - .with_auth(Arc::from(lv.auth.clone().ok_or(Error::UnknownAuth)?)); - if self.allow(ctx, &lq_options, txn, &lq).await.is_err() { - continue; + // Secondly, let's check to see if any PERMISSIONS + // clause for this table allows this document to + // be viewed by the user who created this LIVE + // query. If it does, then we can continue. + match self.lq_allow(&lqctx, &lqopt, txn, &lq, doc).await { + Err(Error::Ignore) => continue, + Err(e) => return Err(e), + Ok(_) => (), } - // Check what type of data change this is + // Finally, let's check what type of statement + // caused this LIVE query to run, and send the + // relevant notification based on the statement. if stm.is_delete() { // Send a DELETE notification if opt.id()? == lv.node.0 { @@ -69,7 +115,7 @@ impl<'a> Document<'a> { chn.send(Notification { id: lv.id.clone(), action: Action::Create, - result: self.pluck(ctx, opt, txn, &lq).await?, + result: self.pluck(&lqctx, &lqopt, txn, &lq).await?, }) .await?; } else { @@ -81,7 +127,7 @@ impl<'a> Document<'a> { chn.send(Notification { id: lv.id.clone(), action: Action::Update, - result: self.pluck(ctx, opt, txn, &lq).await?, + result: self.pluck(&lqctx, &lqopt, txn, &lq).await?, }) .await?; } else { @@ -93,4 +139,56 @@ impl<'a> Document<'a> { // Carry on Ok(()) } + /// Check the WHERE clause for a LIVE query + async fn lq_check( + &self, + ctx: &Context<'_>, + opt: &Options, + txn: &Transaction, + stm: &Statement<'_>, + doc: &CursorDoc<'_>, + ) -> Result<(), Error> { + // Check where condition + if let Some(cond) = stm.conds() { + // Check if the expression is truthy + if !cond.compute(ctx, opt, txn, Some(doc)).await?.is_truthy() { + // Ignore this document + return Err(Error::Ignore); + } + } + // Carry on + Ok(()) + } + /// Check any PERRMISSIONS for a LIVE query + async fn lq_allow( + &self, + ctx: &Context<'_>, + opt: &Options, + txn: &Transaction, + stm: &Statement<'_>, + doc: &CursorDoc<'_>, + ) -> Result<(), Error> { + // Should we run permissions checks? + if opt.check_perms(stm.into()) { + // Get the table + let tb = self.tb(opt, txn).await?; + // Get the permission clause + let perms = &tb.permissions.select; + // Process the table permissions + match perms { + Permission::None => return Err(Error::Ignore), + Permission::Full => return Ok(()), + Permission::Specific(e) => { + // Disable permissions + let opt = &opt.new_with_perms(false); + // Process the PERMISSION clause + if !e.compute(ctx, opt, txn, Some(doc)).await?.is_truthy() { + return Err(Error::Ignore); + } + } + } + } + // Carry on + Ok(()) + } } diff --git a/lib/src/iam/signin.rs b/lib/src/iam/signin.rs index a8fa85ed..2d88e193 100644 --- a/lib/src/iam/signin.rs +++ b/lib/src/iam/signin.rs @@ -145,6 +145,8 @@ pub async fn sc( id: Some(rid.to_raw()), ..Claims::default() }; + // Log the authenticated scope info + trace!("Signing in to scope `{}`", sc); // Create the authentication token let enc = encode(&HEADER, &val, &key); // Set the authentication on the session @@ -201,6 +203,8 @@ pub async fn db( id: Some(user), ..Claims::default() }; + // Log the authenticated database info + trace!("Signing in to database `{}`", db); // Create the authentication token let enc = encode(&HEADER, &val, &key); // Set the authentication on the session @@ -241,6 +245,8 @@ pub async fn ns( id: Some(user), ..Claims::default() }; + // Log the authenticated namespace info + trace!("Signing in to namespace `{}`", ns); // Create the authentication token let enc = encode(&HEADER, &val, &key); // Set the authentication on the session @@ -278,6 +284,8 @@ pub async fn kv( id: Some(user), ..Claims::default() }; + // Log the authenticated root info + trace!("Signing in as root"); // Create the authentication token let enc = encode(&HEADER, &val, &key); // Set the authentication on the session diff --git a/lib/src/iam/signup.rs b/lib/src/iam/signup.rs index 1668ec67..f0072be0 100644 --- a/lib/src/iam/signup.rs +++ b/lib/src/iam/signup.rs @@ -86,6 +86,8 @@ pub async fn sc( id: Some(rid.to_raw()), ..Claims::default() }; + // Log the authenticated scope info + trace!("Signing up to scope `{}`", sc); // Create the authentication token let enc = encode(&HEADER, &val, &key); // Set the authentication on the session diff --git a/lib/src/kvs/tests/cluster_init.rs b/lib/src/kvs/tests/cluster_init.rs index 5d11ef94..a03bacce 100644 --- a/lib/src/kvs/tests/cluster_init.rs +++ b/lib/src/kvs/tests/cluster_init.rs @@ -74,6 +74,7 @@ async fn expired_nodes_get_live_queries_archived() { cond: None, fetch: None, archived: Some(crate::sql::uuid::Uuid::from(old_node)), + session: Some(Value::None), auth: Some(Auth::for_root(Role::Owner)), }; let ctx = context::Context::background(); @@ -150,6 +151,7 @@ async fn single_live_queries_are_garbage_collected() { cond: None, fetch: None, archived: None, + session: Some(Value::None), auth: Some(Auth::for_root(Role::Owner)), }; live_st @@ -166,6 +168,7 @@ async fn single_live_queries_are_garbage_collected() { cond: None, fetch: None, archived: None, + session: Some(Value::None), auth: Some(Auth::for_root(Role::Owner)), }; live_st @@ -230,6 +233,7 @@ async fn bootstrap_does_not_error_on_missing_live_queries() { cond: None, fetch: None, archived: None, + session: Some(Value::None), auth: Some(Auth::for_root(Role::Owner)), }; live_st diff --git a/lib/src/sql/statements/live.rs b/lib/src/sql/statements/live.rs index aefee1f4..d7590546 100644 --- a/lib/src/sql/statements/live.rs +++ b/lib/src/sql/statements/live.rs @@ -27,7 +27,7 @@ use serde::{Deserialize, Serialize}; use std::fmt; #[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] -#[revisioned(revision = 1)] +#[revisioned(revision = 2)] pub struct LiveStatement { pub id: Uuid, pub node: Uuid, @@ -35,15 +35,24 @@ pub struct LiveStatement { pub what: Value, pub cond: Option, pub fetch: Option, - - // Non-query properties that are necessary for storage or otherwise carrying information - - // When a live query is archived, this should be the node ID that archived the query. - pub archived: Option, - // A live query is run with permissions, and we must validate that during the run. - // It is optional, because the live query may be constructed without it being set. - // It is populated during compute. - pub auth: Option, + // When a live query is marked for archiving, this will + // be set to the node ID that archived the query. This + // is an internal property, set by the database runtime. + // This is optional, and os only set when archived. + pub(crate) archived: Option, + // When a live query is created, we must also store the + // authenticated session of the user who made the query, + // so we can chack it later when sending notifications. + // This is optional as it is only set by the database + // runtime when storing the live query to storage. + #[revision(start = 2)] + pub(crate) session: Option, + // When a live query is created, we must also store the + // authenticated session of the user who made the query, + // so we can chack it later when sending notifications. + // This is optional as it is only set by the database + // runtime when storing the live query to storage. + pub(crate) auth: Option, } impl LiveStatement { @@ -59,33 +68,33 @@ impl LiveStatement { opt.realtime()?; // Valid options? opt.valid_for_db()?; + // Get the Node ID + let nid = opt.id()?; // Check that auth has been set - let self_override = LiveStatement { - auth: match self.auth { - Some(ref auth) => Some(auth.clone()), - None => Some(opt.auth.as_ref().clone()), - }, + let mut stm = LiveStatement { + // Use the current session authentication + // for when we store the LIVE Statement + session: ctx.value("session").cloned(), + // Use the current session authentication + // for when we store the LIVE Statement + auth: Some(opt.auth.as_ref().clone()), + // Clone the rest of the original fields + // from the LIVE statement to the new one ..self.clone() }; - trace!("Evaluated live query auth to {:?}", self_override.auth); + let id = stm.id.0; // Claim transaction let mut run = txn.lock().await; // Process the live query table - match self_override.what.compute(ctx, opt, txn, doc).await? { + match stm.what.compute(ctx, opt, txn, doc).await? { Value::Table(tb) => { - // Clone the current statement - let mut stm = self_override.clone(); // Store the current Node ID - if let Err(e) = opt.id() { - trace!("No ID for live query {:?}, error={:?}", stm, e) - } - stm.node = Uuid(opt.id()?); + stm.node = nid.into(); // Insert the node live query - let key = - crate::key::node::lq::new(opt.id()?, self_override.id.0, opt.ns(), opt.db()); + let key = crate::key::node::lq::new(opt.id()?, id, opt.ns(), opt.db()); run.putc(key, tb.as_str(), None).await?; // Insert the table live query - let key = crate::key::table::lq::new(opt.ns(), opt.db(), &tb, self_override.id.0); + let key = crate::key::table::lq::new(opt.ns(), opt.db(), &tb, id); run.putc(key, stm, None).await?; } v => { @@ -95,8 +104,7 @@ impl LiveStatement { } }; // Return the query id - trace!("Live query after processing: {:?}", self_override); - Ok(self_override.id.clone().into()) + Ok(id.into()) } pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement { diff --git a/lib/src/sql/value/serde/ser/statement/live.rs b/lib/src/sql/value/serde/ser/statement/live.rs index 6d77d1d6..d7c17029 100644 --- a/lib/src/sql/value/serde/ser/statement/live.rs +++ b/lib/src/sql/value/serde/ser/statement/live.rs @@ -47,6 +47,7 @@ pub struct SerializeLiveStatement { cond: Option, fetch: Option, archived: Option, + session: Option, auth: Option, } @@ -80,6 +81,9 @@ impl serde::ser::SerializeStruct for SerializeLiveStatement { "archived" => { self.archived = value.serialize(ser::uuid::opt::Serializer.wrap())?.map(Uuid); } + "session" => { + self.session = None; + } "auth" => { self.auth = None; } @@ -99,6 +103,7 @@ impl serde::ser::SerializeStruct for SerializeLiveStatement { cond: self.cond, fetch: self.fetch, archived: self.archived, + session: None, auth: None, }) }