Fix authentication issues with LIVE SELECT
statements (#2661)
This commit is contained in:
parent
9f33d5dc27
commit
a4856327cf
6 changed files with 174 additions and 49 deletions
|
@ -3,15 +3,21 @@ use crate::dbs::Notification;
|
||||||
use crate::dbs::Options;
|
use crate::dbs::Options;
|
||||||
use crate::dbs::Statement;
|
use crate::dbs::Statement;
|
||||||
use crate::dbs::{Action, Transaction};
|
use crate::dbs::{Action, Transaction};
|
||||||
|
use crate::doc::CursorDoc;
|
||||||
use crate::doc::Document;
|
use crate::doc::Document;
|
||||||
use crate::err::Error;
|
use crate::err::Error;
|
||||||
|
use crate::sql::paths::SC;
|
||||||
|
use crate::sql::paths::SD;
|
||||||
|
use crate::sql::paths::TK;
|
||||||
|
use crate::sql::permission::Permission;
|
||||||
use crate::sql::Value;
|
use crate::sql::Value;
|
||||||
|
use std::ops::Deref;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
impl<'a> Document<'a> {
|
impl<'a> Document<'a> {
|
||||||
pub async fn lives(
|
pub async fn lives(
|
||||||
&self,
|
&self,
|
||||||
ctx: &Context<'_>,
|
_ctx: &Context<'_>,
|
||||||
opt: &Options,
|
opt: &Options,
|
||||||
txn: &Transaction,
|
txn: &Transaction,
|
||||||
stm: &Statement<'_>,
|
stm: &Statement<'_>,
|
||||||
|
@ -30,26 +36,66 @@ impl<'a> Document<'a> {
|
||||||
for lv in self.lv(opt, txn).await?.iter() {
|
for lv in self.lv(opt, txn).await?.iter() {
|
||||||
// Create a new statement
|
// Create a new statement
|
||||||
let lq = Statement::from(lv);
|
let lq = Statement::from(lv);
|
||||||
// Check LIVE SELECT where condition
|
// Get the event action
|
||||||
if let Some(cond) = lq.conds() {
|
let met = if stm.is_delete() {
|
||||||
// Check if this is a delete statement
|
Value::from("DELETE")
|
||||||
let doc = match stm.is_delete() {
|
} else if self.is_new() {
|
||||||
true => &self.initial,
|
Value::from("CREATE")
|
||||||
false => &self.current,
|
} else {
|
||||||
};
|
Value::from("UPDATE")
|
||||||
// Check if the expression is truthy
|
};
|
||||||
if !cond.compute(ctx, opt, txn, Some(doc)).await?.is_truthy() {
|
// Check if this is a delete statement
|
||||||
continue;
|
let doc = match stm.is_delete() {
|
||||||
}
|
true => &self.initial,
|
||||||
|
false => &self.current,
|
||||||
|
};
|
||||||
|
// Ensure that a session exists on the LIVE query
|
||||||
|
let sess = lv.session.as_ref().ok_or(Error::UnknownAuth)?;
|
||||||
|
// Ensure that auth info exists on the LIVE query
|
||||||
|
let auth = lv.auth.clone().ok_or(Error::UnknownAuth)?;
|
||||||
|
// We need to create a new context which we will
|
||||||
|
// use for processing this LIVE query statement.
|
||||||
|
// This ensures that we are using the session
|
||||||
|
// of the user who created the LIVE query.
|
||||||
|
let mut lqctx = Context::background();
|
||||||
|
lqctx.add_value("auth", sess.pick(SD.as_ref()));
|
||||||
|
lqctx.add_value("scope", sess.pick(SC.as_ref()));
|
||||||
|
lqctx.add_value("token", sess.pick(TK.as_ref()));
|
||||||
|
lqctx.add_value("session", sess);
|
||||||
|
// We need to create a new options which we will
|
||||||
|
// use for processing this LIVE query statement.
|
||||||
|
// This ensures that we are using the auth data
|
||||||
|
// of the user who created the LIVE query.
|
||||||
|
let lqopt = Options::new_with_perms(opt, true)
|
||||||
|
.with_auth_enabled(true)
|
||||||
|
.with_auth(Arc::from(auth));
|
||||||
|
// Add $before, $after, $value, and $event params
|
||||||
|
// to this LIVE query so that user can use these
|
||||||
|
// within field projections and WHERE clauses.
|
||||||
|
lqctx.add_value("event", met);
|
||||||
|
lqctx.add_value("value", self.current.doc.deref());
|
||||||
|
lqctx.add_value("after", self.current.doc.deref());
|
||||||
|
lqctx.add_value("before", self.initial.doc.deref());
|
||||||
|
// First of all, let's check to see if the WHERE
|
||||||
|
// clause of the LIVE query is matched by this
|
||||||
|
// document. If it is then we can continue.
|
||||||
|
match self.lq_check(&lqctx, &lqopt, txn, &lq, doc).await {
|
||||||
|
Err(Error::Ignore) => continue,
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
Ok(_) => (),
|
||||||
}
|
}
|
||||||
// Check authorization
|
// Secondly, let's check to see if any PERMISSIONS
|
||||||
trace!("Checking live query auth: {:?}", lv);
|
// clause for this table allows this document to
|
||||||
let lq_options = Options::new_with_perms(opt, true)
|
// be viewed by the user who created this LIVE
|
||||||
.with_auth(Arc::from(lv.auth.clone().ok_or(Error::UnknownAuth)?));
|
// query. If it does, then we can continue.
|
||||||
if self.allow(ctx, &lq_options, txn, &lq).await.is_err() {
|
match self.lq_allow(&lqctx, &lqopt, txn, &lq, doc).await {
|
||||||
continue;
|
Err(Error::Ignore) => continue,
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
Ok(_) => (),
|
||||||
}
|
}
|
||||||
// Check what type of data change this is
|
// Finally, let's check what type of statement
|
||||||
|
// caused this LIVE query to run, and send the
|
||||||
|
// relevant notification based on the statement.
|
||||||
if stm.is_delete() {
|
if stm.is_delete() {
|
||||||
// Send a DELETE notification
|
// Send a DELETE notification
|
||||||
if opt.id()? == lv.node.0 {
|
if opt.id()? == lv.node.0 {
|
||||||
|
@ -69,7 +115,7 @@ impl<'a> Document<'a> {
|
||||||
chn.send(Notification {
|
chn.send(Notification {
|
||||||
id: lv.id.clone(),
|
id: lv.id.clone(),
|
||||||
action: Action::Create,
|
action: Action::Create,
|
||||||
result: self.pluck(ctx, opt, txn, &lq).await?,
|
result: self.pluck(&lqctx, &lqopt, txn, &lq).await?,
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
} else {
|
} else {
|
||||||
|
@ -81,7 +127,7 @@ impl<'a> Document<'a> {
|
||||||
chn.send(Notification {
|
chn.send(Notification {
|
||||||
id: lv.id.clone(),
|
id: lv.id.clone(),
|
||||||
action: Action::Update,
|
action: Action::Update,
|
||||||
result: self.pluck(ctx, opt, txn, &lq).await?,
|
result: self.pluck(&lqctx, &lqopt, txn, &lq).await?,
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
} else {
|
} else {
|
||||||
|
@ -93,4 +139,56 @@ impl<'a> Document<'a> {
|
||||||
// Carry on
|
// Carry on
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
/// Check the WHERE clause for a LIVE query
|
||||||
|
async fn lq_check(
|
||||||
|
&self,
|
||||||
|
ctx: &Context<'_>,
|
||||||
|
opt: &Options,
|
||||||
|
txn: &Transaction,
|
||||||
|
stm: &Statement<'_>,
|
||||||
|
doc: &CursorDoc<'_>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
// Check where condition
|
||||||
|
if let Some(cond) = stm.conds() {
|
||||||
|
// Check if the expression is truthy
|
||||||
|
if !cond.compute(ctx, opt, txn, Some(doc)).await?.is_truthy() {
|
||||||
|
// Ignore this document
|
||||||
|
return Err(Error::Ignore);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Carry on
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
/// Check any PERRMISSIONS for a LIVE query
|
||||||
|
async fn lq_allow(
|
||||||
|
&self,
|
||||||
|
ctx: &Context<'_>,
|
||||||
|
opt: &Options,
|
||||||
|
txn: &Transaction,
|
||||||
|
stm: &Statement<'_>,
|
||||||
|
doc: &CursorDoc<'_>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
// Should we run permissions checks?
|
||||||
|
if opt.check_perms(stm.into()) {
|
||||||
|
// Get the table
|
||||||
|
let tb = self.tb(opt, txn).await?;
|
||||||
|
// Get the permission clause
|
||||||
|
let perms = &tb.permissions.select;
|
||||||
|
// Process the table permissions
|
||||||
|
match perms {
|
||||||
|
Permission::None => return Err(Error::Ignore),
|
||||||
|
Permission::Full => return Ok(()),
|
||||||
|
Permission::Specific(e) => {
|
||||||
|
// Disable permissions
|
||||||
|
let opt = &opt.new_with_perms(false);
|
||||||
|
// Process the PERMISSION clause
|
||||||
|
if !e.compute(ctx, opt, txn, Some(doc)).await?.is_truthy() {
|
||||||
|
return Err(Error::Ignore);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Carry on
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,6 +145,8 @@ pub async fn sc(
|
||||||
id: Some(rid.to_raw()),
|
id: Some(rid.to_raw()),
|
||||||
..Claims::default()
|
..Claims::default()
|
||||||
};
|
};
|
||||||
|
// Log the authenticated scope info
|
||||||
|
trace!("Signing in to scope `{}`", sc);
|
||||||
// Create the authentication token
|
// Create the authentication token
|
||||||
let enc = encode(&HEADER, &val, &key);
|
let enc = encode(&HEADER, &val, &key);
|
||||||
// Set the authentication on the session
|
// Set the authentication on the session
|
||||||
|
@ -201,6 +203,8 @@ pub async fn db(
|
||||||
id: Some(user),
|
id: Some(user),
|
||||||
..Claims::default()
|
..Claims::default()
|
||||||
};
|
};
|
||||||
|
// Log the authenticated database info
|
||||||
|
trace!("Signing in to database `{}`", db);
|
||||||
// Create the authentication token
|
// Create the authentication token
|
||||||
let enc = encode(&HEADER, &val, &key);
|
let enc = encode(&HEADER, &val, &key);
|
||||||
// Set the authentication on the session
|
// Set the authentication on the session
|
||||||
|
@ -241,6 +245,8 @@ pub async fn ns(
|
||||||
id: Some(user),
|
id: Some(user),
|
||||||
..Claims::default()
|
..Claims::default()
|
||||||
};
|
};
|
||||||
|
// Log the authenticated namespace info
|
||||||
|
trace!("Signing in to namespace `{}`", ns);
|
||||||
// Create the authentication token
|
// Create the authentication token
|
||||||
let enc = encode(&HEADER, &val, &key);
|
let enc = encode(&HEADER, &val, &key);
|
||||||
// Set the authentication on the session
|
// Set the authentication on the session
|
||||||
|
@ -278,6 +284,8 @@ pub async fn kv(
|
||||||
id: Some(user),
|
id: Some(user),
|
||||||
..Claims::default()
|
..Claims::default()
|
||||||
};
|
};
|
||||||
|
// Log the authenticated root info
|
||||||
|
trace!("Signing in as root");
|
||||||
// Create the authentication token
|
// Create the authentication token
|
||||||
let enc = encode(&HEADER, &val, &key);
|
let enc = encode(&HEADER, &val, &key);
|
||||||
// Set the authentication on the session
|
// Set the authentication on the session
|
||||||
|
|
|
@ -86,6 +86,8 @@ pub async fn sc(
|
||||||
id: Some(rid.to_raw()),
|
id: Some(rid.to_raw()),
|
||||||
..Claims::default()
|
..Claims::default()
|
||||||
};
|
};
|
||||||
|
// Log the authenticated scope info
|
||||||
|
trace!("Signing up to scope `{}`", sc);
|
||||||
// Create the authentication token
|
// Create the authentication token
|
||||||
let enc = encode(&HEADER, &val, &key);
|
let enc = encode(&HEADER, &val, &key);
|
||||||
// Set the authentication on the session
|
// Set the authentication on the session
|
||||||
|
|
|
@ -74,6 +74,7 @@ async fn expired_nodes_get_live_queries_archived() {
|
||||||
cond: None,
|
cond: None,
|
||||||
fetch: None,
|
fetch: None,
|
||||||
archived: Some(crate::sql::uuid::Uuid::from(old_node)),
|
archived: Some(crate::sql::uuid::Uuid::from(old_node)),
|
||||||
|
session: Some(Value::None),
|
||||||
auth: Some(Auth::for_root(Role::Owner)),
|
auth: Some(Auth::for_root(Role::Owner)),
|
||||||
};
|
};
|
||||||
let ctx = context::Context::background();
|
let ctx = context::Context::background();
|
||||||
|
@ -150,6 +151,7 @@ async fn single_live_queries_are_garbage_collected() {
|
||||||
cond: None,
|
cond: None,
|
||||||
fetch: None,
|
fetch: None,
|
||||||
archived: None,
|
archived: None,
|
||||||
|
session: Some(Value::None),
|
||||||
auth: Some(Auth::for_root(Role::Owner)),
|
auth: Some(Auth::for_root(Role::Owner)),
|
||||||
};
|
};
|
||||||
live_st
|
live_st
|
||||||
|
@ -166,6 +168,7 @@ async fn single_live_queries_are_garbage_collected() {
|
||||||
cond: None,
|
cond: None,
|
||||||
fetch: None,
|
fetch: None,
|
||||||
archived: None,
|
archived: None,
|
||||||
|
session: Some(Value::None),
|
||||||
auth: Some(Auth::for_root(Role::Owner)),
|
auth: Some(Auth::for_root(Role::Owner)),
|
||||||
};
|
};
|
||||||
live_st
|
live_st
|
||||||
|
@ -230,6 +233,7 @@ async fn bootstrap_does_not_error_on_missing_live_queries() {
|
||||||
cond: None,
|
cond: None,
|
||||||
fetch: None,
|
fetch: None,
|
||||||
archived: None,
|
archived: None,
|
||||||
|
session: Some(Value::None),
|
||||||
auth: Some(Auth::for_root(Role::Owner)),
|
auth: Some(Auth::for_root(Role::Owner)),
|
||||||
};
|
};
|
||||||
live_st
|
live_st
|
||||||
|
|
|
@ -27,7 +27,7 @@ use serde::{Deserialize, Serialize};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
|
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
|
||||||
#[revisioned(revision = 1)]
|
#[revisioned(revision = 2)]
|
||||||
pub struct LiveStatement {
|
pub struct LiveStatement {
|
||||||
pub id: Uuid,
|
pub id: Uuid,
|
||||||
pub node: Uuid,
|
pub node: Uuid,
|
||||||
|
@ -35,15 +35,24 @@ pub struct LiveStatement {
|
||||||
pub what: Value,
|
pub what: Value,
|
||||||
pub cond: Option<Cond>,
|
pub cond: Option<Cond>,
|
||||||
pub fetch: Option<Fetchs>,
|
pub fetch: Option<Fetchs>,
|
||||||
|
// When a live query is marked for archiving, this will
|
||||||
// Non-query properties that are necessary for storage or otherwise carrying information
|
// be set to the node ID that archived the query. This
|
||||||
|
// is an internal property, set by the database runtime.
|
||||||
// When a live query is archived, this should be the node ID that archived the query.
|
// This is optional, and os only set when archived.
|
||||||
pub archived: Option<Uuid>,
|
pub(crate) archived: Option<Uuid>,
|
||||||
// A live query is run with permissions, and we must validate that during the run.
|
// When a live query is created, we must also store the
|
||||||
// It is optional, because the live query may be constructed without it being set.
|
// authenticated session of the user who made the query,
|
||||||
// It is populated during compute.
|
// so we can chack it later when sending notifications.
|
||||||
pub auth: Option<Auth>,
|
// This is optional as it is only set by the database
|
||||||
|
// runtime when storing the live query to storage.
|
||||||
|
#[revision(start = 2)]
|
||||||
|
pub(crate) session: Option<Value>,
|
||||||
|
// When a live query is created, we must also store the
|
||||||
|
// authenticated session of the user who made the query,
|
||||||
|
// so we can chack it later when sending notifications.
|
||||||
|
// This is optional as it is only set by the database
|
||||||
|
// runtime when storing the live query to storage.
|
||||||
|
pub(crate) auth: Option<Auth>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LiveStatement {
|
impl LiveStatement {
|
||||||
|
@ -59,33 +68,33 @@ impl LiveStatement {
|
||||||
opt.realtime()?;
|
opt.realtime()?;
|
||||||
// Valid options?
|
// Valid options?
|
||||||
opt.valid_for_db()?;
|
opt.valid_for_db()?;
|
||||||
|
// Get the Node ID
|
||||||
|
let nid = opt.id()?;
|
||||||
// Check that auth has been set
|
// Check that auth has been set
|
||||||
let self_override = LiveStatement {
|
let mut stm = LiveStatement {
|
||||||
auth: match self.auth {
|
// Use the current session authentication
|
||||||
Some(ref auth) => Some(auth.clone()),
|
// for when we store the LIVE Statement
|
||||||
None => Some(opt.auth.as_ref().clone()),
|
session: ctx.value("session").cloned(),
|
||||||
},
|
// Use the current session authentication
|
||||||
|
// for when we store the LIVE Statement
|
||||||
|
auth: Some(opt.auth.as_ref().clone()),
|
||||||
|
// Clone the rest of the original fields
|
||||||
|
// from the LIVE statement to the new one
|
||||||
..self.clone()
|
..self.clone()
|
||||||
};
|
};
|
||||||
trace!("Evaluated live query auth to {:?}", self_override.auth);
|
let id = stm.id.0;
|
||||||
// Claim transaction
|
// Claim transaction
|
||||||
let mut run = txn.lock().await;
|
let mut run = txn.lock().await;
|
||||||
// Process the live query table
|
// Process the live query table
|
||||||
match self_override.what.compute(ctx, opt, txn, doc).await? {
|
match stm.what.compute(ctx, opt, txn, doc).await? {
|
||||||
Value::Table(tb) => {
|
Value::Table(tb) => {
|
||||||
// Clone the current statement
|
|
||||||
let mut stm = self_override.clone();
|
|
||||||
// Store the current Node ID
|
// Store the current Node ID
|
||||||
if let Err(e) = opt.id() {
|
stm.node = nid.into();
|
||||||
trace!("No ID for live query {:?}, error={:?}", stm, e)
|
|
||||||
}
|
|
||||||
stm.node = Uuid(opt.id()?);
|
|
||||||
// Insert the node live query
|
// Insert the node live query
|
||||||
let key =
|
let key = crate::key::node::lq::new(opt.id()?, id, opt.ns(), opt.db());
|
||||||
crate::key::node::lq::new(opt.id()?, self_override.id.0, opt.ns(), opt.db());
|
|
||||||
run.putc(key, tb.as_str(), None).await?;
|
run.putc(key, tb.as_str(), None).await?;
|
||||||
// Insert the table live query
|
// Insert the table live query
|
||||||
let key = crate::key::table::lq::new(opt.ns(), opt.db(), &tb, self_override.id.0);
|
let key = crate::key::table::lq::new(opt.ns(), opt.db(), &tb, id);
|
||||||
run.putc(key, stm, None).await?;
|
run.putc(key, stm, None).await?;
|
||||||
}
|
}
|
||||||
v => {
|
v => {
|
||||||
|
@ -95,8 +104,7 @@ impl LiveStatement {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Return the query id
|
// Return the query id
|
||||||
trace!("Live query after processing: {:?}", self_override);
|
Ok(id.into())
|
||||||
Ok(self_override.id.clone().into())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {
|
pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {
|
||||||
|
|
|
@ -47,6 +47,7 @@ pub struct SerializeLiveStatement {
|
||||||
cond: Option<Cond>,
|
cond: Option<Cond>,
|
||||||
fetch: Option<Fetchs>,
|
fetch: Option<Fetchs>,
|
||||||
archived: Option<Uuid>,
|
archived: Option<Uuid>,
|
||||||
|
session: Option<Value>,
|
||||||
auth: Option<Auth>,
|
auth: Option<Auth>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,6 +81,9 @@ impl serde::ser::SerializeStruct for SerializeLiveStatement {
|
||||||
"archived" => {
|
"archived" => {
|
||||||
self.archived = value.serialize(ser::uuid::opt::Serializer.wrap())?.map(Uuid);
|
self.archived = value.serialize(ser::uuid::opt::Serializer.wrap())?.map(Uuid);
|
||||||
}
|
}
|
||||||
|
"session" => {
|
||||||
|
self.session = None;
|
||||||
|
}
|
||||||
"auth" => {
|
"auth" => {
|
||||||
self.auth = None;
|
self.auth = None;
|
||||||
}
|
}
|
||||||
|
@ -99,6 +103,7 @@ impl serde::ser::SerializeStruct for SerializeLiveStatement {
|
||||||
cond: self.cond,
|
cond: self.cond,
|
||||||
fetch: self.fetch,
|
fetch: self.fetch,
|
||||||
archived: self.archived,
|
archived: self.archived,
|
||||||
|
session: None,
|
||||||
auth: None,
|
auth: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue