improve graphql record fetching (#4682)

Raphael Darley 2024-09-12 15:14:14 +01:00 committed by GitHub
parent 749a30014f
commit bfa8e98753
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 314 additions and 131 deletions

View file

@@ -42,8 +42,8 @@ impl<'a> Executor<'a> {
}
}
fn txn(&self) -> Arc<Transaction> {
self.txn.clone().expect("unreachable: txn was None after successful begin")
fn txn(&self) -> Result<Arc<Transaction>, Error> {
self.txn.clone().ok_or_else(|| fail!("txn was None after successful begin"))
}
/// # Return
@@ -312,7 +312,7 @@ impl<'a> Executor<'a> {
false => {
// ctx.set_transaction(txn)
let mut c = MutableContext::unfreeze(ctx)?;
c.set_transaction(self.txn());
c.set_transaction(self.txn()?);
ctx = c.freeze();
// Check the statement
match stack
@@ -376,7 +376,7 @@ impl<'a> Executor<'a> {
// Create a new context for this statement
let mut ctx = MutableContext::new(&ctx);
// Set the transaction on the context
ctx.set_transaction(self.txn());
ctx.set_transaction(self.txn()?);
let c = ctx.freeze();
// Process the statement
let res = stack
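Taken together, the executor hunks above turn the transaction accessor from a panicking `expect` into a fallible method and thread the new `?` through its call sites. A minimal standalone sketch of the same pattern, using a stand-in `Error` type rather than the crate's own error enum and `fail!` macro:

use std::sync::Arc;

#[derive(Debug)]
struct Error(String); // stand-in for the crate's Error type

struct Transaction;

struct Executor {
    txn: Option<Arc<Transaction>>,
}

impl Executor {
    // After the change: a missing transaction becomes an error the caller
    // can propagate with `?` instead of a process-aborting panic.
    fn txn(&self) -> Result<Arc<Transaction>, Error> {
        self.txn
            .clone()
            .ok_or_else(|| Error("txn was None after successful begin".into()))
    }
}

fn main() {
    let exe = Executor { txn: None };
    assert!(exe.txn().is_err()); // handled gracefully rather than panicking
}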

View file

@@ -38,6 +38,24 @@ impl Invalidator for Pessimistic {
}
}
#[derive(Debug, Clone, Copy)]
pub struct Optimistic;
impl Invalidator for Optimistic {
type MetaData = ();
fn is_valid(_datastore: &Datastore, _session: &Session, _meta: &Self::MetaData) -> bool {
true
}
async fn generate(
datastore: &Arc<Datastore>,
session: &Session,
) -> Result<(Schema, Self::MetaData), GqlError> {
let schema = generate_schema(datastore, session).await?;
Ok((schema, ()))
}
}
#[derive(Clone)]
pub struct SchemaCache<I: Invalidator = Pessimistic> {
#[allow(clippy::type_complexity)]
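The new `Optimistic` invalidator always reports the cached GraphQL schema as valid and carries no metadata, in contrast to the existing `Pessimistic` strategy whose impl ends just above this hunk. A rough, synchronous sketch of the trade-off, with toy types standing in for `Datastore`, `Session`, `Schema`, and the async `generate` step (what `Pessimistic` actually checks lies outside this hunk, so the `false` below is only illustrative):

trait Invalidator {
    type MetaData;
    fn is_valid(meta: &Self::MetaData) -> bool;
}

struct Pessimistic;
impl Invalidator for Pessimistic {
    // e.g. a fingerprint of the table/field definitions captured at build time
    type MetaData = u64;
    fn is_valid(_meta: &u64) -> bool {
        false // illustrative: force the cache to regenerate
    }
}

struct Optimistic;
impl Invalidator for Optimistic {
    type MetaData = ();
    fn is_valid(_meta: &()) -> bool {
        true // serve the cached schema until it is explicitly replaced
    }
}

fn cached_or_rebuilt<I: Invalidator>(meta: &I::MetaData) -> &'static str {
    if I::is_valid(meta) {
        "cached schema"
    } else {
        "regenerated schema"
    }
}

fn main() {
    assert_eq!(cached_or_rebuilt::<Pessimistic>(&0), "regenerated schema");
    assert_eq!(cached_or_rebuilt::<Optimistic>(&()), "cached schema");
}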

View file

@@ -141,18 +141,18 @@ impl ValidatorExt for Scalar {
}
}
use crate::sql::Object as SqlObject;
use crate::sql::Thing as SqlThing;
use crate::sql::Value as SqlValue;
pub trait TryAsExt {
fn try_as_object(self) -> Result<SqlObject, Self>
fn try_as_thing(self) -> Result<SqlThing, Self>
where
Self: Sized;
}
impl TryAsExt for SqlValue {
fn try_as_object(self) -> Result<SqlObject, Self> {
fn try_as_thing(self) -> Result<SqlThing, Self> {
match self {
SqlValue::Object(o) => Ok(o),
SqlValue::Thing(t) => Ok(t),
v => Err(v),
}
}
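`TryAsExt` now yields a record id (`Thing`) rather than an object, and on failure hands the original value back in the `Err` variant so callers can still report it (see the "expected thing, found: ..." error later in this commit). A standalone sketch of that give-the-value-back conversion, using a toy enum in place of `sql::Value` and a plain `String` in place of `Thing`:

#[derive(Debug)]
enum Value {
    Thing(String), // record id, e.g. "foo:1"
    Number(i64),
}

trait TryAsExt {
    // On failure the original value is returned, so the caller can still
    // inspect it or include it in an error message.
    fn try_as_thing(self) -> Result<String, Self>
    where
        Self: Sized;
}

impl TryAsExt for Value {
    fn try_as_thing(self) -> Result<String, Self> {
        match self {
            Value::Thing(t) => Ok(t),
            v => Err(v),
        }
    }
}

fn main() {
    assert_eq!(Value::Thing("foo:1".into()).try_as_thing().unwrap(), "foo:1");
    assert!(Value::Number(42).try_as_thing().is_err());
}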

View file

@@ -6,10 +6,10 @@ use crate::dbs::Session;
use crate::kvs::Datastore;
use crate::sql::kind::Literal;
use crate::sql::statements::{DefineFieldStatement, SelectStatement};
use crate::sql::Kind;
use crate::sql::{self, Table};
use crate::sql::{Cond, Fields};
use crate::sql::{Expression, Geometry};
use crate::sql::{Idiom, Kind};
use crate::sql::{Statement, Thing};
use async_graphql::dynamic::{Enum, FieldValue, ResolverContext, Type, Union};
use async_graphql::dynamic::{Field, Interface};
@@ -30,12 +30,17 @@ use super::ext::IntoExt;
use super::ext::ValidatorExt;
use crate::gql::error::{internal_error, schema_error, type_error};
use crate::gql::ext::TryAsExt;
use crate::gql::utils::{get_record, GqlValueUtils};
use crate::gql::utils::{GQLTx, GqlValueUtils};
use crate::kvs::LockType;
use crate::kvs::TransactionType;
use crate::sql::Object as SqlObject;
use crate::sql::Value as SqlValue;
type ErasedRecord = (GQLTx, Thing);
fn field_val_erase_owned(val: ErasedRecord) -> FieldValue<'static> {
FieldValue::owned_any(val)
}
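`ErasedRecord` pairs the request-scoped transaction wrapper with a record id, and `field_val_erase_owned` hides it behind async-graphql's type-erased `FieldValue::owned_any`; child resolvers later recover it with `downcast_ref`, as `make_table_field_resolver` does further down. A minimal sketch of that round trip against the dynamic async-graphql API (assuming the `dynamic-schema` feature; a pair of `String`s stands in for `(GQLTx, Thing)`):

use async_graphql::dynamic::FieldValue;

// Stand-in for (GQLTx, Thing): any 'static + Send + Sync value can be erased.
type ErasedRecord = (String, String);

fn field_val_erase_owned(val: ErasedRecord) -> FieldValue<'static> {
    // owned_any boxes the value as `dyn Any`, discarding its concrete type
    FieldValue::owned_any(val)
}

fn main() {
    let erased = field_val_erase_owned(("tx-1".to_string(), "foo:1".to_string()));

    // A child resolver receives this as `ctx.parent_value` and downcasts it back.
    let (tx, rid) = erased
        .downcast_ref::<ErasedRecord>()
        .expect("failed to downcast");
    assert_eq!(tx, "tx-1");
    assert_eq!(rid, "foo:1");
}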
macro_rules! limit_input {
() => {
InputValue::new("limit", TypeRef::named(TypeRef::INT))
@@ -76,7 +81,7 @@ pub async fn generate_schema(
datastore: &Arc<Datastore>,
session: &Session,
) -> Result<Schema, GqlError> {
let kvs = datastore.as_ref();
let kvs = datastore;
let tx = kvs.transaction(TransactionType::Read, LockType::Optimistic).await?;
let ns = session.ns.as_ref().ok_or(GqlError::UnpecifiedNamespace)?;
let db = session.db.as_ref().ok_or(GqlError::UnpecifiedDatabase)?;
@@ -128,7 +133,7 @@ pub async fn generate_schema(
let fds1 = fds1.clone();
let kvs1 = kvs1.clone();
FieldFuture::new(async move {
let kvs = kvs1.as_ref();
let gtx = GQLTx::new(&kvs1, &sess1).await?;
let args = ctx.args.as_index_map();
trace!("received request with args: {args:?}");
@@ -193,10 +198,18 @@ pub async fn generate_schema(
trace!("parsed filter: {cond:?}");
// SELECT VALUE id FROM ...
let ast = Statement::Select({
SelectStatement {
what: vec![SqlValue::Table(tb_name.intox())].into(),
expr: Fields::all(),
expr: Fields(
vec![sql::Field::Single {
expr: SqlValue::Idiom(Idiom::from("id")),
alias: None,
}],
// this means the `value` keyword
true,
),
order: orders.map(IntoExt::intox),
cond,
limit,
@@ -207,16 +220,7 @@ pub async fn generate_schema(
trace!("generated query ast: {ast:?}");
let query = ast.into();
trace!("generated query: {}", query);
let res = kvs.process(query, &sess1, Default::default()).await?;
debug_assert_eq!(res.len(), 1);
let res = res
.into_iter()
.next()
.expect("response vector should have exactly one value")
.result?;
let res = gtx.process_stmt(ast).await?;
let res_vec =
match res {
@@ -230,13 +234,18 @@ pub async fn generate_schema(
let out: Result<Vec<FieldValue>, SqlValue> = res_vec
.0
.into_iter()
.map(|v| v.try_as_object().map(FieldValue::owned_any))
.map(|v| {
v.try_as_thing().map(|t| {
let erased: ErasedRecord = (gtx.clone(), t);
field_val_erase_owned(erased)
})
})
.collect();
match out {
Ok(l) => Ok(Some(FieldValue::list(l))),
Err(v) => {
Err(internal_error(format!("expected object, found: {v:?}")).into())
Err(internal_error(format!("expected thing, found: {v:?}")).into())
}
}
})
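Two things change in the list resolver above: the query now selects only record ids (the `Fields(..., true)` form is the `SELECT VALUE` projection) and runs through the shared `GQLTx`, and each returned id is wrapped into an `ErasedRecord` so field data is fetched lazily later. The `collect` into `Result<Vec<_>, _>` short-circuits on the first row that is not a record id; a tiny standalone sketch of that collect pattern, with a toy enum in place of `sql::Value` and a string label in place of `field_val_erase_owned`:

#[derive(Debug, PartialEq)]
enum Value {
    Thing(String),
    Number(i64),
}

fn wrap_ids(rows: Vec<Value>) -> Result<Vec<String>, Value> {
    rows.into_iter()
        .map(|v| match v {
            // stand-in for field_val_erase_owned((gtx.clone(), t))
            Value::Thing(t) => Ok(format!("erased({t})")),
            v => Err(v),
        })
        .collect() // the Result target stops at the first Err and returns it
}

fn main() {
    let ok = wrap_ids(vec![Value::Thing("foo:1".into()), Value::Thing("foo:2".into())]);
    assert_eq!(ok.unwrap(), vec!["erased(foo:1)", "erased(foo:2)"]);

    // A non-id row aborts the whole list and is handed back for the error message.
    let err = wrap_ids(vec![Value::Number(42)]);
    assert_eq!(err.unwrap_err(), Value::Number(42));
}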
@@ -260,7 +269,7 @@ pub async fn generate_schema(
FieldFuture::new({
let sess2 = sess2.clone();
async move {
let kvs = kvs2.as_ref();
let gtx = GQLTx::new(&kvs2, &sess2).await?;
let args = ctx.args.as_index_map();
let id = match args.get("id").and_then(GqlValueUtils::as_string) {
@@ -277,8 +286,11 @@ pub async fn generate_schema(
Err(_) => Thing::from((tb_name, id)),
};
match get_record(kvs, &sess2, &thing).await? {
SqlValue::Object(o) => Ok(Some(FieldValue::owned_any(o))),
match gtx.get_record_field(thing, "id").await? {
SqlValue::Thing(t) => {
let erased: ErasedRecord = (gtx, t);
Ok(Some(field_val_erase_owned(erased)))
}
_ => Ok(None),
}
}
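In this by-id resolver the `id` argument may arrive either as a full record id (`table:id`) or as a bare id, and the failed parse above falls back to `Thing::from((tb_name, id))` for the bare form. A small illustration of that fallback with plain strings; the real code uses the SurrealQL record-id parser, which also validates the input, so this split is only an approximation:

// Hypothetical helper mirroring the fallback: accept "foo:1" or just "1"
// for table `tb` and always end up with a (table, id) pair.
fn resolve_id(tb: &str, raw: &str) -> (String, String) {
    match raw.split_once(':') {
        Some((table, id)) => (table.to_string(), id.to_string()), // already qualified
        None => (tb.to_string(), raw.to_string()),                // bare id
    }
}

fn main() {
    assert_eq!(resolve_id("foo", "foo:1"), ("foo".to_string(), "1".to_string()));
    assert_eq!(resolve_id("foo", "1"), ("foo".to_string(), "1".to_string()));
}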
@@ -293,8 +305,6 @@ pub async fn generate_schema(
"id",
TypeRef::named_nn(TypeRef::ID),
make_table_field_resolver(
datastore,
session,
"id",
Some(Kind::Record(vec![Table::from(tb.name.to_string())])),
),
@@ -324,7 +334,7 @@ pub async fn generate_schema(
table_ty_obj = table_ty_obj.field(Field::new(
fd.name.to_string(),
fd_type,
make_table_field_resolver(datastore, session, fd_name.as_str(), fd.kind.clone()),
make_table_field_resolver(fd_name.as_str(), fd.kind.clone()),
));
}
@@ -338,11 +348,11 @@ pub async fn generate_schema(
let kvs3 = datastore.to_owned();
query = query.field(
Field::new("_get", TypeRef::named("record"), move |ctx| {
let kvs3 = kvs3.clone();
FieldFuture::new({
let sess3 = sess3.clone();
let kvs3 = kvs3.clone();
async move {
let kvs = kvs3.as_ref();
let gtx = GQLTx::new(&kvs3, &sess3).await?;
let args = ctx.args.as_index_map();
let id = match args.get("id").and_then(GqlValueUtils::as_string) {
@@ -360,10 +370,10 @@ pub async fn generate_schema(
Err(_) => return Err(resolver_error(format!("invalid id: {id}")).into()),
};
match get_record(kvs, &sess3, &thing).await? {
SqlValue::Object(o) => {
let out = FieldValue::owned_any(o).with_type(thing.tb.to_string());
match gtx.get_record_field(thing, "id").await? {
SqlValue::Thing(t) => {
let ty = t.tb.to_string();
let out = field_val_erase_owned((gtx, t)).with_type(ty);
Ok(Some(out))
}
_ => Ok(None),
@@ -455,52 +465,37 @@ pub async fn generate_schema(
}
fn make_table_field_resolver(
kvs: &Arc<Datastore>,
sess: &Session,
fd_name: impl Into<String>,
kind: Option<Kind>,
) -> impl for<'a> Fn(ResolverContext<'a>) -> FieldFuture<'a> + Send + Sync + 'static {
let fd_name = fd_name.into();
let sess_field = Arc::new(sess.to_owned());
let kvs_field = kvs.clone();
move |ctx: ResolverContext| {
let sess_field = sess_field.clone();
let fd_name = fd_name.clone();
let kvs_field = kvs_field.clone();
let field_kind = kind.clone();
FieldFuture::new({
let kvs_field = kvs_field.clone();
async move {
let kvs = kvs_field.as_ref();
let record: &SqlObject = ctx
let (ref gtx, ref rid) = ctx
.parent_value
.downcast_ref::<SqlObject>()
.downcast_ref::<ErasedRecord>()
.ok_or_else(|| internal_error("failed to downcast"))?;
let Some(val) = record.get(fd_name.as_str()) else {
return Ok(None);
};
let val = gtx.get_record_field(rid.clone(), fd_name.as_str()).await?;
let out = match val {
SqlValue::Thing(rid) if fd_name != "id" => match get_record(kvs, &sess_field, rid).await?
{
SqlValue::Object(o) => {
let mut tmp = FieldValue::owned_any(o);
match field_kind {
Some(Kind::Record(ts)) if ts.len() != 1 => {tmp = tmp.with_type(rid.tb.clone())}
_ => {}
}
Ok(Some(tmp))
SqlValue::Thing(rid) if fd_name != "id" => {
let mut tmp = field_val_erase_owned((gtx.clone(), rid.clone()));
match field_kind {
Some(Kind::Record(ts)) if ts.len() != 1 => {
tmp = tmp.with_type(rid.tb.clone())
}
v => Err(resolver_error(format!("expected object, but found (referential integrity might be broken): {v:?}")).into()),
_ => {}
}
Ok(Some(tmp))
}
SqlValue::None | SqlValue::Null => Ok(None),
v => {
match field_kind {
Some(Kind::Either(ks)) if ks.len() != 1 => {
}
Some(Kind::Either(ks)) if ks.len() != 1 => {}
_ => {}
}
let out = sql_value_to_gql_value(v.to_owned())
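The rewritten `make_table_field_resolver` no longer captures a datastore and session or expects a pre-fetched object: it downcasts the parent value back into `(GQLTx, Thing)` and asks the shared transaction for exactly the one field being resolved, so record links stay lazy too (a linked `Thing` is simply wrapped into another `ErasedRecord`). A compressed, synchronous sketch of that control flow with stand-in types; the real `get_record_field` is async and walks the idiom inside the open read transaction:

#[derive(Clone)]
struct GqlTx; // stand-in for the request-scoped transaction wrapper

#[derive(Clone, Debug)]
struct Thing {
    tb: String,
    id: String,
}

#[derive(Debug)]
enum Value {
    Thing(Thing),
    None,
    Int(i64),
}

impl GqlTx {
    // Stubbed: the real method fetches `<rid>.<field>` through the transaction.
    fn get_record_field(&self, _rid: &Thing, field: &str) -> Value {
        match field {
            "link" => Value::Thing(Thing { tb: "bar".into(), id: "9".into() }),
            "val" => Value::Int(42),
            _ => Value::None,
        }
    }
}

// Roughly what the resolver does with the fetched value.
fn resolve(gtx: &GqlTx, rid: &Thing, fd_name: &str) -> Option<String> {
    match gtx.get_record_field(rid, fd_name) {
        // A record link (other than the `id` field itself) becomes another
        // erased (GqlTx, Thing) pair, resolved field by field on demand.
        Value::Thing(linked) if fd_name != "id" => {
            let _child = (gtx.clone(), linked.clone());
            Some(format!("erased({}:{})", linked.tb, linked.id))
        }
        Value::None => None,
        // Plain values are converted to GraphQL values.
        v => Some(format!("{v:?}")),
    }
}

fn main() {
    let rid = Thing { tb: "foo".into(), id: "1".into() };
    assert_eq!(resolve(&GqlTx, &rid, "val"), Some("Int(42)".to_string()));
    assert_eq!(resolve(&GqlTx, &rid, "link"), Some("erased(bar:9)".to_string()));
    assert_eq!(resolve(&GqlTx, &rid, "missing"), None);
}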

View file

@@ -1,4 +1,7 @@
use std::sync::Arc;
use async_graphql::{dynamic::indexmap::IndexMap, Name, Value as GqlValue};
use reblessive::TreeStack;
pub(crate) trait GqlValueUtils {
fn as_i64(&self) -> Option<i64>;
fn as_string(&self) -> Option<String>;
@@ -38,23 +41,69 @@ impl GqlValueUtils for GqlValue {
}
}
use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::Session;
use crate::err::Error;
use crate::iam::Error as IamError;
use crate::kvs::Datastore;
use crate::kvs::LockType;
use crate::kvs::TransactionType;
use crate::sql::part::Part;
use crate::sql::Statement;
use crate::sql::{Thing, Value as SqlValue};
use super::error::GqlError;
pub async fn get_record(
kvs: &Datastore,
sess: &Session,
rid: &Thing,
) -> Result<SqlValue, GqlError> {
let tx = kvs.transaction(TransactionType::Read, LockType::Optimistic).await?;
Ok(tx
.get_record(sess.ns.as_ref().unwrap(), sess.db.as_ref().unwrap(), &rid.tb, &rid.id)
.await?
.as_ref()
.to_owned())
#[derive(Clone)]
pub struct GQLTx {
opt: Options,
ctx: Context,
}
impl GQLTx {
pub async fn new(kvs: &Arc<Datastore>, sess: &Session) -> Result<Self, GqlError> {
kvs.check_anon(sess).map_err(|_| {
Error::IamError(IamError::NotAllowed {
actor: "anonymous".to_string(),
action: "process".to_string(),
resource: "graphql".to_string(),
})
})?;
let tx = kvs.transaction(TransactionType::Read, LockType::Optimistic).await?;
let tx = Arc::new(tx);
let mut ctx = kvs.setup_ctx()?;
ctx.set_transaction(tx);
sess.context(&mut ctx);
Ok(GQLTx {
ctx: ctx.freeze(),
opt: kvs.setup_options(sess),
})
}
pub async fn get_record_field(
&self,
rid: Thing,
field: impl Into<Part>,
) -> Result<SqlValue, GqlError> {
let mut stack = TreeStack::new();
let part = [field.into()];
let value = SqlValue::Thing(rid);
stack
.enter(|stk| value.get(stk, &self.ctx, &self.opt, None, &part))
.finish()
.await
.map_err(Into::into)
}
pub async fn process_stmt(&self, stmt: Statement) -> Result<SqlValue, GqlError> {
let mut stack = TreeStack::new();
let res = stack.enter(|stk| stmt.compute(stk, &self.ctx, &self.opt, None)).finish().await?;
Ok(res)
}
}
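`GQLTx` is what makes the laziness above cheap: it is created once per GraphQL request, freezes a single read transaction into a context together with the session's `Options`, and is then cloned into every `ErasedRecord`, so `process_stmt` and each `get_record_field` call read from the same snapshot. A toy, synchronous model of that sharing, with a `HashMap` snapshot standing in for the frozen transaction and context:

use std::collections::HashMap;
use std::sync::Arc;

// Toy snapshot standing in for the frozen read transaction + context.
struct Snapshot {
    rows: HashMap<(String, String), i64>, // (record id, field) -> value
}

#[derive(Clone)]
struct GqlTx {
    snap: Arc<Snapshot>, // cloning is cheap, so every ErasedRecord can carry one
}

impl GqlTx {
    fn get_record_field(&self, rid: &str, field: &str) -> Option<i64> {
        self.snap.rows.get(&(rid.to_string(), field.to_string())).copied()
    }
}

fn main() {
    let mut rows = HashMap::new();
    rows.insert(("foo:1".to_string(), "val".to_string()), 42);
    let gtx = GqlTx { snap: Arc::new(Snapshot { rows }) };

    // Every field resolver in the same request reads the same snapshot,
    // so the response is internally consistent.
    let per_record = gtx.clone();
    assert_eq!(per_record.get_record_field("foo:1", "val"), Some(42));
    assert_eq!(gtx.get_record_field("foo:1", "missing"), None);
}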

View file

@@ -796,39 +796,19 @@ impl Datastore {
}
// Check if anonymous actors can execute queries when auth is enabled
// TODO(sgirones): Check this as part of the authorisation layer
if self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access() {
return Err(IamError::NotAllowed {
actor: "anonymous".to_string(),
action: "process".to_string(),
resource: "query".to_string(),
}
.into());
}
self.check_anon(sess).map_err(|_| IamError::NotAllowed {
actor: "anonymous".to_string(),
action: "process".to_string(),
resource: "query".to_string(),
})?;
// Create a new query options
let opt = Options::default()
.with_id(self.id)
.with_ns(sess.ns())
.with_db(sess.db())
.with_live(sess.live())
.with_auth(sess.au.clone())
.with_strict(self.strict)
.with_auth_enabled(self.auth_enabled);
let opt = self.setup_options(sess);
// Create a new query executor
let mut exe = Executor::new(self);
// Create a default context
let mut ctx = MutableContext::from_ds(
self.query_timeout,
self.capabilities.clone(),
self.index_stores.clone(),
#[cfg(not(target_arch = "wasm32"))]
self.index_builder.clone(),
#[cfg(storage)]
self.temporary_directory.clone(),
)?;
// Setup the notification channel
if let Some(channel) = &self.notification_channel {
ctx.add_notifications(Some(&channel.0));
}
let mut ctx = self.setup_ctx()?;
// Start an execution context
sess.context(&mut ctx);
// Store the query variables
@@ -868,25 +848,16 @@ impl Datastore {
}
// Check if anonymous actors can compute values when auth is enabled
// TODO(sgirones): Check this as part of the authorisation layer
if sess.au.is_anon() && self.auth_enabled && !self.capabilities.allows_guest_access() {
return Err(IamError::NotAllowed {
actor: "anonymous".to_string(),
action: "compute".to_string(),
resource: "value".to_string(),
}
.into());
}
self.check_anon(sess).map_err(|_| IamError::NotAllowed {
actor: "anonymous".to_string(),
action: "compute".to_string(),
resource: "value".to_string(),
})?;
// Create a new memory stack
let mut stack = TreeStack::new();
// Create a new query options
let opt = Options::default()
.with_id(self.id)
.with_ns(sess.ns())
.with_db(sess.db())
.with_live(sess.live())
.with_auth(sess.au.clone())
.with_strict(self.strict)
.with_auth_enabled(self.auth_enabled);
let opt = self.setup_options(sess);
// Create a default context
let mut ctx = MutableContext::default();
// Set context capabilities
@@ -958,14 +929,7 @@ impl Datastore {
// Create a new memory stack
let mut stack = TreeStack::new();
// Create a new query options
let opt = Options::default()
.with_id(self.id)
.with_ns(sess.ns())
.with_db(sess.db())
.with_live(sess.live())
.with_auth(sess.au.clone())
.with_strict(self.strict)
.with_auth_enabled(self.auth_enabled);
let opt = self.setup_options(sess);
// Create a default context
let mut ctx = MutableContext::default();
// Set context capabilities
@@ -1075,6 +1039,46 @@ impl Datastore {
// All ok
Ok(())
}
pub fn setup_options(&self, sess: &Session) -> Options {
Options::default()
.with_id(self.id)
.with_ns(sess.ns())
.with_db(sess.db())
.with_live(sess.live())
.with_auth(sess.au.clone())
.with_strict(self.strict)
.with_auth_enabled(self.auth_enabled)
}
pub fn setup_ctx(&self) -> Result<MutableContext, Error> {
let mut ctx = MutableContext::from_ds(
self.query_timeout,
self.capabilities.clone(),
self.index_stores.clone(),
#[cfg(not(target_arch = "wasm32"))]
self.index_builder.clone(),
#[cfg(storage)]
self.temporary_directory.clone(),
)?;
// Setup the notification channel
if let Some(channel) = &self.notification_channel {
ctx.add_notifications(Some(&channel.0));
}
Ok(ctx)
}
/// check for disallowed anonymous users
pub fn check_anon(&self, sess: &Session) -> Result<(), IamError> {
if self.auth_enabled && sess.au.is_anon() && !self.capabilities.allows_guest_access() {
Err(IamError::NotAllowed {
actor: "anonymous".to_string(),
action: String::new(),
resource: String::new(),
})
} else {
Ok(())
}
}
}
#[cfg(test)]
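The three helpers added above (`setup_options`, `setup_ctx`, and `check_anon`) factor out what `process`, `compute`, and now the GraphQL module previously duplicated. Note that `check_anon` deliberately leaves `action` and `resource` empty so that each caller maps the failure onto its own `IamError`, as the `process` and `compute` hunks earlier in this file do. A standalone sketch of that "generic guard, caller-specific error" shape, with toy types in place of the crate's `Session` and `IamError`:

#[derive(Debug)]
struct NotAllowed {
    actor: String,
    action: String,
    resource: String,
}

struct Datastore {
    auth_enabled: bool,
    guest_access: bool,
}

impl Datastore {
    // Generic guard: it only decides *whether* anonymous access is allowed,
    // leaving action/resource empty for the caller to fill in.
    fn check_anon(&self, is_anon: bool) -> Result<(), NotAllowed> {
        if self.auth_enabled && is_anon && !self.guest_access {
            Err(NotAllowed {
                actor: "anonymous".to_string(),
                action: String::new(),
                resource: String::new(),
            })
        } else {
            Ok(())
        }
    }

    fn process(&self, is_anon: bool) -> Result<(), NotAllowed> {
        // Each entry point attaches its own action/resource on failure.
        self.check_anon(is_anon).map_err(|_| NotAllowed {
            actor: "anonymous".to_string(),
            action: "process".to_string(),
            resource: "query".to_string(),
        })?;
        Ok(())
    }
}

fn main() {
    let ds = Datastore { auth_enabled: true, guest_access: false };
    assert!(ds.process(true).is_err());
    assert!(ds.process(false).is_ok());
}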

View file

@@ -218,6 +218,17 @@ pub async fn start_server_with_temporary_directory(
.await
}
pub async fn start_server_gql() -> Result<(String, Child), Box<dyn Error>> {
start_server(StartServerArguments {
vars: Some(HashMap::from([(
"SURREAL_EXPERIMENTAL_GRAPHQL".to_string(),
"true".to_string(),
)])),
..Default::default()
})
.await
}
pub async fn start_server_gql_without_auth() -> Result<(String, Child), Box<dyn Error>> {
start_server(StartServerArguments {
auth: false,

View file

@@ -5,10 +5,14 @@ mod graphql_integration {
use std::time::Duration;
use http::header;
use reqwest::Client;
use serde_json::json;
use test_log::test;
use tracing::debug;
use ulid::Ulid;
use crate::common::{PASS, USER};
use super::common;
#[test(tokio::test)]
@@ -23,7 +27,7 @@ mod graphql_integration {
headers.insert("surreal-ns", ns.parse()?);
headers.insert("surreal-db", db.parse()?);
headers.insert(header::ACCEPT, "application/json".parse()?);
let client = reqwest::Client::builder()
let client = Client::builder()
.connect_timeout(Duration::from_millis(10))
.default_headers(headers)
.build()?;
@@ -170,4 +174,106 @@ mod graphql_integration {
Ok(())
}
#[test(tokio::test)]
async fn basic_auth() -> Result<(), Box<dyn std::error::Error>> {
let (addr, _server) = common::start_server_gql().await.unwrap();
let gql_url = &format!("http://{addr}/graphql");
let sql_url = &format!("http://{addr}/sql");
let signup_url = &format!("http://{addr}/signup");
let mut headers = reqwest::header::HeaderMap::new();
let ns = Ulid::new().to_string();
let db = Ulid::new().to_string();
headers.insert("surreal-ns", ns.parse()?);
headers.insert("surreal-db", db.parse()?);
headers.insert(header::ACCEPT, "application/json".parse()?);
let client = Client::builder()
.connect_timeout(Duration::from_millis(10))
.default_headers(headers)
.build()?;
// check errors on invalid auth
{
let res =
client.post(gql_url).basic_auth("invalid", Some("invalid")).body("").send().await?;
assert_eq!(res.status(), 401);
let body = res.text().await?;
assert!(body.contains("There was a problem with authentication"), "body: {body}")
}
// add schema and data
{
let res = client
.post(sql_url)
.basic_auth(USER, Some(PASS))
.body(
r#"
DEFINE ACCESS user ON DATABASE TYPE RECORD
SIGNUP ( CREATE user SET email = $email, pass = crypto::argon2::generate($pass) )
SIGNIN ( SELECT * FROM user WHERE email = $email AND crypto::argon2::compare(pass, $pass) )
DURATION FOR SESSION 60s, FOR TOKEN 1d;
DEFINE TABLE foo SCHEMAFUL PERMISSIONS FOR select WHERE $auth.email = email;
DEFINE FIELD email ON foo TYPE string;
DEFINE FIELD val ON foo TYPE int;
CREATE foo:1 set val = 42, email = "user@email.com";
CREATE foo:2 set val = 43, email = "other@email.com";
"#,
)
.send()
.await?;
assert_eq!(res.status(), 200);
let body = res.text().await?;
debug!(?body);
}
// check works with root
{
let res = client
.post(gql_url)
.basic_auth(USER, Some(PASS))
.body(json!({"query": r#"query{foo{id, val}}"#}).to_string())
.send()
.await?;
assert_eq!(res.status(), 200);
let body = res.text().await?;
let expected =
json!({"data":{"foo":[{"id":"foo:1","val":42},{"id":"foo:2","val":43}]}});
assert_eq!(expected.to_string(), body);
}
// check partial access
{
let req_body = serde_json::to_string(
json!({
"ns": ns,
"db": db,
"ac": "user",
"email": "user@email.com",
"pass": "pass",
})
.as_object()
.unwrap(),
)
.unwrap();
let res = client.post(signup_url).body(req_body).send().await?;
assert_eq!(res.status(), 200, "body: {}", res.text().await?);
let body: serde_json::Value = serde_json::from_str(&res.text().await?).unwrap();
let token = body["token"].as_str().unwrap();
let res = client
.post(gql_url)
.bearer_auth(token)
.body(json!({"query": r#"query{foo{id, val}}"#}).to_string())
.send()
.await?;
assert_eq!(res.status(), 200);
let body = res.text().await?;
let expected = json!({"data":{"foo":[{"id":"foo:1","val":42}]}});
assert_eq!(expected.to_string(), body);
}
Ok(())
}
}