2022-07-07 10:25:22 +00:00
use crate ::cnf ::MAX_CONCURRENT_CALLS ;
2022-10-12 18:58:43 +00:00
use crate ::cnf ::PKG_NAME ;
2022-10-27 08:58:08 +00:00
use crate ::cnf ::PKG_VERSION ;
2022-10-19 13:56:43 +00:00
use crate ::cnf ::WEBSOCKET_PING_FREQUENCY ;
2022-07-04 01:03:26 +00:00
use crate ::dbs ::DB ;
use crate ::err ::Error ;
use crate ::rpc ::args ::Take ;
use crate ::rpc ::paths ::{ ID , METHOD , PARAMS } ;
2022-10-19 22:54:41 +00:00
use crate ::rpc ::res ;
2023-08-03 14:59:05 +00:00
use crate ::rpc ::res ::Data ;
2022-07-04 01:03:26 +00:00
use crate ::rpc ::res ::Failure ;
2023-08-03 14:59:05 +00:00
use crate ::rpc ::res ::IntoRpcResponse ;
use crate ::rpc ::res ::OutputFormat ;
use crate ::rpc ::CONN_CLOSED_ERR ;
use crate ::telemetry ::traces ::rpc ::span_for_request ;
2023-07-19 14:35:56 +00:00
use axum ::routing ::get ;
use axum ::Extension ;
use axum ::Router ;
2022-07-04 01:03:26 +00:00
use futures ::{ SinkExt , StreamExt } ;
2023-08-03 14:59:05 +00:00
use futures_util ::stream ::SplitSink ;
use futures_util ::stream ::SplitStream ;
2023-07-19 14:35:56 +00:00
use http_body ::Body as HttpBody ;
2023-02-13 12:20:13 +00:00
use once_cell ::sync ::Lazy ;
2022-07-04 01:03:26 +00:00
use std ::collections ::BTreeMap ;
2023-02-13 12:20:13 +00:00
use std ::collections ::HashMap ;
2022-07-07 10:25:22 +00:00
use std ::sync ::Arc ;
use surrealdb ::channel ;
2023-08-03 14:59:05 +00:00
use surrealdb ::channel ::{ Receiver , Sender } ;
2023-06-20 22:50:26 +00:00
use surrealdb ::dbs ::{ QueryType , Response , Session } ;
2023-08-03 14:59:05 +00:00
use surrealdb ::sql ::serde ::deserialize ;
2022-10-19 17:57:03 +00:00
use surrealdb ::sql ::Array ;
2022-07-04 01:03:26 +00:00
use surrealdb ::sql ::Object ;
use surrealdb ::sql ::Strand ;
use surrealdb ::sql ::Value ;
2022-07-07 10:25:22 +00:00
use tokio ::sync ::RwLock ;
2023-08-03 14:59:05 +00:00
use tokio ::task ::JoinSet ;
use tokio_util ::sync ::CancellationToken ;
use tower_http ::request_id ::RequestId ;
use tracing ::Span ;
2023-05-31 22:40:24 +00:00
use uuid ::Uuid ;
2023-07-19 14:35:56 +00:00
use axum ::{
extract ::ws ::{ Message , WebSocket , WebSocketUpgrade } ,
response ::IntoResponse ,
} ;
2022-07-04 01:03:26 +00:00
2023-07-07 19:05:58 +00:00
// Mapping of WebSocketID to WebSocket
2023-08-03 14:59:05 +00:00
pub ( crate ) struct WebSocketRef ( pub ( crate ) Sender < Message > , pub ( crate ) CancellationToken ) ;
type WebSockets = RwLock < HashMap < Uuid , WebSocketRef > > ;
2023-06-20 22:50:26 +00:00
// Mapping of LiveQueryID to WebSocketID
type LiveQueries = RwLock < HashMap < Uuid , Uuid > > ;
2023-02-13 12:20:13 +00:00
2023-08-03 14:59:05 +00:00
pub ( super ) static WEBSOCKETS : Lazy < WebSockets > = Lazy ::new ( WebSockets ::default ) ;
2023-06-20 22:50:26 +00:00
static LIVE_QUERIES : Lazy < LiveQueries > = Lazy ::new ( LiveQueries ::default ) ;
2023-02-13 12:20:13 +00:00
2023-07-19 14:35:56 +00:00
/// Build the router that exposes the WebSocket RPC endpoint.
///
/// NOTE(review): the route literal is reproduced exactly as found in the
/// source, including its leading and trailing space; route paths normally
/// begin with `/`, so confirm the padding is not extraction residue.
pub(super) fn router<S, B>() -> Router<S, B>
where
    B: HttpBody + Send + 'static,
    S: Clone + Send + Sync + 'static,
{
    // A GET request on the RPC path is answered by the upgrade handler
    let rpc_route = get(handler);
    Router::new().route(" /rpc ", rpc_route)
}
2023-08-03 14:59:05 +00:00
async fn handler (
ws : WebSocketUpgrade ,
Extension ( sess ) : Extension < Session > ,
Extension ( req_id ) : Extension < RequestId > ,
) -> impl IntoResponse {
2023-07-19 14:35:56 +00:00
// finalize the upgrade process by returning upgrade callback.
// we can customize the callback by sending additional info such as address.
2023-08-03 14:59:05 +00:00
ws . on_upgrade ( move | socket | handle_socket ( socket , sess , req_id ) )
2022-07-04 01:03:26 +00:00
}
2023-08-03 14:59:05 +00:00
async fn handle_socket ( ws : WebSocket , sess : Session , req_id : RequestId ) {
2023-07-19 14:35:56 +00:00
let rpc = Rpc ::new ( sess ) ;
2023-08-03 14:59:05 +00:00
// If the request ID is a valid UUID and is not already in use, use it as the WebSocket ID
match req_id . header_value ( ) . to_str ( ) . map ( Uuid ::parse_str ) {
Ok ( Ok ( req_id ) ) if ! WEBSOCKETS . read ( ) . await . contains_key ( & req_id ) = > {
rpc . write ( ) . await . ws_id = req_id
}
_ = > ( ) ,
}
Rpc ::serve ( rpc , ws ) . await ;
2022-07-04 01:03:26 +00:00
}
pub struct Rpc {
session : Session ,
2023-08-03 14:59:05 +00:00
format : OutputFormat ,
ws_id : Uuid ,
2022-07-04 01:03:26 +00:00
vars : BTreeMap < String , Value > ,
2023-08-03 14:59:05 +00:00
graceful_shutdown : CancellationToken ,
2022-07-04 01:03:26 +00:00
}
impl Rpc {
2022-11-23 09:42:59 +00:00
/// Instantiate a new RPC
2022-07-07 10:25:22 +00:00
pub fn new ( mut session : Session ) -> Arc < RwLock < Rpc > > {
2022-07-04 01:03:26 +00:00
// Create a new RPC variables store
let vars = BTreeMap ::new ( ) ;
2022-10-25 13:19:44 +00:00
// Set the default output format
2023-08-03 14:59:05 +00:00
let format = OutputFormat ::Json ;
// Enable real-time mode
2022-07-04 01:03:26 +00:00
session . rt = true ;
// Create and store the Rpc connection
2022-07-07 10:25:22 +00:00
Arc ::new ( RwLock ::new ( Rpc {
2022-07-04 01:03:26 +00:00
session ,
2022-10-25 13:19:44 +00:00
format ,
2023-08-03 14:59:05 +00:00
ws_id : Uuid ::new_v4 ( ) ,
2022-07-04 01:03:26 +00:00
vars ,
2023-08-03 14:59:05 +00:00
graceful_shutdown : CancellationToken ::new ( ) ,
2022-07-07 10:25:22 +00:00
} ) )
2022-07-04 01:03:26 +00:00
}
2022-11-23 09:42:59 +00:00
/// Serve the RPC endpoint
2022-07-07 10:25:22 +00:00
pub async fn serve ( rpc : Arc < RwLock < Rpc > > , ws : WebSocket ) {
// Split the socket into send and recv
2023-08-03 14:59:05 +00:00
let ( sender , receiver ) = ws . split ( ) ;
// Create an internal channel between the receiver and the sender
let ( internal_sender , internal_receiver ) = channel ::new ( MAX_CONCURRENT_CALLS ) ;
let ws_id = rpc . read ( ) . await . ws_id ;
// Store this WebSocket in the list of WebSockets
WEBSOCKETS . write ( ) . await . insert (
ws_id ,
WebSocketRef ( internal_sender . clone ( ) , rpc . read ( ) . await . graceful_shutdown . clone ( ) ) ,
) ;
trace! ( " WebSocket {} connected " , ws_id ) ;
// Wait until all tasks finish
tokio ::join! (
Self ::ping ( rpc . clone ( ) , internal_sender . clone ( ) ) ,
Self ::read ( rpc . clone ( ) , receiver , internal_sender . clone ( ) ) ,
Self ::write ( rpc . clone ( ) , sender , internal_receiver . clone ( ) ) ,
Self ::lq_notifications ( rpc . clone ( ) ) ,
) ;
// Remove all live queries
LIVE_QUERIES . write ( ) . await . retain ( | key , value | {
if value = = & ws_id {
trace! ( " Removing live query: {} " , key ) ;
return false ;
2022-10-19 13:56:43 +00:00
}
2023-08-03 14:59:05 +00:00
true
2022-10-19 13:56:43 +00:00
} ) ;
2023-08-03 14:59:05 +00:00
// Remove this WebSocket from the list of WebSockets
WEBSOCKETS . write ( ) . await . remove ( & ws_id ) ;
trace! ( " WebSocket {} disconnected " , ws_id ) ;
}
/// Send Ping messages to the client
///
/// A ping is queued on the internal channel at every tick of the ping
/// interval. The task ends when the shutdown token is cancelled, or when a
/// ping cannot be queued, in which case it cancels the token itself.
async fn ping(rpc: Arc<RwLock<Rpc>>, internal_sender: Sender<Message>) {
    // Create the interval ticker
    let mut ticker = tokio::time::interval(WEBSOCKET_PING_FREQUENCY);
    let shutdown = rpc.read().await.graceful_shutdown.clone();
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                // Queue the ping for the client
                let queued = internal_sender.send(Message::Ping(vec![])).await.is_ok();
                if !queued {
                    // Close the WebSocket connection if the ping can't be sent
                    rpc.read().await.graceful_shutdown.cancel();
                    break;
                }
            },
            _ = shutdown.cancelled() => break,
        }
    }
}
/// Read messages sent from the client
async fn read (
rpc : Arc < RwLock < Rpc > > ,
mut receiver : SplitStream < WebSocket > ,
internal_sender : Sender < Message > ,
) {
// Collect all spawned tasks so we can wait for them at the end
let mut tasks = JoinSet ::new ( ) ;
let cancel_token = rpc . read ( ) . await . graceful_shutdown . clone ( ) ;
loop {
let is_shutdown = cancel_token . cancelled ( ) ;
tokio ::select! {
msg = receiver . next ( ) = > {
if let Some ( msg ) = msg {
match msg {
// We've received a message from the client
// Ping/Pong is automatically handled by the WebSocket library
Ok ( msg ) = > match msg {
Message ::Text ( _ ) = > {
tasks . spawn ( Rpc ::handle_msg ( rpc . clone ( ) , msg , internal_sender . clone ( ) ) ) ;
}
Message ::Binary ( _ ) = > {
tasks . spawn ( Rpc ::handle_msg ( rpc . clone ( ) , msg , internal_sender . clone ( ) ) ) ;
}
Message ::Close ( _ ) = > {
// Respond with a close message
if let Err ( err ) = internal_sender . send ( Message ::Close ( None ) ) . await {
trace! ( " WebSocket error when replying to the Close frame: {:?} " , err ) ;
} ;
// Start the graceful shutdown of the WebSocket and close the channels
rpc . read ( ) . await . graceful_shutdown . cancel ( ) ;
let _ = internal_sender . close ( ) ;
break ;
}
_ = > {
// Ignore everything else
}
} ,
Err ( err ) = > {
trace! ( " WebSocket error: {:?} " , err ) ;
// Start the graceful shutdown of the WebSocket and close the channels
rpc . read ( ) . await . graceful_shutdown . cancel ( ) ;
let _ = internal_sender . close ( ) ;
// Exit out of the loop
break ;
}
2023-07-05 21:26:13 +00:00
}
2023-06-20 22:50:26 +00:00
}
}
2023-08-03 14:59:05 +00:00
_ = is_shutdown = > break ,
2023-06-20 22:50:26 +00:00
}
2023-08-03 14:59:05 +00:00
}
// Wait for all tasks to finish
while let Some ( res ) = tasks . join_next ( ) . await {
if let Err ( err ) = res {
error! ( " Error while handling RPC message: {} " , err ) ;
2022-07-04 01:03:26 +00:00
}
}
2023-02-13 12:20:13 +00:00
}
2023-08-03 14:59:05 +00:00
/// Write messages to the client
async fn write (
rpc : Arc < RwLock < Rpc > > ,
mut sender : SplitSink < WebSocket , Message > ,
mut internal_receiver : Receiver < Message > ,
) {
let cancel_token = rpc . read ( ) . await . graceful_shutdown . clone ( ) ;
loop {
let is_shutdown = cancel_token . cancelled ( ) ;
tokio ::select! {
// Wait for the next message to send
msg = internal_receiver . next ( ) = > {
if let Some ( res ) = msg {
// Send the message to the client
if let Err ( err ) = sender . send ( res ) . await {
if err . to_string ( ) ! = CONN_CLOSED_ERR {
debug! ( " WebSocket error: {:?} " , err ) ;
}
// Close the WebSocket connection
rpc . read ( ) . await . graceful_shutdown . cancel ( ) ;
// Exit out of the loop
break ;
}
}
} ,
_ = is_shutdown = > break ,
}
}
2023-02-13 12:20:13 +00:00
}
2023-08-03 14:59:05 +00:00
/// Send live query notifications to the client
async fn lq_notifications ( rpc : Arc < RwLock < Rpc > > ) {
if let Some ( channel ) = DB . get ( ) . unwrap ( ) . notifications ( ) {
let cancel_token = rpc . read ( ) . await . graceful_shutdown . clone ( ) ;
loop {
tokio ::select! {
msg = channel . recv ( ) = > {
if let Ok ( notification ) = msg {
// Find which WebSocket the notification belongs to
if let Some ( ws_id ) = LIVE_QUERIES . read ( ) . await . get ( & notification . id ) {
// Check to see if the WebSocket exists
if let Some ( WebSocketRef ( ws , _ ) ) = WEBSOCKETS . read ( ) . await . get ( ws_id ) {
// Serialize the message to send
let message = res ::success ( None , notification ) ;
// Get the current output format
let format = rpc . read ( ) . await . format . clone ( ) ;
// Send the notification to the client
message . send ( format , ws . clone ( ) ) . await
}
}
}
} ,
_ = cancel_token . cancelled ( ) = > break ,
}
2023-06-20 22:50:26 +00:00
}
2023-08-03 14:59:05 +00:00
}
2022-07-04 01:03:26 +00:00
}
2023-08-03 14:59:05 +00:00
/// Handle individual WebSocket messages
async fn handle_msg ( rpc : Arc < RwLock < Rpc > > , msg : Message , chn : Sender < Message > ) {
2022-10-25 13:19:44 +00:00
// Get the current output format
2023-08-03 14:59:05 +00:00
let mut out_fmt = rpc . read ( ) . await . format . clone ( ) ;
let span = span_for_request ( & rpc . read ( ) . await . ws_id ) ;
let _enter = span . enter ( ) ;
2022-07-04 01:03:26 +00:00
// Parse the request
2023-08-03 14:59:05 +00:00
match Self ::parse_request ( msg ) . await {
Ok ( ( id , method , params , _out_fmt ) ) = > {
span . record (
" rpc.jsonrpc.request_id " ,
id . clone ( ) . map ( | v | v . as_string ( ) ) . unwrap_or ( String ::new ( ) ) ,
) ;
if let Some ( _out_fmt ) = _out_fmt {
out_fmt = _out_fmt ;
}
// Process the request
let res = Self ::process_request ( rpc . clone ( ) , & method , params ) . await ;
// Process the response
res . into_response ( id ) . send ( out_fmt , chn ) . await
}
Err ( err ) = > {
// Process the response
res ::failure ( None , err ) . send ( out_fmt , chn ) . await
}
}
}
async fn parse_request (
msg : Message ,
) -> Result < ( Option < Value > , String , Array , Option < OutputFormat > ) , Failure > {
let mut out_fmt = None ;
2022-10-25 13:19:44 +00:00
let req = match msg {
2022-10-25 13:38:23 +00:00
// This is a binary message
2023-07-19 14:35:56 +00:00
Message ::Binary ( val ) = > {
2022-10-25 13:38:23 +00:00
// Use binary output
2023-08-03 14:59:05 +00:00
out_fmt = Some ( OutputFormat ::Full ) ;
match deserialize ( & val ) {
Ok ( v ) = > v ,
Err ( _ ) = > {
debug! ( " Error when trying to deserialize the request " ) ;
return Err ( Failure ::PARSE_ERROR ) ;
}
}
2022-10-25 13:38:23 +00:00
}
2022-10-25 13:19:44 +00:00
// This is a text message
2023-07-19 14:35:56 +00:00
Message ::Text ( ref val ) = > {
2022-10-25 13:19:44 +00:00
// Parse the SurrealQL object
2023-04-17 14:39:37 +00:00
match surrealdb ::sql ::value ( val ) {
2022-10-25 13:19:44 +00:00
// The SurrealQL message parsed ok
Ok ( v ) = > v ,
// The SurrealQL message failed to parse
2023-08-03 14:59:05 +00:00
_ = > return Err ( Failure ::PARSE_ERROR ) ,
2022-10-25 13:19:44 +00:00
}
}
// Unsupported message type
2023-08-03 14:59:05 +00:00
_ = > {
debug! ( " Unsupported message type: {:?} " , msg ) ;
return Err ( res ::Failure ::custom ( " Unsupported message type " ) ) ;
}
2022-07-04 01:03:26 +00:00
} ;
// Fetch the 'id' argument
let id = match req . pick ( & * ID ) {
2022-10-25 13:06:02 +00:00
v if v . is_none ( ) = > None ,
v if v . is_null ( ) = > Some ( v ) ,
v if v . is_uuid ( ) = > Some ( v ) ,
v if v . is_number ( ) = > Some ( v ) ,
v if v . is_strand ( ) = > Some ( v ) ,
v if v . is_datetime ( ) = > Some ( v ) ,
2023-08-03 14:59:05 +00:00
_ = > return Err ( Failure ::INVALID_REQUEST ) ,
2022-07-04 01:03:26 +00:00
} ;
// Fetch the 'method' argument
let method = match req . pick ( & * METHOD ) {
Value ::Strand ( v ) = > v . to_raw ( ) ,
2023-08-03 14:59:05 +00:00
_ = > return Err ( Failure ::INVALID_REQUEST ) ,
2022-07-04 01:03:26 +00:00
} ;
2023-08-03 14:59:05 +00:00
// Now that we know the method, we can update the span
Span ::current ( ) . record ( " rpc.method " , & method ) ;
Span ::current ( ) . record ( " otel.name " , format! ( " surrealdb.rpc/ {} " , method ) ) ;
2022-07-04 01:03:26 +00:00
// Fetch the 'params' argument
let params = match req . pick ( & * PARAMS ) {
Value ::Array ( v ) = > v ,
2022-10-19 17:57:03 +00:00
_ = > Array ::new ( ) ,
2022-07-04 01:03:26 +00:00
} ;
2023-08-03 14:59:05 +00:00
Ok ( ( id , method , params , out_fmt ) )
}
async fn process_request (
rpc : Arc < RwLock < Rpc > > ,
method : & str ,
params : Array ,
) -> Result < Data , Failure > {
info! ( " Process RPC request " ) ;
2022-07-04 01:03:26 +00:00
// Match the method to a function
2023-08-03 14:59:05 +00:00
match method {
// Handle a surrealdb ping message
//
// This is used to keep the WebSocket connection alive in environments where the WebSocket protocol is not enough.
// For example, some browsers will wait for the TCP protocol to timeout before triggering an on_close event. This may take several seconds or even minutes in certain scenarios.
// By sending a ping message every few seconds from the client, we can force a connection check and trigger a an on_close event if the ping can't be sent.
//
" ping " = > Ok ( Value ::None . into ( ) ) ,
2022-10-25 13:22:06 +00:00
// Retrieve the current auth record
2022-07-04 01:03:26 +00:00
" info " = > match params . len ( ) {
2023-08-03 14:59:05 +00:00
0 = > rpc . read ( ) . await . info ( ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Switch to a specific namespace and database
2022-10-25 13:31:14 +00:00
" use " = > match params . needs_two ( ) {
2023-08-03 14:59:05 +00:00
Ok ( ( ns , db ) ) = > {
rpc . write ( ) . await . yuse ( ns , db ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Signup to a specific authentication scope
2022-10-25 13:31:14 +00:00
" signup " = > match params . needs_one ( ) {
2023-08-03 14:59:05 +00:00
Ok ( Value ::Object ( v ) ) = > {
rpc . write ( ) . await . signup ( v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Signin as a root, namespace, database or scope user
2022-10-25 13:31:14 +00:00
" signin " = > match params . needs_one ( ) {
2023-08-03 14:59:05 +00:00
Ok ( Value ::Object ( v ) ) = > {
rpc . write ( ) . await . signin ( v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Invalidate the current authentication session
2022-07-04 01:03:26 +00:00
" invalidate " = > match params . len ( ) {
2023-08-03 14:59:05 +00:00
0 = > rpc . write ( ) . await . invalidate ( ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Authenticate using an authentication token
2022-10-25 13:31:14 +00:00
" authenticate " = > match params . needs_one ( ) {
2023-08-03 14:59:05 +00:00
Ok ( Value ::Strand ( v ) ) = > {
rpc . write ( ) . await . authenticate ( v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Kill a live query using a query id
2022-10-25 13:31:14 +00:00
" kill " = > match params . needs_one ( ) {
2023-08-08 17:15:01 +00:00
Ok ( v ) = > rpc . read ( ) . await . kill ( v ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
2023-08-03 14:59:05 +00:00
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Setup a live query on a specific table
2023-07-19 16:19:19 +00:00
" live " = > match params . needs_one_or_two ( ) {
2023-08-03 14:59:05 +00:00
Ok ( ( v , d ) ) if v . is_table ( ) = > {
rpc . read ( ) . await . live ( v , d ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
Ok ( ( v , d ) ) if v . is_strand ( ) = > {
rpc . read ( ) . await . live ( v , d ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-08 20:55:44 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Specify a connection-wide parameter
2023-08-03 14:59:05 +00:00
" let " | " set " = > match params . needs_one_or_two ( ) {
Ok ( ( Value ::Strand ( s ) , v ) ) = > {
rpc . write ( ) . await . set ( s , v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:23:18 +00:00
// Unset and clear a connection-wide parameter
" unset " = > match params . needs_one ( ) {
2023-08-03 14:59:05 +00:00
Ok ( Value ::Strand ( s ) ) = > {
rpc . write ( ) . await . unset ( s ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-10-25 13:23:18 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Select a value or values from the database
2022-10-25 13:31:14 +00:00
" select " = > match params . needs_one ( ) {
2023-08-03 14:59:05 +00:00
Ok ( v ) = > rpc . read ( ) . await . select ( v ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2023-07-20 09:19:36 +00:00
// Insert a value or values in the database
" insert " = > match params . needs_one_or_two ( ) {
2023-08-03 14:59:05 +00:00
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . insert ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2023-07-20 09:19:36 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Create a value or values in the database
2022-10-25 13:31:14 +00:00
" create " = > match params . needs_one_or_two ( ) {
2023-08-03 14:59:05 +00:00
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . create ( v , o ) . await . map ( Into ::i nto ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Update a value or values in the database using `CONTENT`
2022-10-25 13:31:14 +00:00
" update " = > match params . needs_one_or_two ( ) {
2023-08-03 14:59:05 +00:00
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . update ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Update a value or values in the database using `MERGE`
2022-10-25 13:31:14 +00:00
" change " | " merge " = > match params . needs_one_or_two ( ) {
2023-08-03 14:59:05 +00:00
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . change ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Update a value or values in the database using `PATCH`
2022-10-25 13:31:14 +00:00
" modify " | " patch " = > match params . needs_one_or_two ( ) {
2023-08-03 14:59:05 +00:00
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . modify ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-30 01:30:30 +00:00
// Delete a value or values from the database
2022-10-25 13:31:14 +00:00
" delete " = > match params . needs_one ( ) {
2023-08-03 14:59:05 +00:00
Ok ( v ) = > rpc . read ( ) . await . delete ( v ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-10-25 13:19:44 +00:00
} ,
// Specify the output format for text requests
" format " = > match params . needs_one ( ) {
2023-08-03 14:59:05 +00:00
Ok ( Value ::Strand ( v ) ) = > {
rpc . write ( ) . await . format ( v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-07-04 01:03:26 +00:00
} ,
2022-10-25 13:22:06 +00:00
// Get the current server version
2022-10-12 18:58:43 +00:00
" version " = > match params . len ( ) {
2022-12-18 16:00:36 +00:00
0 = > Ok ( format! ( " {PKG_NAME} - {} " , * PKG_VERSION ) . into ( ) ) ,
2023-08-03 14:59:05 +00:00
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-10-12 18:58:43 +00:00
} ,
2022-10-25 13:38:51 +00:00
// Run a full SurrealQL query against the database
" query " = > match params . needs_one_or_two ( ) {
2023-01-20 00:54:09 +00:00
Ok ( ( Value ::Strand ( s ) , o ) ) if o . is_none_or_null ( ) = > {
2023-08-03 14:59:05 +00:00
rpc . read ( ) . await . query ( s ) . await . map ( Into ::into ) . map_err ( Into ::into )
2022-10-25 13:38:51 +00:00
}
Ok ( ( Value ::Strand ( s ) , Value ::Object ( o ) ) ) = > {
2023-08-03 14:59:05 +00:00
rpc . read ( ) . await . query_with ( s , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
2022-10-25 13:38:51 +00:00
}
2023-08-03 14:59:05 +00:00
_ = > Err ( Failure ::INVALID_PARAMS ) ,
2022-10-25 13:38:51 +00:00
} ,
2023-08-03 14:59:05 +00:00
_ = > Err ( Failure ::METHOD_NOT_FOUND ) ,
2022-07-04 01:03:26 +00:00
}
}
// ------------------------------
// Methods for authentication
// ------------------------------
2022-10-25 13:19:44 +00:00
async fn format ( & mut self , out : Strand ) -> Result < Value , Error > {
match out . as_str ( ) {
2023-08-03 14:59:05 +00:00
" json " | " application/json " = > self . format = OutputFormat ::Json ,
" cbor " | " application/cbor " = > self . format = OutputFormat ::Cbor ,
" pack " | " application/pack " = > self . format = OutputFormat ::Pack ,
2022-10-25 13:19:44 +00:00
_ = > return Err ( Error ::InvalidType ) ,
} ;
Ok ( Value ::None )
}
2023-03-07 09:55:35 +00:00
async fn yuse ( & mut self , ns : Value , db : Value ) -> Result < Value , Error > {
if let Value ::Strand ( ns ) = ns {
self . session . ns = Some ( ns . 0 ) ;
}
if let Value ::Strand ( db ) = db {
self . session . db = Some ( db . 0 ) ;
}
2022-07-04 01:03:26 +00:00
Ok ( Value ::None )
}
2022-08-23 22:44:13 +00:00
async fn signup ( & mut self , vars : Object ) -> Result < Value , Error > {
2023-06-09 13:45:07 +00:00
let kvs = DB . get ( ) . unwrap ( ) ;
2023-07-05 21:26:13 +00:00
surrealdb ::iam ::signup ::signup ( kvs , & mut self . session , vars )
2023-02-11 15:56:14 +00:00
. await
. map ( Into ::into )
. map_err ( Into ::into )
2022-07-04 01:03:26 +00:00
}
2022-08-23 22:44:13 +00:00
async fn signin ( & mut self , vars : Object ) -> Result < Value , Error > {
2023-06-09 13:45:07 +00:00
let kvs = DB . get ( ) . unwrap ( ) ;
2023-07-29 18:47:25 +00:00
surrealdb ::iam ::signin ::signin ( kvs , & mut self . session , vars )
2023-02-11 15:56:14 +00:00
. await
. map ( Into ::into )
. map_err ( Into ::into )
2022-07-04 01:03:26 +00:00
}
async fn invalidate ( & mut self ) -> Result < Value , Error > {
2023-06-09 13:45:07 +00:00
surrealdb ::iam ::clear ::clear ( & mut self . session ) ? ;
2022-07-04 01:03:26 +00:00
Ok ( Value ::None )
}
async fn authenticate ( & mut self , token : Strand ) -> Result < Value , Error > {
2023-06-09 13:45:07 +00:00
let kvs = DB . get ( ) . unwrap ( ) ;
2023-07-29 18:47:25 +00:00
surrealdb ::iam ::verify ::token ( kvs , & mut self . session , & token . 0 ) . await ? ;
2022-07-04 01:03:26 +00:00
Ok ( Value ::None )
}
// ------------------------------
// Methods for identification
// ------------------------------
async fn info ( & self ) -> Result < Value , Error > {
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " SELECT * FROM $auth " ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let mut res = kvs . execute ( sql , & self . session , None ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first value from the result
let res = res . remove ( 0 ) . result ? . first ( ) ;
// Return the result to the client
Ok ( res )
}
2022-07-07 10:25:22 +00:00
// ------------------------------
// Methods for setting variables
// ------------------------------
async fn set ( & mut self , key : Strand , val : Value ) -> Result < Value , Error > {
match val {
2022-10-25 13:23:18 +00:00
// Remove the variable if undefined
Value ::None = > self . vars . remove ( & key . 0 ) ,
// Store the variable if defined
v = > self . vars . insert ( key . 0 , v ) ,
} ;
Ok ( Value ::Null )
}
async fn unset ( & mut self , key : Strand ) -> Result < Value , Error > {
self . vars . remove ( & key . 0 ) ;
Ok ( Value ::Null )
2022-07-07 10:25:22 +00:00
}
2022-07-04 01:03:26 +00:00
// ------------------------------
// Methods for live queries
// ------------------------------
async fn kill ( & self , id : Value ) -> Result < Value , Error > {
// Specify the SQL query string
let sql = " KILL $id " ;
2022-08-21 12:13:38 +00:00
// Specify the query parameters
2023-06-20 22:50:26 +00:00
let var = map! {
2023-08-08 17:15:01 +00:00
String ::from ( " id " ) = > id , // NOTE: id can be parameter
2022-07-04 01:03:26 +00:00
= > & self . vars
2023-06-20 22:50:26 +00:00
} ;
2022-07-04 01:03:26 +00:00
// Execute the query on the database
2023-06-20 22:50:26 +00:00
let mut res = self . query_with ( Strand ::from ( sql ) , Object ::from ( var ) ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first query result
2023-06-20 22:50:26 +00:00
let response = res . remove ( 0 ) ;
match response . result {
Ok ( v ) = > Ok ( v ) ,
Err ( e ) = > Err ( Error ::from ( e ) ) ,
}
2022-07-04 01:03:26 +00:00
}
2023-07-19 16:19:19 +00:00
async fn live ( & self , tb : Value , diff : Value ) -> Result < Value , Error > {
2022-07-04 01:03:26 +00:00
// Specify the SQL query string
2023-07-19 16:19:19 +00:00
let sql = match diff . is_true ( ) {
true = > " LIVE SELECT DIFF FROM $tb " ,
false = > " LIVE SELECT * FROM $tb " ,
} ;
2022-08-21 12:13:38 +00:00
// Specify the query parameters
2023-06-20 22:50:26 +00:00
let var = map! {
2022-10-25 13:08:09 +00:00
String ::from ( " tb " ) = > tb . could_be_table ( ) ,
2022-07-04 01:03:26 +00:00
= > & self . vars
2023-06-20 22:50:26 +00:00
} ;
2022-07-04 01:03:26 +00:00
// Execute the query on the database
2023-06-20 22:50:26 +00:00
let mut res = self . query_with ( Strand ::from ( sql ) , Object ::from ( var ) ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first query result
2023-06-20 22:50:26 +00:00
let response = res . remove ( 0 ) ;
match response . result {
Ok ( v ) = > Ok ( v ) ,
Err ( e ) = > Err ( Error ::from ( e ) ) ,
}
2022-07-04 01:03:26 +00:00
}
// ------------------------------
// Methods for selecting
// ------------------------------
async fn select ( & self , what : Value ) -> Result < Value , Error > {
2022-10-25 13:35:02 +00:00
// Return a single result?
let one = what . is_thing ( ) ;
2022-07-04 01:03:26 +00:00
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " SELECT * FROM $what " ;
2022-08-21 12:13:38 +00:00
// Specify the query parameters
2022-07-04 01:03:26 +00:00
let var = Some ( map! {
2022-10-25 13:08:09 +00:00
String ::from ( " what " ) = > what . could_be_table ( ) ,
2022-07-04 01:03:26 +00:00
= > & self . vars
} ) ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first query result
2022-10-25 13:35:02 +00:00
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
2022-07-04 01:03:26 +00:00
// Return the result to the client
Ok ( res )
}
2023-07-20 09:19:36 +00:00
// ------------------------------
// Methods for inserting
// ------------------------------
async fn insert ( & self , what : Value , data : Value ) -> Result < Value , Error > {
// Return a single result?
let one = what . is_thing ( ) ;
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " INSERT INTO $what $data RETURN AFTER " ;
// Specify the query parameters
let var = Some ( map! {
String ::from ( " what " ) = > what . could_be_table ( ) ,
String ::from ( " data " ) = > data ,
= > & self . vars
} ) ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
// Extract the first query result
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
// Return the result to the client
Ok ( res )
}
2022-07-04 01:03:26 +00:00
// ------------------------------
// Methods for creating
// ------------------------------
2022-10-25 13:31:14 +00:00
async fn create ( & self , what : Value , data : Value ) -> Result < Value , Error > {
2022-10-25 13:35:02 +00:00
// Return a single result?
let one = what . is_thing ( ) ;
2022-07-04 01:03:26 +00:00
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " CREATE $what CONTENT $data RETURN AFTER " ;
2022-08-21 12:13:38 +00:00
// Specify the query parameters
2022-07-04 01:03:26 +00:00
let var = Some ( map! {
2022-10-25 13:08:09 +00:00
String ::from ( " what " ) = > what . could_be_table ( ) ,
2022-10-25 13:31:14 +00:00
String ::from ( " data " ) = > data ,
2022-07-04 01:03:26 +00:00
= > & self . vars
} ) ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first query result
2022-10-25 13:35:02 +00:00
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
2022-07-04 01:03:26 +00:00
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for updating
// ------------------------------
2022-10-25 13:31:14 +00:00
async fn update ( & self , what : Value , data : Value ) -> Result < Value , Error > {
2022-10-25 13:35:02 +00:00
// Return a single result?
let one = what . is_thing ( ) ;
2022-07-04 01:03:26 +00:00
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " UPDATE $what CONTENT $data RETURN AFTER " ;
2022-08-21 12:13:38 +00:00
// Specify the query parameters
2022-07-04 01:03:26 +00:00
let var = Some ( map! {
2022-10-25 13:08:09 +00:00
String ::from ( " what " ) = > what . could_be_table ( ) ,
2022-10-25 13:31:14 +00:00
String ::from ( " data " ) = > data ,
2022-07-04 01:03:26 +00:00
= > & self . vars
} ) ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first query result
2022-10-25 13:35:02 +00:00
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
2022-07-04 01:03:26 +00:00
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for changing
// ------------------------------
2022-10-25 13:31:14 +00:00
async fn change ( & self , what : Value , data : Value ) -> Result < Value , Error > {
2022-10-25 13:35:02 +00:00
// Return a single result?
let one = what . is_thing ( ) ;
2022-07-04 01:03:26 +00:00
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " UPDATE $what MERGE $data RETURN AFTER " ;
2022-08-21 12:13:38 +00:00
// Specify the query parameters
2022-07-04 01:03:26 +00:00
let var = Some ( map! {
2022-10-25 13:08:09 +00:00
String ::from ( " what " ) = > what . could_be_table ( ) ,
2022-10-25 13:31:14 +00:00
String ::from ( " data " ) = > data ,
2022-07-04 01:03:26 +00:00
= > & self . vars
} ) ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first query result
2022-10-25 13:35:02 +00:00
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
2022-07-04 01:03:26 +00:00
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for modifying
// ------------------------------
2022-10-25 13:31:14 +00:00
async fn modify ( & self , what : Value , data : Value ) -> Result < Value , Error > {
2022-10-25 13:35:02 +00:00
// Return a single result?
let one = what . is_thing ( ) ;
2022-07-04 01:03:26 +00:00
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
2022-07-07 10:03:56 +00:00
let sql = " UPDATE $what PATCH $data RETURN DIFF " ;
2022-08-21 12:13:38 +00:00
// Specify the query parameters
2022-07-04 01:03:26 +00:00
let var = Some ( map! {
2022-10-25 13:08:09 +00:00
String ::from ( " what " ) = > what . could_be_table ( ) ,
2022-10-25 13:31:14 +00:00
String ::from ( " data " ) = > data ,
2022-07-04 01:03:26 +00:00
= > & self . vars
} ) ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first query result
2022-10-25 13:35:02 +00:00
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
2022-07-04 01:03:26 +00:00
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for deleting
// ------------------------------
async fn delete ( & self , what : Value ) -> Result < Value , Error > {
2022-10-25 13:35:02 +00:00
// Return a single result?
let one = what . is_thing ( ) ;
2022-07-04 01:03:26 +00:00
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
2023-03-31 22:49:29 +00:00
let sql = " DELETE $what RETURN BEFORE " ;
2022-08-21 12:13:38 +00:00
// Specify the query parameters
2022-07-04 01:03:26 +00:00
let var = Some ( map! {
2022-10-25 13:08:09 +00:00
String ::from ( " what " ) = > what . could_be_table ( ) ,
2022-07-04 01:03:26 +00:00
= > & self . vars
} ) ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
2022-07-04 01:03:26 +00:00
// Extract the first query result
2022-10-25 13:35:02 +00:00
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
2022-07-04 01:03:26 +00:00
// Return the result to the client
Ok ( res )
}
2022-10-25 13:38:51 +00:00
// ------------------------------
// Methods for querying
// ------------------------------
2023-06-20 22:50:26 +00:00
async fn query ( & self , sql : Strand ) -> Result < Vec < Response > , Error > {
2022-10-25 13:38:51 +00:00
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the query parameters
let var = Some ( self . vars . clone ( ) ) ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let res = kvs . execute ( & sql , & self . session , var ) . await ? ;
2023-06-20 22:50:26 +00:00
// Post-process hooks for web layer
for response in & res {
self . handle_live_query_results ( response ) . await ;
}
2022-10-25 13:38:51 +00:00
// Return the result to the client
Ok ( res )
}
2023-06-20 22:50:26 +00:00
async fn query_with ( & self , sql : Strand , mut vars : Object ) -> Result < Vec < Response > , Error > {
2022-10-25 13:38:51 +00:00
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the query parameters
let var = Some ( mrg! { vars . 0 , & self . vars } ) ;
// Execute the query on the database
2023-07-05 21:26:13 +00:00
let res = kvs . execute ( & sql , & self . session , var ) . await ? ;
2023-06-20 22:50:26 +00:00
// Post-process hooks for web layer
for response in & res {
self . handle_live_query_results ( response ) . await ;
}
2022-10-25 13:38:51 +00:00
// Return the result to the client
Ok ( res )
}
2023-07-07 19:05:58 +00:00
// ------------------------------
// Private methods
// ------------------------------
async fn handle_live_query_results ( & self , res : & Response ) {
match & res . query_type {
QueryType ::Live = > {
if let Ok ( Value ::Uuid ( lqid ) ) = & res . result {
// Match on Uuid type
2023-08-03 14:59:05 +00:00
LIVE_QUERIES . write ( ) . await . insert ( lqid . 0 , self . ws_id ) ;
trace! ( " Registered live query {} on websocket {} " , lqid , self . ws_id ) ;
2023-07-07 19:05:58 +00:00
}
}
QueryType ::Kill = > {
if let Ok ( Value ::Uuid ( lqid ) ) = & res . result {
let ws_id = LIVE_QUERIES . write ( ) . await . remove ( & lqid . 0 ) ;
if let Some ( ws_id ) = ws_id {
trace! ( " Unregistered live query {} on websocket {} " , lqid , ws_id ) ;
}
}
}
_ = > { }
}
}
2022-07-04 01:03:26 +00:00
}