2022-07-27 13:05:28 +00:00
|
|
|
use crate::cli::CF;
|
2022-07-07 10:25:22 +00:00
|
|
|
use crate::cnf::MAX_CONCURRENT_CALLS;
|
2022-10-12 18:58:43 +00:00
|
|
|
use crate::cnf::PKG_NAME;
|
2022-10-27 08:58:08 +00:00
|
|
|
use crate::cnf::PKG_VERSION;
|
2022-10-19 13:56:43 +00:00
|
|
|
use crate::cnf::WEBSOCKET_PING_FREQUENCY;
|
2022-07-04 01:03:26 +00:00
|
|
|
use crate::dbs::DB;
|
|
|
|
use crate::err::Error;
|
|
|
|
use crate::net::session;
|
|
|
|
use crate::rpc::args::Take;
|
|
|
|
use crate::rpc::paths::{ID, METHOD, PARAMS};
|
2022-10-19 22:54:41 +00:00
|
|
|
use crate::rpc::res;
|
2022-07-04 01:03:26 +00:00
|
|
|
use crate::rpc::res::Failure;
|
2022-10-25 13:19:44 +00:00
|
|
|
use crate::rpc::res::Output;
|
2022-07-04 01:03:26 +00:00
|
|
|
use futures::{SinkExt, StreamExt};
|
2023-02-13 12:20:13 +00:00
|
|
|
use once_cell::sync::Lazy;
|
2022-07-04 01:03:26 +00:00
|
|
|
use std::collections::BTreeMap;
|
2023-02-13 12:20:13 +00:00
|
|
|
use std::collections::HashMap;
|
2022-07-07 10:25:22 +00:00
|
|
|
use std::sync::Arc;
|
|
|
|
use surrealdb::channel;
|
|
|
|
use surrealdb::channel::Sender;
|
2023-06-20 22:50:26 +00:00
|
|
|
use surrealdb::dbs::{QueryType, Response, Session};
|
2023-06-09 13:45:07 +00:00
|
|
|
use surrealdb::opt::auth::Root;
|
2022-10-19 17:57:03 +00:00
|
|
|
use surrealdb::sql::Array;
|
2022-07-04 01:03:26 +00:00
|
|
|
use surrealdb::sql::Object;
|
|
|
|
use surrealdb::sql::Strand;
|
|
|
|
use surrealdb::sql::Value;
|
2022-07-07 10:25:22 +00:00
|
|
|
use tokio::sync::RwLock;
|
2023-03-29 18:16:18 +00:00
|
|
|
use tracing::instrument;
|
2023-05-31 22:40:24 +00:00
|
|
|
use uuid::Uuid;
|
2022-07-04 01:03:26 +00:00
|
|
|
use warp::ws::{Message, WebSocket, Ws};
|
|
|
|
use warp::Filter;
|
|
|
|
|
2023-02-13 12:20:13 +00:00
|
|
|
type WebSockets = RwLock<HashMap<Uuid, Sender<Message>>>;
|
2023-06-20 22:50:26 +00:00
|
|
|
// Mapping of LiveQueryID to WebSocketID
|
|
|
|
type LiveQueries = RwLock<HashMap<Uuid, Uuid>>;
|
2023-02-13 12:20:13 +00:00
|
|
|
|
2023-02-13 16:05:02 +00:00
|
|
|
static WEBSOCKETS: Lazy<WebSockets> = Lazy::new(WebSockets::default);
|
2023-06-20 22:50:26 +00:00
|
|
|
static LIVE_QUERIES: Lazy<LiveQueries> = Lazy::new(LiveQueries::default);
|
2023-02-13 12:20:13 +00:00
|
|
|
|
2022-12-20 10:30:40 +00:00
|
|
|
#[allow(opaque_hidden_inferred_bound)]
|
2022-07-04 01:03:26 +00:00
|
|
|
pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
|
|
|
|
warp::path("rpc")
|
|
|
|
.and(warp::path::end())
|
|
|
|
.and(warp::ws())
|
|
|
|
.and(session::build())
|
|
|
|
.map(|ws: Ws, session: Session| ws.on_upgrade(move |ws| socket(ws, session)))
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn socket(ws: WebSocket, session: Session) {
|
2022-07-07 10:25:22 +00:00
|
|
|
let rpc = Rpc::new(session);
|
|
|
|
Rpc::serve(rpc, ws).await
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub struct Rpc {
|
|
|
|
session: Session,
|
2022-10-25 13:19:44 +00:00
|
|
|
format: Output,
|
2023-02-13 12:20:13 +00:00
|
|
|
uuid: Uuid,
|
2022-07-04 01:03:26 +00:00
|
|
|
vars: BTreeMap<String, Value>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Rpc {
|
2022-11-23 09:42:59 +00:00
|
|
|
/// Instantiate a new RPC
|
2022-07-07 10:25:22 +00:00
|
|
|
pub fn new(mut session: Session) -> Arc<RwLock<Rpc>> {
|
2022-07-04 01:03:26 +00:00
|
|
|
// Create a new RPC variables store
|
|
|
|
let vars = BTreeMap::new();
|
2022-10-25 13:19:44 +00:00
|
|
|
// Set the default output format
|
|
|
|
let format = Output::Json;
|
2023-02-13 12:20:13 +00:00
|
|
|
// Create a unique WebSocket id
|
|
|
|
let uuid = Uuid::new_v4();
|
2022-07-04 01:03:26 +00:00
|
|
|
// Enable real-time live queries
|
|
|
|
session.rt = true;
|
|
|
|
// Create and store the Rpc connection
|
2022-07-07 10:25:22 +00:00
|
|
|
Arc::new(RwLock::new(Rpc {
|
2022-07-04 01:03:26 +00:00
|
|
|
session,
|
2022-10-25 13:19:44 +00:00
|
|
|
format,
|
2023-02-13 12:20:13 +00:00
|
|
|
uuid,
|
2022-07-04 01:03:26 +00:00
|
|
|
vars,
|
2022-07-07 10:25:22 +00:00
|
|
|
}))
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
|
|
|
|
2022-11-23 09:42:59 +00:00
|
|
|
/// Serve the RPC endpoint
|
2022-07-07 10:25:22 +00:00
|
|
|
pub async fn serve(rpc: Arc<RwLock<Rpc>>, ws: WebSocket) {
|
|
|
|
// Create a channel for sending messages
|
|
|
|
let (chn, mut rcv) = channel::new(MAX_CONCURRENT_CALLS);
|
|
|
|
// Split the socket into send and recv
|
|
|
|
let (mut wtx, mut wrx) = ws.split();
|
2022-10-19 13:56:43 +00:00
|
|
|
// Clone the channel for sending pings
|
|
|
|
let png = chn.clone();
|
2023-02-13 12:20:13 +00:00
|
|
|
// The WebSocket has connected
|
|
|
|
Rpc::connected(rpc.clone(), chn.clone()).await;
|
2022-10-19 13:56:43 +00:00
|
|
|
// Send messages to the client
|
|
|
|
tokio::task::spawn(async move {
|
|
|
|
// Create the interval ticker
|
|
|
|
let mut interval = tokio::time::interval(WEBSOCKET_PING_FREQUENCY);
|
|
|
|
// Loop indefinitely
|
|
|
|
loop {
|
|
|
|
// Wait for the timer
|
|
|
|
interval.tick().await;
|
|
|
|
// Create the ping message
|
|
|
|
let msg = Message::ping(vec![]);
|
|
|
|
// Send the message to the client
|
2022-10-19 14:35:42 +00:00
|
|
|
if png.send(msg).await.is_err() {
|
2022-10-19 13:56:43 +00:00
|
|
|
// Exit out of the loop
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
2022-07-07 10:25:22 +00:00
|
|
|
// Send messages to the client
|
|
|
|
tokio::task::spawn(async move {
|
2022-09-20 22:04:40 +00:00
|
|
|
// Wait for the next message to send
|
2022-07-07 10:25:22 +00:00
|
|
|
while let Some(res) = rcv.next().await {
|
2022-09-20 22:04:40 +00:00
|
|
|
// Send the message to the client
|
|
|
|
if let Err(err) = wtx.send(res).await {
|
|
|
|
// Output the WebSocket error to the logs
|
2023-07-04 21:02:10 +00:00
|
|
|
trace!("WebSocket error: {:?}", err);
|
2022-09-20 22:04:40 +00:00
|
|
|
// It's already failed, so ignore error
|
|
|
|
let _ = wtx.close().await;
|
|
|
|
// Exit out of the loop
|
|
|
|
break;
|
|
|
|
}
|
2022-07-07 10:25:22 +00:00
|
|
|
}
|
|
|
|
});
|
2023-06-20 22:50:26 +00:00
|
|
|
// Send notifications to the client
|
|
|
|
let moved_rpc = rpc.clone();
|
|
|
|
tokio::task::spawn(async move {
|
|
|
|
let rpc = moved_rpc;
|
2023-07-05 21:26:13 +00:00
|
|
|
if let Some(channel) = DB.get().unwrap().notifications() {
|
|
|
|
while let Ok(v) = channel.recv().await {
|
|
|
|
trace!("Received notification: {:?}", v);
|
|
|
|
// Find which websocket the notification belongs to
|
|
|
|
match LIVE_QUERIES.read().await.get(&v.id) {
|
|
|
|
Some(ws_id) => {
|
|
|
|
// Send the notification to the client
|
|
|
|
let msg_text = res::success(None, v.clone());
|
|
|
|
let ws_write = WEBSOCKETS.write().await;
|
|
|
|
match ws_write.get(ws_id) {
|
|
|
|
None => {
|
|
|
|
error!(
|
|
|
|
"Tracked WebSocket {:?} not found for lq: {:?}",
|
|
|
|
ws_id, &v.id
|
|
|
|
);
|
|
|
|
}
|
|
|
|
Some(ws_sender) => {
|
|
|
|
msg_text
|
|
|
|
.send(rpc.read().await.format.clone(), ws_sender.clone())
|
|
|
|
.await;
|
|
|
|
trace!(
|
|
|
|
"Sent notification to WebSocket {:?} for lq: {:?}",
|
|
|
|
ws_id,
|
|
|
|
&v.id
|
|
|
|
);
|
|
|
|
}
|
2023-06-20 22:50:26 +00:00
|
|
|
}
|
|
|
|
}
|
2023-07-05 21:26:13 +00:00
|
|
|
None => {
|
|
|
|
error!("Unknown websocket for live query: {:?}", v.id);
|
|
|
|
}
|
2023-06-20 22:50:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
2022-07-07 10:25:22 +00:00
|
|
|
// Get messages from the client
|
|
|
|
while let Some(msg) = wrx.next().await {
|
2022-09-20 22:04:40 +00:00
|
|
|
match msg {
|
|
|
|
// We've received a message from the client
|
2022-10-19 22:57:05 +00:00
|
|
|
Ok(msg) => match msg {
|
|
|
|
msg if msg.is_ping() => {
|
2023-02-03 11:47:07 +00:00
|
|
|
let _ = chn.send(Message::pong(vec![])).await;
|
2022-10-19 20:11:15 +00:00
|
|
|
}
|
2022-10-19 22:57:05 +00:00
|
|
|
msg if msg.is_text() => {
|
2022-09-20 22:04:40 +00:00
|
|
|
tokio::task::spawn(Rpc::call(rpc.clone(), msg, chn.clone()));
|
|
|
|
}
|
2022-10-25 13:38:23 +00:00
|
|
|
msg if msg.is_binary() => {
|
|
|
|
tokio::task::spawn(Rpc::call(rpc.clone(), msg, chn.clone()));
|
|
|
|
}
|
2022-10-19 22:57:05 +00:00
|
|
|
msg if msg.is_close() => {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
msg if msg.is_pong() => {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
// Ignore everything else
|
|
|
|
}
|
|
|
|
},
|
2022-09-20 22:04:40 +00:00
|
|
|
// There was an error receiving the message
|
|
|
|
Err(err) => {
|
|
|
|
// Output the WebSocket error to the logs
|
2023-07-04 21:02:10 +00:00
|
|
|
trace!("WebSocket error: {:?}", err);
|
2022-09-20 22:04:40 +00:00
|
|
|
// Exit out of the loop
|
|
|
|
break;
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2023-02-13 12:20:13 +00:00
|
|
|
// The WebSocket has disconnected
|
|
|
|
Rpc::disconnected(rpc.clone()).await;
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn connected(rpc: Arc<RwLock<Rpc>>, chn: Sender<Message>) {
|
|
|
|
// Fetch the unique id of the WebSocket
|
2023-06-19 22:35:19 +00:00
|
|
|
let id = rpc.read().await.uuid;
|
2023-02-13 12:20:13 +00:00
|
|
|
// Log that the WebSocket has connected
|
2023-07-04 21:02:10 +00:00
|
|
|
trace!("WebSocket {} connected", id);
|
2023-02-13 12:20:13 +00:00
|
|
|
// Store this WebSocket in the list of WebSockets
|
|
|
|
WEBSOCKETS.write().await.insert(id, chn);
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn disconnected(rpc: Arc<RwLock<Rpc>>) {
|
|
|
|
// Fetch the unique id of the WebSocket
|
2023-06-19 22:35:19 +00:00
|
|
|
let id = rpc.read().await.uuid;
|
2023-02-13 12:20:13 +00:00
|
|
|
// Log that the WebSocket has disconnected
|
2023-07-04 21:02:10 +00:00
|
|
|
trace!("WebSocket {} disconnected", id);
|
2023-02-13 12:20:13 +00:00
|
|
|
// Remove this WebSocket from the list of WebSockets
|
|
|
|
WEBSOCKETS.write().await.remove(&id);
|
2023-06-20 22:50:26 +00:00
|
|
|
// Remove all live queries
|
|
|
|
let mut locked_lq_map = LIVE_QUERIES.write().await;
|
|
|
|
let mut live_query_to_gc: Vec<Uuid> = vec![];
|
|
|
|
for (key, value) in locked_lq_map.iter() {
|
|
|
|
if value == &id {
|
2023-07-04 21:02:10 +00:00
|
|
|
trace!("Removing live query: {}", key);
|
2023-06-20 22:50:26 +00:00
|
|
|
live_query_to_gc.push(*key);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for key in live_query_to_gc {
|
|
|
|
locked_lq_map.remove(&key);
|
|
|
|
}
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
|
|
|
|
2022-11-23 09:42:59 +00:00
|
|
|
/// Call RPC methods from the WebSocket
|
2022-07-07 10:25:22 +00:00
|
|
|
async fn call(rpc: Arc<RwLock<Rpc>>, msg: Message, chn: Sender<Message>) {
|
2022-10-25 13:19:44 +00:00
|
|
|
// Get the current output format
|
2022-10-25 13:38:23 +00:00
|
|
|
let mut out = { rpc.read().await.format.clone() };
|
2022-07-07 10:25:22 +00:00
|
|
|
// Clone the RPC
|
|
|
|
let rpc = rpc.clone();
|
2022-07-04 01:03:26 +00:00
|
|
|
// Parse the request
|
2022-10-25 13:19:44 +00:00
|
|
|
let req = match msg {
|
2022-10-25 13:38:23 +00:00
|
|
|
// This is a binary message
|
|
|
|
m if m.is_binary() => {
|
|
|
|
// Use binary output
|
|
|
|
out = Output::Full;
|
|
|
|
// Deserialize the input
|
|
|
|
Value::from(m.into_bytes())
|
|
|
|
}
|
2022-10-25 13:19:44 +00:00
|
|
|
// This is a text message
|
|
|
|
m if m.is_text() => {
|
|
|
|
// This won't panic due to the check above
|
|
|
|
let val = m.to_str().unwrap();
|
|
|
|
// Parse the SurrealQL object
|
2023-04-17 14:39:37 +00:00
|
|
|
match surrealdb::sql::value(val) {
|
2022-10-25 13:19:44 +00:00
|
|
|
// The SurrealQL message parsed ok
|
|
|
|
Ok(v) => v,
|
|
|
|
// The SurrealQL message failed to parse
|
|
|
|
_ => return res::failure(None, Failure::PARSE_ERROR).send(out, chn).await,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Unsupported message type
|
|
|
|
_ => return res::failure(None, Failure::INTERNAL_ERROR).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
};
|
2023-03-04 10:53:10 +00:00
|
|
|
// Log the received request
|
2023-07-04 21:02:10 +00:00
|
|
|
trace!("RPC Received: {}", req);
|
2022-07-04 01:03:26 +00:00
|
|
|
// Fetch the 'id' argument
|
|
|
|
let id = match req.pick(&*ID) {
|
2022-10-25 13:06:02 +00:00
|
|
|
v if v.is_none() => None,
|
|
|
|
v if v.is_null() => Some(v),
|
|
|
|
v if v.is_uuid() => Some(v),
|
|
|
|
v if v.is_number() => Some(v),
|
|
|
|
v if v.is_strand() => Some(v),
|
|
|
|
v if v.is_datetime() => Some(v),
|
|
|
|
_ => return res::failure(None, Failure::INVALID_REQUEST).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
};
|
|
|
|
// Fetch the 'method' argument
|
|
|
|
let method = match req.pick(&*METHOD) {
|
|
|
|
Value::Strand(v) => v.to_raw(),
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_REQUEST).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
};
|
|
|
|
// Fetch the 'params' argument
|
|
|
|
let params = match req.pick(&*PARAMS) {
|
|
|
|
Value::Array(v) => v,
|
2022-10-19 17:57:03 +00:00
|
|
|
_ => Array::new(),
|
2022-07-04 01:03:26 +00:00
|
|
|
};
|
|
|
|
// Match the method to a function
|
|
|
|
let res = match &method[..] {
|
2022-10-25 13:22:06 +00:00
|
|
|
// Handle a ping message
|
2022-10-19 22:54:41 +00:00
|
|
|
"ping" => Ok(Value::None),
|
2022-10-25 13:22:06 +00:00
|
|
|
// Retrieve the current auth record
|
2022-07-04 01:03:26 +00:00
|
|
|
"info" => match params.len() {
|
2022-07-07 10:25:22 +00:00
|
|
|
0 => rpc.read().await.info().await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Switch to a specific namespace and database
|
2022-10-25 13:31:14 +00:00
|
|
|
"use" => match params.needs_two() {
|
2023-03-07 09:55:35 +00:00
|
|
|
Ok((ns, db)) => rpc.write().await.yuse(ns, db).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Signup to a specific authentication scope
|
2022-10-25 13:31:14 +00:00
|
|
|
"signup" => match params.needs_one() {
|
|
|
|
Ok(Value::Object(v)) => rpc.write().await.signup(v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Signin as a root, namespace, database or scope user
|
2022-10-25 13:31:14 +00:00
|
|
|
"signin" => match params.needs_one() {
|
|
|
|
Ok(Value::Object(v)) => rpc.write().await.signin(v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Invalidate the current authentication session
|
2022-07-04 01:03:26 +00:00
|
|
|
"invalidate" => match params.len() {
|
2022-07-07 10:25:22 +00:00
|
|
|
0 => rpc.write().await.invalidate().await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Authenticate using an authentication token
|
2022-10-25 13:31:14 +00:00
|
|
|
"authenticate" => match params.needs_one() {
|
|
|
|
Ok(Value::Strand(v)) => rpc.write().await.authenticate(v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Kill a live query using a query id
|
2022-10-25 13:31:14 +00:00
|
|
|
"kill" => match params.needs_one() {
|
|
|
|
Ok(v) if v.is_uuid() => rpc.read().await.kill(v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Setup a live query on a specific table
|
2022-10-25 13:31:14 +00:00
|
|
|
"live" => match params.needs_one() {
|
2023-02-14 11:01:31 +00:00
|
|
|
Ok(v) if v.is_table() => rpc.read().await.live(v).await,
|
2022-10-25 13:31:14 +00:00
|
|
|
Ok(v) if v.is_strand() => rpc.read().await.live(v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Specify a connection-wide parameter
|
2022-10-25 13:31:14 +00:00
|
|
|
"let" => match params.needs_one_or_two() {
|
|
|
|
Ok((Value::Strand(s), v)) => rpc.write().await.set(s, v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-08 20:55:44 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Specify a connection-wide parameter
|
2022-10-25 13:31:14 +00:00
|
|
|
"set" => match params.needs_one_or_two() {
|
|
|
|
Ok((Value::Strand(s), v)) => rpc.write().await.set(s, v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:23:18 +00:00
|
|
|
// Unset and clear a connection-wide parameter
|
|
|
|
"unset" => match params.needs_one() {
|
|
|
|
Ok(Value::Strand(s)) => rpc.write().await.unset(s).await,
|
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Select a value or values from the database
|
2022-10-25 13:31:14 +00:00
|
|
|
"select" => match params.needs_one() {
|
|
|
|
Ok(v) => rpc.read().await.select(v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Create a value or values in the database
|
2022-10-25 13:31:14 +00:00
|
|
|
"create" => match params.needs_one_or_two() {
|
|
|
|
Ok((v, o)) => rpc.read().await.create(v, o).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Update a value or values in the database using `CONTENT`
|
2022-10-25 13:31:14 +00:00
|
|
|
"update" => match params.needs_one_or_two() {
|
|
|
|
Ok((v, o)) => rpc.read().await.update(v, o).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Update a value or values in the database using `MERGE`
|
2022-10-25 13:31:14 +00:00
|
|
|
"change" | "merge" => match params.needs_one_or_two() {
|
|
|
|
Ok((v, o)) => rpc.read().await.change(v, o).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Update a value or values in the database using `PATCH`
|
2022-10-25 13:31:14 +00:00
|
|
|
"modify" | "patch" => match params.needs_one_or_two() {
|
|
|
|
Ok((v, o)) => rpc.read().await.modify(v, o).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-30 01:30:30 +00:00
|
|
|
// Delete a value or values from the database
|
2022-10-25 13:31:14 +00:00
|
|
|
"delete" => match params.needs_one() {
|
|
|
|
Ok(v) => rpc.read().await.delete(v).await,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
|
|
|
},
|
|
|
|
// Specify the output format for text requests
|
|
|
|
"format" => match params.needs_one() {
|
|
|
|
Ok(Value::Strand(v)) => rpc.write().await.format(v).await,
|
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
},
|
2022-10-25 13:22:06 +00:00
|
|
|
// Get the current server version
|
2022-10-12 18:58:43 +00:00
|
|
|
"version" => match params.len() {
|
2022-12-18 16:00:36 +00:00
|
|
|
0 => Ok(format!("{PKG_NAME}-{}", *PKG_VERSION).into()),
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
2022-10-12 18:58:43 +00:00
|
|
|
},
|
2022-10-25 13:38:51 +00:00
|
|
|
// Run a full SurrealQL query against the database
|
|
|
|
"query" => match params.needs_one_or_two() {
|
2023-01-20 00:54:09 +00:00
|
|
|
Ok((Value::Strand(s), o)) if o.is_none_or_null() => {
|
2022-10-25 13:38:51 +00:00
|
|
|
return match rpc.read().await.query(s).await {
|
|
|
|
Ok(v) => res::success(id, v).send(out, chn).await,
|
|
|
|
Err(e) => {
|
|
|
|
res::failure(id, Failure::custom(e.to_string())).send(out, chn).await
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
Ok((Value::Strand(s), Value::Object(o))) => {
|
|
|
|
return match rpc.read().await.query_with(s, o).await {
|
|
|
|
Ok(v) => res::success(id, v).send(out, chn).await,
|
|
|
|
Err(e) => {
|
|
|
|
res::failure(id, Failure::custom(e.to_string())).send(out, chn).await
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
_ => return res::failure(id, Failure::INVALID_PARAMS).send(out, chn).await,
|
|
|
|
},
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return res::failure(id, Failure::METHOD_NOT_FOUND).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
};
|
|
|
|
// Return the final response
|
|
|
|
match res {
|
2022-10-25 13:19:44 +00:00
|
|
|
Ok(v) => res::success(id, v).send(out, chn).await,
|
|
|
|
Err(e) => res::failure(id, Failure::custom(e.to_string())).send(out, chn).await,
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------------------
|
|
|
|
// Methods for authentication
|
|
|
|
// ------------------------------
|
|
|
|
|
2022-10-25 13:19:44 +00:00
|
|
|
async fn format(&mut self, out: Strand) -> Result<Value, Error> {
|
|
|
|
match out.as_str() {
|
|
|
|
"json" | "application/json" => self.format = Output::Json,
|
|
|
|
"cbor" | "application/cbor" => self.format = Output::Cbor,
|
2023-03-27 12:52:28 +00:00
|
|
|
"pack" | "application/pack" => self.format = Output::Pack,
|
2022-10-25 13:19:44 +00:00
|
|
|
_ => return Err(Error::InvalidType),
|
|
|
|
};
|
|
|
|
Ok(Value::None)
|
|
|
|
}
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc use", fields(websocket=self.uuid.to_string()))]
|
2023-03-07 09:55:35 +00:00
|
|
|
async fn yuse(&mut self, ns: Value, db: Value) -> Result<Value, Error> {
|
|
|
|
if let Value::Strand(ns) = ns {
|
|
|
|
self.session.ns = Some(ns.0);
|
|
|
|
}
|
|
|
|
if let Value::Strand(db) = db {
|
|
|
|
self.session.db = Some(db.0);
|
|
|
|
}
|
2022-07-04 01:03:26 +00:00
|
|
|
Ok(Value::None)
|
|
|
|
}
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc signup", fields(websocket=self.uuid.to_string()))]
|
2022-08-23 22:44:13 +00:00
|
|
|
async fn signup(&mut self, vars: Object) -> Result<Value, Error> {
|
2023-06-09 13:45:07 +00:00
|
|
|
let kvs = DB.get().unwrap();
|
2023-07-05 21:26:13 +00:00
|
|
|
surrealdb::iam::signup::signup(kvs, &mut self.session, vars)
|
2023-02-11 15:56:14 +00:00
|
|
|
.await
|
|
|
|
.map(Into::into)
|
|
|
|
.map_err(Into::into)
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc signin", fields(websocket=self.uuid.to_string()))]
|
2022-08-23 22:44:13 +00:00
|
|
|
async fn signin(&mut self, vars: Object) -> Result<Value, Error> {
|
2023-06-09 13:45:07 +00:00
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
let opts = CF.get().unwrap();
|
|
|
|
let root = opts.pass.as_ref().map(|pass| Root {
|
|
|
|
username: &opts.user,
|
|
|
|
password: pass,
|
|
|
|
});
|
2023-07-05 21:26:13 +00:00
|
|
|
surrealdb::iam::signin::signin(kvs, &root, &mut self.session, vars)
|
2023-02-11 15:56:14 +00:00
|
|
|
.await
|
|
|
|
.map(Into::into)
|
|
|
|
.map_err(Into::into)
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc invalidate", fields(websocket=self.uuid.to_string()))]
|
2022-07-04 01:03:26 +00:00
|
|
|
async fn invalidate(&mut self) -> Result<Value, Error> {
|
2023-06-09 13:45:07 +00:00
|
|
|
surrealdb::iam::clear::clear(&mut self.session)?;
|
2022-07-04 01:03:26 +00:00
|
|
|
Ok(Value::None)
|
|
|
|
}
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc auth", fields(websocket=self.uuid.to_string()))]
|
2022-07-04 01:03:26 +00:00
|
|
|
async fn authenticate(&mut self, token: Strand) -> Result<Value, Error> {
|
2023-06-09 13:45:07 +00:00
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
surrealdb::iam::verify::token(kvs, &mut self.session, token.0).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
Ok(Value::None)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------------------
|
|
|
|
// Methods for identification
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc info", fields(websocket=self.uuid.to_string()))]
|
2022-07-04 01:03:26 +00:00
|
|
|
async fn info(&self) -> Result<Value, Error> {
|
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the SQL query string
|
|
|
|
let sql = "SELECT * FROM $auth";
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let mut res = kvs.execute(sql, &self.session, None).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first value from the result
|
|
|
|
let res = res.remove(0).result?.first();
|
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
|
2022-07-07 10:25:22 +00:00
|
|
|
// ------------------------------
|
|
|
|
// Methods for setting variables
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc set", fields(websocket=self.uuid.to_string()))]
|
2022-07-07 10:25:22 +00:00
|
|
|
async fn set(&mut self, key: Strand, val: Value) -> Result<Value, Error> {
|
|
|
|
match val {
|
2022-10-25 13:23:18 +00:00
|
|
|
// Remove the variable if undefined
|
|
|
|
Value::None => self.vars.remove(&key.0),
|
|
|
|
// Store the variable if defined
|
|
|
|
v => self.vars.insert(key.0, v),
|
|
|
|
};
|
|
|
|
Ok(Value::Null)
|
|
|
|
}
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc unset", fields(websocket=self.uuid.to_string()))]
|
2022-10-25 13:23:18 +00:00
|
|
|
async fn unset(&mut self, key: Strand) -> Result<Value, Error> {
|
|
|
|
self.vars.remove(&key.0);
|
|
|
|
Ok(Value::Null)
|
2022-07-07 10:25:22 +00:00
|
|
|
}
|
|
|
|
|
2022-07-04 01:03:26 +00:00
|
|
|
// ------------------------------
|
|
|
|
// Methods for live queries
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc kill", fields(websocket=self.uuid.to_string()))]
|
2022-07-04 01:03:26 +00:00
|
|
|
async fn kill(&self, id: Value) -> Result<Value, Error> {
|
|
|
|
// Specify the SQL query string
|
|
|
|
let sql = "KILL $id";
|
2022-08-21 12:13:38 +00:00
|
|
|
// Specify the query parameters
|
2023-06-20 22:50:26 +00:00
|
|
|
let var = map! {
|
2022-07-04 01:03:26 +00:00
|
|
|
String::from("id") => id,
|
|
|
|
=> &self.vars
|
2023-06-20 22:50:26 +00:00
|
|
|
};
|
2022-07-04 01:03:26 +00:00
|
|
|
// Execute the query on the database
|
2023-06-20 22:50:26 +00:00
|
|
|
let mut res = self.query_with(Strand::from(sql), Object::from(var)).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first query result
|
2023-06-20 22:50:26 +00:00
|
|
|
let response = res.remove(0);
|
|
|
|
match response.result {
|
|
|
|
Ok(v) => Ok(v),
|
|
|
|
Err(e) => Err(Error::from(e)),
|
|
|
|
}
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc live", fields(websocket=self.uuid.to_string()))]
|
2022-07-04 01:03:26 +00:00
|
|
|
async fn live(&self, tb: Value) -> Result<Value, Error> {
|
|
|
|
// Specify the SQL query string
|
|
|
|
let sql = "LIVE SELECT * FROM $tb";
|
2022-08-21 12:13:38 +00:00
|
|
|
// Specify the query parameters
|
2023-06-20 22:50:26 +00:00
|
|
|
let var = map! {
|
2022-10-25 13:08:09 +00:00
|
|
|
String::from("tb") => tb.could_be_table(),
|
2022-07-04 01:03:26 +00:00
|
|
|
=> &self.vars
|
2023-06-20 22:50:26 +00:00
|
|
|
};
|
2022-07-04 01:03:26 +00:00
|
|
|
// Execute the query on the database
|
2023-06-20 22:50:26 +00:00
|
|
|
let mut res = self.query_with(Strand::from(sql), Object::from(var)).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first query result
|
2023-06-20 22:50:26 +00:00
|
|
|
let response = res.remove(0);
|
|
|
|
match response.result {
|
|
|
|
Ok(v) => Ok(v),
|
|
|
|
Err(e) => Err(Error::from(e)),
|
|
|
|
}
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------------------
|
|
|
|
// Methods for selecting
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc select", fields(websocket=self.uuid.to_string()))]
|
2022-07-04 01:03:26 +00:00
|
|
|
async fn select(&self, what: Value) -> Result<Value, Error> {
|
2022-10-25 13:35:02 +00:00
|
|
|
// Return a single result?
|
|
|
|
let one = what.is_thing();
|
2022-07-04 01:03:26 +00:00
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the SQL query string
|
|
|
|
let sql = "SELECT * FROM $what";
|
2022-08-21 12:13:38 +00:00
|
|
|
// Specify the query parameters
|
2022-07-04 01:03:26 +00:00
|
|
|
let var = Some(map! {
|
2022-10-25 13:08:09 +00:00
|
|
|
String::from("what") => what.could_be_table(),
|
2022-07-04 01:03:26 +00:00
|
|
|
=> &self.vars
|
|
|
|
});
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let mut res = kvs.execute(sql, &self.session, var).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first query result
|
2022-10-25 13:35:02 +00:00
|
|
|
let res = match one {
|
|
|
|
true => res.remove(0).result?.first(),
|
|
|
|
false => res.remove(0).result?,
|
|
|
|
};
|
2022-07-04 01:03:26 +00:00
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------------------
|
|
|
|
// Methods for creating
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc create", fields(websocket=self.uuid.to_string()))]
|
2022-10-25 13:31:14 +00:00
|
|
|
async fn create(&self, what: Value, data: Value) -> Result<Value, Error> {
|
2022-10-25 13:35:02 +00:00
|
|
|
// Return a single result?
|
|
|
|
let one = what.is_thing();
|
2022-07-04 01:03:26 +00:00
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the SQL query string
|
|
|
|
let sql = "CREATE $what CONTENT $data RETURN AFTER";
|
2022-08-21 12:13:38 +00:00
|
|
|
// Specify the query parameters
|
2022-07-04 01:03:26 +00:00
|
|
|
let var = Some(map! {
|
2022-10-25 13:08:09 +00:00
|
|
|
String::from("what") => what.could_be_table(),
|
2022-10-25 13:31:14 +00:00
|
|
|
String::from("data") => data,
|
2022-07-04 01:03:26 +00:00
|
|
|
=> &self.vars
|
|
|
|
});
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let mut res = kvs.execute(sql, &self.session, var).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first query result
|
2022-10-25 13:35:02 +00:00
|
|
|
let res = match one {
|
|
|
|
true => res.remove(0).result?.first(),
|
|
|
|
false => res.remove(0).result?,
|
|
|
|
};
|
2022-07-04 01:03:26 +00:00
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------------------
|
|
|
|
// Methods for updating
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc update", fields(websocket=self.uuid.to_string()))]
|
2022-10-25 13:31:14 +00:00
|
|
|
async fn update(&self, what: Value, data: Value) -> Result<Value, Error> {
|
2022-10-25 13:35:02 +00:00
|
|
|
// Return a single result?
|
|
|
|
let one = what.is_thing();
|
2022-07-04 01:03:26 +00:00
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the SQL query string
|
|
|
|
let sql = "UPDATE $what CONTENT $data RETURN AFTER";
|
2022-08-21 12:13:38 +00:00
|
|
|
// Specify the query parameters
|
2022-07-04 01:03:26 +00:00
|
|
|
let var = Some(map! {
|
2022-10-25 13:08:09 +00:00
|
|
|
String::from("what") => what.could_be_table(),
|
2022-10-25 13:31:14 +00:00
|
|
|
String::from("data") => data,
|
2022-07-04 01:03:26 +00:00
|
|
|
=> &self.vars
|
|
|
|
});
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let mut res = kvs.execute(sql, &self.session, var).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first query result
|
2022-10-25 13:35:02 +00:00
|
|
|
let res = match one {
|
|
|
|
true => res.remove(0).result?.first(),
|
|
|
|
false => res.remove(0).result?,
|
|
|
|
};
|
2022-07-04 01:03:26 +00:00
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------------------
|
|
|
|
// Methods for changing
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc change", fields(websocket=self.uuid.to_string()))]
|
2022-10-25 13:31:14 +00:00
|
|
|
async fn change(&self, what: Value, data: Value) -> Result<Value, Error> {
|
2022-10-25 13:35:02 +00:00
|
|
|
// Return a single result?
|
|
|
|
let one = what.is_thing();
|
2022-07-04 01:03:26 +00:00
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the SQL query string
|
|
|
|
let sql = "UPDATE $what MERGE $data RETURN AFTER";
|
2022-08-21 12:13:38 +00:00
|
|
|
// Specify the query parameters
|
2022-07-04 01:03:26 +00:00
|
|
|
let var = Some(map! {
|
2022-10-25 13:08:09 +00:00
|
|
|
String::from("what") => what.could_be_table(),
|
2022-10-25 13:31:14 +00:00
|
|
|
String::from("data") => data,
|
2022-07-04 01:03:26 +00:00
|
|
|
=> &self.vars
|
|
|
|
});
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let mut res = kvs.execute(sql, &self.session, var).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first query result
|
2022-10-25 13:35:02 +00:00
|
|
|
let res = match one {
|
|
|
|
true => res.remove(0).result?.first(),
|
|
|
|
false => res.remove(0).result?,
|
|
|
|
};
|
2022-07-04 01:03:26 +00:00
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------------------
|
|
|
|
// Methods for modifying
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc modify", fields(websocket=self.uuid.to_string()))]
|
2022-10-25 13:31:14 +00:00
|
|
|
async fn modify(&self, what: Value, data: Value) -> Result<Value, Error> {
|
2022-10-25 13:35:02 +00:00
|
|
|
// Return a single result?
|
|
|
|
let one = what.is_thing();
|
2022-07-04 01:03:26 +00:00
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the SQL query string
|
2022-07-07 10:03:56 +00:00
|
|
|
let sql = "UPDATE $what PATCH $data RETURN DIFF";
|
2022-08-21 12:13:38 +00:00
|
|
|
// Specify the query parameters
|
2022-07-04 01:03:26 +00:00
|
|
|
let var = Some(map! {
|
2022-10-25 13:08:09 +00:00
|
|
|
String::from("what") => what.could_be_table(),
|
2022-10-25 13:31:14 +00:00
|
|
|
String::from("data") => data,
|
2022-07-04 01:03:26 +00:00
|
|
|
=> &self.vars
|
|
|
|
});
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let mut res = kvs.execute(sql, &self.session, var).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first query result
|
2022-10-25 13:35:02 +00:00
|
|
|
let res = match one {
|
|
|
|
true => res.remove(0).result?.first(),
|
|
|
|
false => res.remove(0).result?,
|
|
|
|
};
|
2022-07-04 01:03:26 +00:00
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------------------
|
|
|
|
// Methods for deleting
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc delete", fields(websocket=self.uuid.to_string()))]
|
2022-07-04 01:03:26 +00:00
|
|
|
async fn delete(&self, what: Value) -> Result<Value, Error> {
|
2022-10-25 13:35:02 +00:00
|
|
|
// Return a single result?
|
|
|
|
let one = what.is_thing();
|
2022-07-04 01:03:26 +00:00
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the SQL query string
|
2023-03-31 22:49:29 +00:00
|
|
|
let sql = "DELETE $what RETURN BEFORE";
|
2022-08-21 12:13:38 +00:00
|
|
|
// Specify the query parameters
|
2022-07-04 01:03:26 +00:00
|
|
|
let var = Some(map! {
|
2022-10-25 13:08:09 +00:00
|
|
|
String::from("what") => what.could_be_table(),
|
2022-07-04 01:03:26 +00:00
|
|
|
=> &self.vars
|
|
|
|
});
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let mut res = kvs.execute(sql, &self.session, var).await?;
|
2022-07-04 01:03:26 +00:00
|
|
|
// Extract the first query result
|
2022-10-25 13:35:02 +00:00
|
|
|
let res = match one {
|
|
|
|
true => res.remove(0).result?.first(),
|
|
|
|
false => res.remove(0).result?,
|
|
|
|
};
|
2022-07-04 01:03:26 +00:00
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
2022-10-25 13:38:51 +00:00
|
|
|
|
2023-06-20 22:50:26 +00:00
|
|
|
async fn handle_live_query_results(&self, res: &Response) {
|
|
|
|
match &res.query_type {
|
|
|
|
QueryType::Live => {
|
|
|
|
if let Ok(Value::Uuid(lqid)) = &res.result {
|
|
|
|
// Match on Uuid type
|
|
|
|
LIVE_QUERIES.write().await.insert(lqid.0, self.uuid);
|
2023-07-04 21:02:10 +00:00
|
|
|
trace!("Registered live query {} on websocket {}", lqid, self.uuid);
|
2023-06-20 22:50:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
QueryType::Kill => {
|
|
|
|
if let Ok(Value::Uuid(lqid)) = &res.result {
|
|
|
|
let ws_id = LIVE_QUERIES.write().await.remove(&lqid.0);
|
|
|
|
if let Some(ws_id) = ws_id {
|
2023-07-04 21:02:10 +00:00
|
|
|
trace!("Unregistered live query {} on websocket {}", lqid, ws_id);
|
2023-06-20 22:50:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => {}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-25 13:38:51 +00:00
|
|
|
// ------------------------------
|
|
|
|
// Methods for querying
|
|
|
|
// ------------------------------
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc query", fields(websocket=self.uuid.to_string()))]
|
2023-06-20 22:50:26 +00:00
|
|
|
async fn query(&self, sql: Strand) -> Result<Vec<Response>, Error> {
|
2022-10-25 13:38:51 +00:00
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the query parameters
|
|
|
|
let var = Some(self.vars.clone());
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let res = kvs.execute(&sql, &self.session, var).await?;
|
2023-06-20 22:50:26 +00:00
|
|
|
// Post-process hooks for web layer
|
|
|
|
for response in &res {
|
|
|
|
self.handle_live_query_results(response).await;
|
|
|
|
}
|
2022-10-25 13:38:51 +00:00
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
|
2023-05-31 22:40:24 +00:00
|
|
|
#[instrument(skip_all, name = "rpc query_with", fields(websocket=self.uuid.to_string()))]
|
2023-06-20 22:50:26 +00:00
|
|
|
async fn query_with(&self, sql: Strand, mut vars: Object) -> Result<Vec<Response>, Error> {
|
2022-10-25 13:38:51 +00:00
|
|
|
// Get a database reference
|
|
|
|
let kvs = DB.get().unwrap();
|
|
|
|
// Specify the query parameters
|
|
|
|
let var = Some(mrg! { vars.0, &self.vars });
|
|
|
|
// Execute the query on the database
|
2023-07-05 21:26:13 +00:00
|
|
|
let res = kvs.execute(&sql, &self.session, var).await?;
|
2023-06-20 22:50:26 +00:00
|
|
|
// Post-process hooks for web layer
|
|
|
|
for response in &res {
|
|
|
|
self.handle_live_query_results(response).await;
|
|
|
|
}
|
2022-10-25 13:38:51 +00:00
|
|
|
// Return the result to the client
|
|
|
|
Ok(res)
|
|
|
|
}
|
2022-07-04 01:03:26 +00:00
|
|
|
}
|