Add insert
method to WebSocket RPC endpoint (#2296)
This commit is contained in:
parent
88bcc87d39
commit
773cb5f130
3 changed files with 88 additions and 43 deletions
|
@ -10,20 +10,21 @@ use crate::sql::comment::shouldbespace;
|
|||
use crate::sql::data::{single, update, values, Data};
|
||||
use crate::sql::error::IResult;
|
||||
use crate::sql::output::{output, Output};
|
||||
use crate::sql::table::{table, Table};
|
||||
use crate::sql::param::param;
|
||||
use crate::sql::table::table;
|
||||
use crate::sql::timeout::{timeout, Timeout};
|
||||
use crate::sql::value::Value;
|
||||
use derive::Store;
|
||||
use nom::branch::alt;
|
||||
use nom::bytes::complete::tag_no_case;
|
||||
use nom::combinator::opt;
|
||||
use nom::combinator::{map, opt};
|
||||
use nom::sequence::preceded;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store, Hash)]
|
||||
pub struct InsertStatement {
|
||||
pub into: Table,
|
||||
pub into: Value,
|
||||
pub data: Data,
|
||||
pub ignore: bool,
|
||||
pub update: Option<Data>,
|
||||
|
@ -62,49 +63,56 @@ impl InsertStatement {
|
|||
// Ensure futures are stored
|
||||
let opt = &opt.new_with_futures(false);
|
||||
// Parse the expression
|
||||
match &self.data {
|
||||
// Check if this is a traditional statement
|
||||
Data::ValuesExpression(v) => {
|
||||
for v in v {
|
||||
// Create a new empty base object
|
||||
let mut o = Value::base();
|
||||
// Set each field from the expression
|
||||
for (k, v) in v.iter() {
|
||||
let v = v.compute(ctx, opt, txn, None).await?;
|
||||
o.set(ctx, opt, txn, k, v).await?;
|
||||
match self.into.compute(ctx, opt, txn, doc).await? {
|
||||
Value::Table(into) => match &self.data {
|
||||
// Check if this is a traditional statement
|
||||
Data::ValuesExpression(v) => {
|
||||
for v in v {
|
||||
// Create a new empty base object
|
||||
let mut o = Value::base();
|
||||
// Set each field from the expression
|
||||
for (k, v) in v.iter() {
|
||||
let v = v.compute(ctx, opt, txn, None).await?;
|
||||
o.set(ctx, opt, txn, k, v).await?;
|
||||
}
|
||||
// Specify the new table record id
|
||||
let id = o.rid().generate(&into, true)?;
|
||||
// Pass the mergeable to the iterator
|
||||
i.ingest(Iterable::Mergeable(id, o));
|
||||
}
|
||||
// Specify the new table record id
|
||||
let id = o.rid().generate(&self.into, true)?;
|
||||
// Pass the mergeable to the iterator
|
||||
i.ingest(Iterable::Mergeable(id, o));
|
||||
}
|
||||
}
|
||||
// Check if this is a modern statement
|
||||
Data::SingleExpression(v) => {
|
||||
let v = v.compute(ctx, opt, txn, doc).await?;
|
||||
match v {
|
||||
Value::Array(v) => {
|
||||
for v in v {
|
||||
// Check if this is a modern statement
|
||||
Data::SingleExpression(v) => {
|
||||
let v = v.compute(ctx, opt, txn, doc).await?;
|
||||
match v {
|
||||
Value::Array(v) => {
|
||||
for v in v {
|
||||
// Specify the new table record id
|
||||
let id = v.rid().generate(&into, true)?;
|
||||
// Pass the mergeable to the iterator
|
||||
i.ingest(Iterable::Mergeable(id, v));
|
||||
}
|
||||
}
|
||||
Value::Object(_) => {
|
||||
// Specify the new table record id
|
||||
let id = v.rid().generate(&self.into, true)?;
|
||||
let id = v.rid().generate(&into, true)?;
|
||||
// Pass the mergeable to the iterator
|
||||
i.ingest(Iterable::Mergeable(id, v));
|
||||
}
|
||||
}
|
||||
Value::Object(_) => {
|
||||
// Specify the new table record id
|
||||
let id = v.rid().generate(&self.into, true)?;
|
||||
// Pass the mergeable to the iterator
|
||||
i.ingest(Iterable::Mergeable(id, v));
|
||||
}
|
||||
v => {
|
||||
return Err(Error::InsertStatement {
|
||||
value: v.to_string(),
|
||||
})
|
||||
v => {
|
||||
return Err(Error::InsertStatement {
|
||||
value: v.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
},
|
||||
v => {
|
||||
return Err(Error::RelateStatement {
|
||||
value: v.to_string(),
|
||||
})
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
// Assign the statement
|
||||
let stm = Statement::from(self);
|
||||
|
@ -136,9 +144,12 @@ impl fmt::Display for InsertStatement {
|
|||
pub fn insert(i: &str) -> IResult<&str, InsertStatement> {
|
||||
let (i, _) = tag_no_case("INSERT")(i)?;
|
||||
let (i, ignore) = opt(preceded(shouldbespace, tag_no_case("IGNORE")))(i)?;
|
||||
let (i, _) = preceded(shouldbespace, tag_no_case("INTO"))(i)?;
|
||||
let (i, into) = preceded(shouldbespace, table)(i)?;
|
||||
let (i, data) = preceded(shouldbespace, alt((values, single)))(i)?;
|
||||
let (i, _) = shouldbespace(i)?;
|
||||
let (i, _) = tag_no_case("INTO")(i)?;
|
||||
let (i, _) = shouldbespace(i)?;
|
||||
let (i, into) = alt((map(table, Value::Table), map(param, Value::Param)))(i)?;
|
||||
let (i, _) = shouldbespace(i)?;
|
||||
let (i, data) = alt((values, single))(i)?;
|
||||
let (i, update) = opt(preceded(shouldbespace, update))(i)?;
|
||||
let (i, output) = opt(preceded(shouldbespace, output))(i)?;
|
||||
let (i, timeout) = opt(preceded(shouldbespace, timeout))(i)?;
|
||||
|
|
|
@ -3,8 +3,8 @@ use crate::sql::statements::InsertStatement;
|
|||
use crate::sql::value::serde::ser;
|
||||
use crate::sql::Data;
|
||||
use crate::sql::Output;
|
||||
use crate::sql::Table;
|
||||
use crate::sql::Timeout;
|
||||
use crate::sql::Value;
|
||||
use ser::Serializer as _;
|
||||
use serde::ser::Error as _;
|
||||
use serde::ser::Impossible;
|
||||
|
@ -38,7 +38,7 @@ impl ser::Serializer for Serializer {
|
|||
|
||||
#[derive(Default)]
|
||||
pub struct SerializeInsertStatement {
|
||||
into: Option<Table>,
|
||||
into: Option<Value>,
|
||||
data: Option<Data>,
|
||||
ignore: Option<bool>,
|
||||
update: Option<Data>,
|
||||
|
@ -57,7 +57,7 @@ impl serde::ser::SerializeStruct for SerializeInsertStatement {
|
|||
{
|
||||
match key {
|
||||
"into" => {
|
||||
self.into = Some(Table(value.serialize(ser::string::Serializer.wrap())?));
|
||||
self.into = Some(value.serialize(ser::value::Serializer.wrap())?);
|
||||
}
|
||||
"data" => {
|
||||
self.data = Some(value.serialize(ser::data::Serializer.wrap())?);
|
||||
|
|
|
@ -329,6 +329,11 @@ impl Rpc {
|
|||
Ok(v) => rpc.read().await.select(v).await,
|
||||
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
||||
},
|
||||
// Insert a value or values in the database
|
||||
"insert" => match params.needs_one_or_two() {
|
||||
Ok((v, o)) => rpc.read().await.insert(v, o).await,
|
||||
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
||||
},
|
||||
// Create a value or values in the database
|
||||
"create" => match params.needs_one_or_two() {
|
||||
Ok((v, o)) => rpc.read().await.create(v, o).await,
|
||||
|
@ -565,6 +570,35 @@ impl Rpc {
|
|||
Ok(res)
|
||||
}
|
||||
|
||||
// ------------------------------
|
||||
// Methods for inserting
|
||||
// ------------------------------
|
||||
|
||||
#[instrument(skip_all, name = "rpc insert", fields(websocket=self.uuid.to_string()))]
|
||||
async fn insert(&self, what: Value, data: Value) -> Result<Value, Error> {
|
||||
// Return a single result?
|
||||
let one = what.is_thing();
|
||||
// Get a database reference
|
||||
let kvs = DB.get().unwrap();
|
||||
// Specify the SQL query string
|
||||
let sql = "INSERT INTO $what $data RETURN AFTER";
|
||||
// Specify the query parameters
|
||||
let var = Some(map! {
|
||||
String::from("what") => what.could_be_table(),
|
||||
String::from("data") => data,
|
||||
=> &self.vars
|
||||
});
|
||||
// Execute the query on the database
|
||||
let mut res = kvs.execute(sql, &self.session, var).await?;
|
||||
// Extract the first query result
|
||||
let res = match one {
|
||||
true => res.remove(0).result?.first(),
|
||||
false => res.remove(0).result?,
|
||||
};
|
||||
// Return the result to the client
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
// ------------------------------
|
||||
// Methods for creating
|
||||
// ------------------------------
|
||||
|
|
Loading…
Reference in a new issue