Add initial WebSocket JSON RPC implementation

This commit is contained in:
Tobie Morgan Hitchcock 2022-07-04 02:03:26 +01:00
parent 8dc7341cb1
commit 508538e0cc
9 changed files with 729 additions and 33 deletions

View file

@ -1,5 +1,6 @@
use crate::err::Error; use crate::err::Error;
use crate::sql::value::Value; use crate::sql::value::Value;
use crate::sql::Object;
use serde::ser::SerializeStruct; use serde::ser::SerializeStruct;
use serde::Serialize; use serde::Serialize;
use std::time::Duration; use std::time::Duration;
@ -26,29 +27,51 @@ impl Response {
} }
} }
impl From<Response> for Value {
fn from(v: Response) -> Value {
// Get the response speed
let time = v.speed();
// Get the response status
let status = v.output().map_or_else(|_| "ERR", |_| "OK");
// Convert the response
match v.result {
Ok(val) => match v.sql {
Some(sql) => Value::Object(Object(map! {
String::from("sql") => sql.into(),
String::from("time") => time.into(),
String::from("status") => status.into(),
String::from("result") => val,
})),
None => Value::Object(Object(map! {
String::from("time") => time.into(),
String::from("status") => status.into(),
String::from("result") => val,
})),
},
Err(err) => match v.sql {
Some(sql) => Value::Object(Object(map! {
String::from("sql") => sql.into(),
String::from("time") => time.into(),
String::from("status") => status.into(),
String::from("detail") => err.to_string().into(),
})),
None => Value::Object(Object(map! {
String::from("time") => time.into(),
String::from("status") => status.into(),
String::from("detail") => err.to_string().into(),
})),
},
}
}
}
impl Serialize for Response { impl Serialize for Response {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where where
S: serde::Serializer, S: serde::Serializer,
{ {
match &self.result { match &self.result {
Ok(v) => match v { Ok(v) => match &self.sql {
Value::None => match &self.sql {
Some(s) => {
let mut val = serializer.serialize_struct("Response", 3)?;
val.serialize_field("sql", s.as_str())?;
val.serialize_field("time", self.speed().as_str())?;
val.serialize_field("status", "OK")?;
val.end()
}
None => {
let mut val = serializer.serialize_struct("Response", 2)?;
val.serialize_field("time", self.speed().as_str())?;
val.serialize_field("status", "OK")?;
val.end()
}
},
v => match &self.sql {
Some(s) => { Some(s) => {
let mut val = serializer.serialize_struct("Response", 4)?; let mut val = serializer.serialize_struct("Response", 4)?;
val.serialize_field("sql", s.as_str())?; val.serialize_field("sql", s.as_str())?;
@ -65,7 +88,6 @@ impl Serialize for Response {
val.end() val.end()
} }
}, },
},
Err(e) => match &self.sql { Err(e) => match &self.sql {
Some(s) => { Some(s) => {
let mut val = serializer.serialize_struct("Response", 4)?; let mut val = serializer.serialize_struct("Response", 4)?;

View file

@ -47,6 +47,9 @@ impl Strand {
pub fn as_string(self) -> String { pub fn as_string(self) -> String {
self.0 self.0
} }
pub fn to_raw(self) -> String {
self.0
}
} }
impl fmt::Display for Strand { impl fmt::Display for Strand {

View file

@ -2,6 +2,7 @@
use crate::ctx::Context; use crate::ctx::Context;
use crate::dbs::Options; use crate::dbs::Options;
use crate::dbs::Response;
use crate::dbs::Transaction; use crate::dbs::Transaction;
use crate::err::Error; use crate::err::Error;
use crate::sql::array::{array, Array}; use crate::sql::array::{array, Array};
@ -46,6 +47,7 @@ use std::cmp::Ordering;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::iter::FromIterator;
use std::ops; use std::ops;
use std::ops::Deref; use std::ops::Deref;
use std::str::FromStr; use std::str::FromStr;
@ -433,6 +435,16 @@ impl From<Id> for Value {
} }
} }
impl FromIterator<Response> for Vec<Value> {
fn from_iter<I: IntoIterator<Item = Response>>(iter: I) -> Self {
let mut c: Vec<Value> = vec![];
for i in iter {
c.push(i.into())
}
c
}
}
impl Value { impl Value {
// ----------------------------------- // -----------------------------------
// Initial record value // Initial record value

View file

@ -21,6 +21,7 @@ mod dbs;
mod err; mod err;
mod iam; mod iam;
mod net; mod net;
mod rpc;
fn main() { fn main() {
cli::init(); // Initiate the command line cli::init(); // Initiate the command line

531
src/net/rpc.rs Normal file
View file

@ -0,0 +1,531 @@
use crate::dbs::DB;
use crate::err::Error;
use crate::net::session;
use crate::rpc::args::Take;
use crate::rpc::paths::{ID, METHOD, PARAMS};
use crate::rpc::res::Failure;
use crate::rpc::res::Response;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use std::collections::BTreeMap;
use surrealdb::sql::Object;
use surrealdb::sql::Strand;
use surrealdb::sql::Value;
use surrealdb::Session;
use warp::ws::{Message, WebSocket, Ws};
use warp::Filter;
pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path("rpc")
.and(warp::path::end())
.and(warp::ws())
.and(session::build())
.map(|ws: Ws, session: Session| ws.on_upgrade(move |ws| socket(ws, session)))
}
async fn socket(ws: WebSocket, session: Session) {
Rpc::new(ws, session).serve().await
}
pub struct Rpc {
session: Session,
vars: BTreeMap<String, Value>,
tx: SplitSink<WebSocket, Message>,
rx: SplitStream<WebSocket>,
}
impl Rpc {
// Instantiate a new RPC
pub fn new(ws: WebSocket, mut session: Session) -> Rpc {
// Create a new RPC variables store
let vars = BTreeMap::new();
// Split the WebSocket connection
let (tx, rx) = ws.split();
// Enable real-time live queries
session.rt = true;
// Create and store the Rpc connection
Rpc {
session,
vars,
tx,
rx,
}
}
// Serve the RPC endpoint
pub async fn serve(&mut self) {
while let Some(msg) = self.rx.next().await {
if let Ok(msg) = msg {
match true {
_ if msg.is_text() => {
let res = self.call(msg).await;
let res = serde_json::to_string(&res).unwrap();
let res = Message::text(res);
let _ = self.tx.send(res).await;
}
_ => (),
}
}
}
}
// Call RPC methods from the WebSocket
async fn call(&mut self, msg: Message) -> Response {
// Convert the message
let str = match msg.to_str() {
Ok(v) => v,
_ => return Response::failure(None, Failure::INTERNAL_ERROR),
};
// Parse the request
let req = match surrealdb::sql::json(str) {
Ok(v) if v.is_some() => v,
_ => return Response::failure(None, Failure::PARSE_ERROR),
};
// Fetch the 'id' argument
let id = match req.pick(&*ID) {
Value::Uuid(v) => Some(v.to_raw()),
Value::Strand(v) => Some(v.to_raw()),
_ => return Response::failure(None, Failure::INVALID_REQUEST),
};
// Fetch the 'method' argument
let method = match req.pick(&*METHOD) {
Value::Strand(v) => v.to_raw(),
_ => return Response::failure(id, Failure::INVALID_REQUEST),
};
// Fetch the 'params' argument
let params = match req.pick(&*PARAMS) {
Value::Array(v) => v,
_ => return Response::failure(id, Failure::INVALID_REQUEST),
};
// Match the method to a function
let res = match &method[..] {
"ping" => Ok(Value::True),
"info" => match params.len() {
0 => self.info().await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"use" => match params.take_two() {
(Value::Strand(ns), Value::Strand(db)) => self.yuse(ns, db).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"signup" => match params.take_one() {
Value::Object(v) => self.signup(v).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"signin" => match params.take_one() {
Value::Object(v) => self.signin(v).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"invalidate" => match params.len() {
0 => self.invalidate().await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"authenticate" => match params.take_one() {
Value::None => self.invalidate().await,
Value::Strand(v) => self.authenticate(v).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"kill" => match params.take_one() {
v if v.is_uuid() => self.kill(v).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"live" => match params.take_one() {
v if v.is_strand() => self.live(v).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"set" => match params.take_two() {
(Value::Strand(s), v) => self.set(s, v).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"query" => match params.take_two() {
(Value::Strand(s), Value::None) => self.query(s).await,
(Value::Strand(s), Value::Object(o)) => self.query_with_vars(s, o).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"select" => match params.take_one() {
v if v.is_thing() => self.select(v).await,
v if v.is_strand() => self.select(v).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"create" => match params.take_two() {
(v, o) if v.is_thing() && o.is_none() => self.create(v).await,
(v, o) if v.is_strand() && o.is_none() => self.create(v).await,
(v, o) if v.is_thing() && o.is_object() => self.create_with(v, o).await,
(v, o) if v.is_strand() && o.is_object() => self.create_with(v, o).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"update" => match params.take_two() {
(v, o) if v.is_thing() && o.is_none() => self.update(v).await,
(v, o) if v.is_strand() && o.is_none() => self.update(v).await,
(v, o) if v.is_thing() && o.is_object() => self.update_with(v, o).await,
(v, o) if v.is_strand() && o.is_object() => self.update_with(v, o).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"change" => match params.take_two() {
(v, o) if v.is_thing() && o.is_none() => self.change(v).await,
(v, o) if v.is_strand() && o.is_none() => self.change(v).await,
(v, o) if v.is_thing() && o.is_object() => self.change_with(v, o).await,
(v, o) if v.is_strand() && o.is_object() => self.change_with(v, o).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"modify" => match params.take_two() {
(v, o) if v.is_thing() && o.is_none() => self.modify(v).await,
(v, o) if v.is_strand() && o.is_none() => self.modify(v).await,
(v, o) if v.is_thing() && o.is_object() => self.modify_with(v, o).await,
(v, o) if v.is_strand() && o.is_object() => self.modify_with(v, o).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
"delete" => match params.take_one() {
v if v.is_thing() => self.delete(v).await,
v if v.is_strand() => self.delete(v).await,
_ => return Response::failure(id, Failure::INVALID_PARAMS),
},
_ => return Response::failure(id, Failure::METHOD_NOT_FOUND),
};
// Return the final response
match res {
Ok(v) => Response::success(id, v),
Err(e) => Response::failure(id, Failure::custom(e.to_string())),
}
}
// ------------------------------
// Methods for authentication
// ------------------------------
async fn yuse(&mut self, ns: Strand, db: Strand) -> Result<Value, Error> {
self.session.ns = Some(ns.0);
self.session.db = Some(db.0);
Ok(Value::None)
}
async fn signup(&self, vars: Object) -> Result<Value, Error> {
crate::iam::signup::signup(vars).await.map(Into::into).map_err(Into::into)
}
async fn signin(&self, vars: Object) -> Result<Value, Error> {
crate::iam::signin::signin(vars).await.map(Into::into).map_err(Into::into)
}
async fn invalidate(&mut self) -> Result<Value, Error> {
crate::iam::clear::clear(&mut self.session).await?;
Ok(Value::None)
}
async fn authenticate(&mut self, token: Strand) -> Result<Value, Error> {
crate::iam::verify::token(&mut self.session, token.0).await?;
Ok(Value::None)
}
// ------------------------------
// Methods for identification
// ------------------------------
async fn info(&self) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "SELECT * FROM $auth";
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, None).await?;
// Extract the first value from the result
let res = res.remove(0).result?.first();
// Return the result to the client
Ok(res)
}
// ------------------------------
// Methods for live queries
// ------------------------------
async fn kill(&self, id: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "KILL $id";
// Specify the query paramaters
let var = Some(map! {
String::from("id") => id,
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
async fn live(&self, tb: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "LIVE SELECT * FROM $tb";
// Specify the query paramaters
let var = Some(map! {
String::from("tb") => tb.make_table(),
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
// ------------------------------
// Methods for querying
// ------------------------------
async fn set(&mut self, key: Strand, val: Value) -> Result<Value, Error> {
match val {
// Remove the variable if the value is NULL
v if v.is_null() => {
self.vars.remove(&key.0);
Ok(Value::Null)
}
// Store the value if the value is not NULL
v => {
self.vars.insert(key.0, v);
Ok(Value::Null)
}
}
}
async fn query(&self, sql: Strand) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the query paramaters
let var = Some(self.vars.clone());
// Execute the query on the database
let res = kvs.execute(&sql, &self.session, var).await?;
// Extract the first query result
let res = res.into_iter().collect::<Vec<Value>>().into();
// Return the result to the client
Ok(res)
}
async fn query_with_vars(&self, sql: Strand, mut vars: Object) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the query paramaters
let var = Some(mrg! { vars.0, &self.vars });
// Execute the query on the database
let res = kvs.execute(&sql, &self.session, var).await?;
// Extract the first query result
let res = res.into_iter().collect::<Vec<Value>>().into();
// Return the result to the client
Ok(res)
}
// ------------------------------
// Methods for selecting
// ------------------------------
async fn select(&self, what: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "SELECT * FROM $what";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
// ------------------------------
// Methods for creating
// ------------------------------
async fn create(&self, what: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "CREATE $what RETURN AFTER";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
async fn create_with(&self, what: Value, data: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "CREATE $what CONTENT $data RETURN AFTER";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
String::from("data") => data,
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
// ------------------------------
// Methods for updating
// ------------------------------
async fn update(&self, what: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "UPDATE $what RETURN AFTER";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
async fn update_with(&self, what: Value, data: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "UPDATE $what CONTENT $data RETURN AFTER";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
String::from("data") => data,
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
// ------------------------------
// Methods for changing
// ------------------------------
async fn change(&self, what: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "UPDATE $what RETURN AFTER";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
async fn change_with(&self, what: Value, data: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "UPDATE $what MERGE $data RETURN AFTER";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
String::from("data") => data,
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
// ------------------------------
// Methods for modifying
// ------------------------------
async fn modify(&self, what: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "UPDATE $what RETURN DIFF";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
async fn modify_with(&self, what: Value, data: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "UPDATE $what DIFF $data RETURN DIFF";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
String::from("data") => data,
=> &self.vars
});
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
// ------------------------------
// Methods for deleting
// ------------------------------
async fn delete(&self, what: Value) -> Result<Value, Error> {
// Get a database reference
let kvs = DB.get().unwrap();
// Specify the SQL query string
let sql = "DELETE $what";
// Specify the query paramaters
let var = Some(map! {
String::from("what") => what.make_table_or_thing(),
=> &self.vars
});
// Merge in any session variables
// var.extend(self.vars.into_iter().map(|(k, v)| (k.clone(), v.clone())));
// Execute the query on the database
let mut res = kvs.execute(sql, &self.session, var).await?;
// Extract the first query result
let res = res.remove(0).result?;
// Return the result to the client
Ok(res)
}
}

38
src/rpc/args.rs Normal file
View file

@ -0,0 +1,38 @@
use surrealdb::sql::Array;
use surrealdb::sql::Value;
pub trait Take {
fn take_one(self) -> Value;
fn take_two(self) -> (Value, Value);
fn take_three(self) -> (Value, Value, Value);
}
impl Take for Array {
// Convert the array to one argument
fn take_one(self) -> Value {
let mut x = self.into_iter();
match x.next() {
Some(a) => a,
None => Value::None,
}
}
// Convert the array to two arguments
fn take_two(self) -> (Value, Value) {
let mut x = self.into_iter();
match (x.next(), x.next()) {
(Some(a), Some(b)) => (a, b),
(Some(a), None) => (a, Value::None),
(_, _) => (Value::None, Value::None),
}
}
// Convert the array to three arguments
fn take_three(self) -> (Value, Value, Value) {
let mut x = self.into_iter();
match (x.next(), x.next(), x.next()) {
(Some(a), Some(b), Some(c)) => (a, b, c),
(Some(a), Some(b), None) => (a, b, Value::None),
(Some(a), None, None) => (a, Value::None, Value::None),
(_, _, _) => (Value::None, Value::None, Value::None),
}
}
}

3
src/rpc/mod.rs Normal file
View file

@ -0,0 +1,3 @@
pub mod args;
pub mod paths;
pub mod res;

8
src/rpc/paths.rs Normal file
View file

@ -0,0 +1,8 @@
use once_cell::sync::Lazy;
use surrealdb::sql::Part;
pub static ID: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("id")]);
pub static METHOD: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("method")]);
pub static PARAMS: Lazy<[Part; 1]> = Lazy::new(|| [Part::from("params")]);

78
src/rpc/res.rs Normal file
View file

@ -0,0 +1,78 @@
use serde::Serialize;
use std::borrow::Cow;
use surrealdb::sql::Value;
#[derive(Serialize)]
enum Content {
#[serde(rename = "result")]
Success(Value),
#[serde(rename = "error")]
Failure(Failure),
}
#[derive(Serialize)]
pub struct Response {
id: Option<String>,
#[serde(flatten)]
content: Content,
}
impl Response {
// Create a JSON RPC result response
pub fn success(id: Option<String>, val: Value) -> Response {
Response {
id,
content: Content::Success(val),
}
}
// Create a JSON RPC failure response
pub fn failure(id: Option<String>, err: Failure) -> Response {
Response {
id,
content: Content::Failure(err),
}
}
}
#[derive(Clone, Debug, Serialize)]
pub struct Failure {
code: i64,
message: Cow<'static, str>,
}
impl Failure {
pub const PARSE_ERROR: Failure = Failure {
code: -32700,
message: Cow::Borrowed("Parse error"),
};
pub const INVALID_REQUEST: Failure = Failure {
code: -32600,
message: Cow::Borrowed("Invalid Request"),
};
pub const METHOD_NOT_FOUND: Failure = Failure {
code: -32601,
message: Cow::Borrowed("Method not found"),
};
pub const INVALID_PARAMS: Failure = Failure {
code: -32602,
message: Cow::Borrowed("Invalid params"),
};
pub const INTERNAL_ERROR: Failure = Failure {
code: -32603,
message: Cow::Borrowed("Internal error"),
};
pub fn custom<S>(message: S) -> Failure
where
Cow<'static, str>: From<S>,
{
Failure {
code: -32000,
message: message.into(),
}
}
}