2023-11-20 19:08:07 +00:00
use crate ::cnf ::PKG_NAME ;
use crate ::cnf ::PKG_VERSION ;
use crate ::cnf ::{ WEBSOCKET_MAX_CONCURRENT_REQUESTS , WEBSOCKET_PING_FREQUENCY } ;
use crate ::dbs ::DB ;
use crate ::err ::Error ;
use crate ::rpc ::args ::Take ;
2024-01-09 15:27:03 +00:00
use crate ::rpc ::failure ::Failure ;
use crate ::rpc ::format ::Format ;
use crate ::rpc ::response ::{ failure , success , Data , IntoRpcResponse } ;
2023-11-20 19:08:07 +00:00
use crate ::rpc ::{ WebSocketRef , CONN_CLOSED_ERR , LIVE_QUERIES , WEBSOCKETS } ;
use crate ::telemetry ;
use crate ::telemetry ::metrics ::ws ::RequestContext ;
use crate ::telemetry ::traces ::rpc ::span_for_request ;
2023-08-16 12:27:53 +00:00
use axum ::extract ::ws ::{ Message , WebSocket } ;
use futures_util ::stream ::{ SplitSink , SplitStream } ;
use futures_util ::{ SinkExt , StreamExt } ;
use opentelemetry ::trace ::FutureExt ;
use opentelemetry ::Context as TelemetryContext ;
use std ::collections ::BTreeMap ;
use std ::sync ::Arc ;
use surrealdb ::channel ::{ self , Receiver , Sender } ;
2023-11-20 19:08:07 +00:00
use surrealdb ::dbs ::QueryType ;
use surrealdb ::dbs ::Response ;
2023-08-16 12:27:53 +00:00
use surrealdb ::dbs ::Session ;
2023-11-20 19:08:07 +00:00
use surrealdb ::sql ::Array ;
use surrealdb ::sql ::Object ;
use surrealdb ::sql ::Strand ;
use surrealdb ::sql ::Value ;
use tokio ::sync ::{ RwLock , Semaphore } ;
2023-08-16 12:27:53 +00:00
use tokio ::task ::JoinSet ;
use tokio_util ::sync ::CancellationToken ;
2023-11-20 19:08:07 +00:00
use tracing ::Instrument ;
use tracing ::Span ;
2023-08-16 12:27:53 +00:00
use uuid ::Uuid ;
pub struct Connection {
2024-01-09 15:27:03 +00:00
id : Uuid ,
2023-11-20 19:08:07 +00:00
session : Session ,
2024-01-09 15:27:03 +00:00
format : Format ,
2023-11-20 19:08:07 +00:00
vars : BTreeMap < String , Value > ,
limiter : Arc < Semaphore > ,
canceller : CancellationToken ,
2023-08-16 12:27:53 +00:00
}
impl Connection {
/// Instantiate a new RPC
2024-01-09 15:27:03 +00:00
pub fn new ( id : Uuid , mut session : Session , format : Format ) -> Arc < RwLock < Connection > > {
2023-08-16 12:27:53 +00:00
// Enable real-time mode
session . rt = true ;
// Create and store the RPC connection
Arc ::new ( RwLock ::new ( Connection {
2024-01-09 15:27:03 +00:00
id ,
2023-11-20 19:08:07 +00:00
session ,
format ,
2024-01-09 15:27:03 +00:00
vars : BTreeMap ::new ( ) ,
2023-11-20 19:08:07 +00:00
limiter : Arc ::new ( Semaphore ::new ( * WEBSOCKET_MAX_CONCURRENT_REQUESTS ) ) ,
canceller : CancellationToken ::new ( ) ,
2023-08-16 12:27:53 +00:00
} ) )
}
/// Serve the RPC endpoint
pub async fn serve ( rpc : Arc < RwLock < Connection > > , ws : WebSocket ) {
// Split the socket into send and recv
let ( sender , receiver ) = ws . split ( ) ;
// Create an internal channel between the receiver and the sender
2023-11-20 19:08:07 +00:00
let ( internal_sender , internal_receiver ) =
channel ::bounded ( * WEBSOCKET_MAX_CONCURRENT_REQUESTS ) ;
2023-08-16 12:27:53 +00:00
2024-01-09 15:27:03 +00:00
let id = rpc . read ( ) . await . id ;
2023-08-16 12:27:53 +00:00
2024-01-09 15:27:03 +00:00
trace! ( " WebSocket {} connected " , id ) ;
2023-08-16 12:27:53 +00:00
if let Err ( err ) = telemetry ::metrics ::ws ::on_connect ( ) {
error! ( " Error running metrics::ws::on_connect hook: {} " , err ) ;
}
// Add this WebSocket to the list
2024-01-09 15:27:03 +00:00
WEBSOCKETS
. write ( )
. await
. insert ( id , WebSocketRef ( internal_sender . clone ( ) , rpc . read ( ) . await . canceller . clone ( ) ) ) ;
2023-08-19 09:01:37 +00:00
2023-09-12 09:38:28 +00:00
// Spawn async tasks for the WebSocket
2023-08-16 12:27:53 +00:00
let mut tasks = JoinSet ::new ( ) ;
tasks . spawn ( Self ::ping ( rpc . clone ( ) , internal_sender . clone ( ) ) ) ;
tasks . spawn ( Self ::read ( rpc . clone ( ) , receiver , internal_sender . clone ( ) ) ) ;
tasks . spawn ( Self ::write ( rpc . clone ( ) , sender , internal_receiver . clone ( ) ) ) ;
2023-09-12 09:38:28 +00:00
tasks . spawn ( Self ::notifications ( rpc . clone ( ) ) ) ;
2023-08-16 12:27:53 +00:00
// Wait until all tasks finish
while let Some ( res ) = tasks . join_next ( ) . await {
if let Err ( err ) = res {
error! ( " Error handling RPC connection: {} " , err ) ;
}
}
2023-11-20 19:08:07 +00:00
internal_sender . close ( ) ;
2024-01-09 15:27:03 +00:00
trace! ( " WebSocket {} disconnected " , id ) ;
2023-09-12 09:38:28 +00:00
2023-08-16 12:27:53 +00:00
// Remove this WebSocket from the list
2024-01-09 15:27:03 +00:00
WEBSOCKETS . write ( ) . await . remove ( & id ) ;
2023-08-16 12:27:53 +00:00
2023-09-12 09:38:28 +00:00
// Remove all live queries
let mut gc = Vec ::new ( ) ;
LIVE_QUERIES . write ( ) . await . retain ( | key , value | {
2024-01-09 15:27:03 +00:00
if value = = & id {
2023-09-12 09:38:28 +00:00
trace! ( " Removing live query: {} " , key ) ;
gc . push ( * key ) ;
return false ;
}
true
} ) ;
// Garbage collect queries
if let Err ( e ) = DB . get ( ) . unwrap ( ) . garbage_collect_dead_session ( gc . as_slice ( ) ) . await {
error! ( " Failed to garbage collect dead sessions: {:?} " , e ) ;
}
2023-08-16 12:27:53 +00:00
if let Err ( err ) = telemetry ::metrics ::ws ::on_disconnect ( ) {
error! ( " Error running metrics::ws::on_disconnect hook: {} " , err ) ;
}
}
/// Periodically queue Ping messages for the client.
///
/// A Ping is pushed onto the internal channel every `WEBSOCKET_PING_FREQUENCY`.
/// The task stops when the connection is cancelled, or cancels the connection
/// itself when the Ping can no longer be queued.
async fn ping(rpc: Arc<RwLock<Connection>>, internal_sender: Sender<Message>) {
    // Create the interval ticker
    let mut interval = tokio::time::interval(WEBSOCKET_PING_FREQUENCY);
    // Clone the WebSocket cancellation token
    let canceller = rpc.read().await.canceller.clone();
    // Loop until cancelled or until a ping can not be queued
    loop {
        tokio::select! {
            // Always check cancellation before anything else
            biased;
            // Check if this has shutdown
            _ = canceller.cancelled() => break,
            // Send a regular ping message
            _ = interval.tick() => {
                // Close the connection if the message fails
                if internal_sender.send(Message::Ping(vec![])).await.is_err() {
                    // Cancel through the cloned token (it shares state with
                    // the stored one) instead of re-locking the connection,
                    // so shutdown is never delayed behind a lock holder
                    canceller.cancel();
                    // Exit out of the loop
                    break;
                }
            },
        }
    }
}
/// Forward messages from the internal channel to the client socket.
///
/// Runs until the connection is cancelled. A failed socket write cancels the
/// connection; the error is logged unless its text equals `CONN_CLOSED_ERR`.
async fn write(
    rpc: Arc<RwLock<Connection>>,
    mut sender: SplitSink<WebSocket, Message>,
    mut internal_receiver: Receiver<Message>,
) {
    // Clone the WebSocket cancellation token
    let canceller = rpc.read().await.canceller.clone();
    // Loop, and listen for messages to write
    loop {
        tokio::select! {
            // Always check cancellation before anything else
            biased;
            // Check if this has shutdown
            _ = canceller.cancelled() => break,
            // Wait for the next message to send
            Some(res) = internal_receiver.next() => {
                // Send the message to the client
                if let Err(err) = sender.send(res).await {
                    // Output any errors if not a close error
                    if err.to_string() != CONN_CLOSED_ERR {
                        debug!(" WebSocket error: {:?} ", err);
                    }
                    // Cancel through the cloned token (it shares state with
                    // the stored one) instead of re-locking the connection,
                    // so shutdown is never delayed behind a lock holder
                    canceller.cancel();
                    // Exit out of the loop
                    break;
                }
            },
        }
    }
}
/// Read messages sent from the client
async fn read (
rpc : Arc < RwLock < Connection > > ,
mut receiver : SplitStream < WebSocket > ,
internal_sender : Sender < Message > ,
) {
2023-11-20 19:08:07 +00:00
// Store spawned tasks so we can wait for them
2023-08-16 12:27:53 +00:00
let mut tasks = JoinSet ::new ( ) ;
2023-11-20 19:08:07 +00:00
// Clone the WebSocket cancellation token
let canceller = rpc . read ( ) . await . canceller . clone ( ) ;
// Loop, and listen for messages to write
2023-08-16 12:27:53 +00:00
loop {
tokio ::select! {
2023-11-20 19:08:07 +00:00
//
biased ;
// Check if this has shutdown
_ = canceller . cancelled ( ) = > break ,
2023-12-05 09:28:29 +00:00
// Remove any completed tasks
Some ( out ) = tasks . join_next ( ) = > match out {
// The task completed successfully
Ok ( _ ) = > continue ,
// There was an uncaught panic in the task
Err ( err ) = > {
// There was an error with the task
trace! ( " WebSocket request error: {:?} " , err ) ;
// Cancel the WebSocket tasks
rpc . read ( ) . await . canceller . cancel ( ) ;
// Exit out of the loop
break ;
}
} ,
// Wait for the next received message
Some ( msg ) = receiver . next ( ) = > match msg {
// We've received a message from the client
Ok ( msg ) = > match msg {
Message ::Text ( _ ) = > {
tasks . spawn ( Connection ::handle_message ( rpc . clone ( ) , msg , internal_sender . clone ( ) ) ) ;
}
Message ::Binary ( _ ) = > {
tasks . spawn ( Connection ::handle_message ( rpc . clone ( ) , msg , internal_sender . clone ( ) ) ) ;
}
Message ::Close ( _ ) = > {
// Respond with a close message
if let Err ( err ) = internal_sender . send ( Message ::Close ( None ) ) . await {
trace! ( " WebSocket error when replying to the Close frame: {:?} " , err ) ;
} ;
// Cancel the WebSocket tasks
rpc . read ( ) . await . canceller . cancel ( ) ;
// Exit out of the loop
break ;
}
_ = > {
// Ignore everything else
2023-08-16 12:27:53 +00:00
}
2023-12-05 09:28:29 +00:00
} ,
Err ( err ) = > {
// There was an error with the WebSocket
trace! ( " WebSocket error: {:?} " , err ) ;
// Cancel the WebSocket tasks
rpc . read ( ) . await . canceller . cancel ( ) ;
// Exit out of the loop
break ;
2023-08-16 12:27:53 +00:00
}
}
}
}
// Wait for all tasks to finish
while let Some ( res ) = tasks . join_next ( ) . await {
if let Err ( err ) = res {
2023-12-05 09:28:29 +00:00
// There was an error with the task
trace! ( " WebSocket request error: {:?} " , err ) ;
2023-08-16 12:27:53 +00:00
}
}
2023-12-05 09:28:29 +00:00
// Abort all tasks
tasks . shutdown ( ) . await ;
2023-08-16 12:27:53 +00:00
}
/// Send live query notifications to the client
2023-09-12 09:38:28 +00:00
async fn notifications ( rpc : Arc < RwLock < Connection > > ) {
2023-08-16 12:27:53 +00:00
if let Some ( channel ) = DB . get ( ) . unwrap ( ) . notifications ( ) {
2023-11-20 19:08:07 +00:00
let canceller = rpc . read ( ) . await . canceller . clone ( ) ;
2023-08-16 12:27:53 +00:00
loop {
tokio ::select! {
2023-11-20 19:08:07 +00:00
//
biased ;
// Check if this has shutdown
_ = canceller . cancelled ( ) = > break ,
//
2023-08-16 12:27:53 +00:00
msg = channel . recv ( ) = > {
if let Ok ( notification ) = msg {
// Find which WebSocket the notification belongs to
2024-01-09 15:27:03 +00:00
if let Some ( id ) = LIVE_QUERIES . read ( ) . await . get ( & notification . id ) {
2023-08-16 12:27:53 +00:00
// Check to see if the WebSocket exists
2024-01-09 15:27:03 +00:00
if let Some ( WebSocketRef ( ws , _ ) ) = WEBSOCKETS . read ( ) . await . get ( id ) {
2023-08-16 12:27:53 +00:00
// Serialize the message to send
let message = success ( None , notification ) ;
// Get the current output format
2023-11-20 19:08:07 +00:00
let format = rpc . read ( ) . await . format ;
2023-08-16 12:27:53 +00:00
// Send the notification to the client
2023-11-13 17:19:47 +00:00
message . send ( format , ws ) . await
2023-08-16 12:27:53 +00:00
}
}
}
} ,
}
}
}
}
/// Handle individual WebSocket messages
2023-11-20 19:08:07 +00:00
async fn handle_message ( rpc : Arc < RwLock < Connection > > , msg : Message , chn : Sender < Message > ) {
2023-08-16 12:27:53 +00:00
// Get the current output format
2024-01-09 15:27:03 +00:00
let mut fmt = rpc . read ( ) . await . format ;
2023-08-16 12:27:53 +00:00
// Prepare Span and Otel context
2024-01-09 15:27:03 +00:00
let span = span_for_request ( & rpc . read ( ) . await . id ) ;
2023-11-20 19:08:07 +00:00
// Acquire concurrent request rate limiter
let permit = rpc . read ( ) . await . limiter . clone ( ) . acquire_owned ( ) . await . unwrap ( ) ;
2024-01-09 15:27:03 +00:00
// Calculate the length of the message
let len = match msg {
Message ::Text ( ref msg ) = > {
// If no format was specified, default to JSON
if fmt . is_none ( ) {
fmt = Format ::Json ;
rpc . write ( ) . await . format = fmt ;
}
// Retrieve the length of the message
msg . len ( )
}
Message ::Binary ( ref msg ) = > {
// If no format was specified, default to Bincode
if fmt . is_none ( ) {
fmt = Format ::Bincode ;
rpc . write ( ) . await . format = fmt ;
}
// Retrieve the length of the message
msg . len ( )
}
_ = > unreachable! ( ) ,
} ;
2023-08-16 12:27:53 +00:00
// Parse the request
2023-08-29 12:04:53 +00:00
async move {
let span = Span ::current ( ) ;
let req_cx = RequestContext ::default ( ) ;
let otel_cx = TelemetryContext ::new ( ) . with_value ( req_cx . clone ( ) ) ;
2024-01-09 15:27:03 +00:00
// Parse the RPC request structure
match fmt . req ( msg ) {
2023-08-29 12:04:53 +00:00
Ok ( req ) = > {
// Now that we know the method, we can update the span and create otel context
span . record ( " rpc.method " , & req . method ) ;
span . record ( " otel.name " , format! ( " surrealdb.rpc/ {} " , req . method ) ) ;
span . record (
2023-11-20 19:08:07 +00:00
" rpc.request_id " ,
req . id . clone ( ) . map ( Value ::as_string ) . unwrap_or_default ( ) ,
2023-08-29 12:04:53 +00:00
) ;
let otel_cx = TelemetryContext ::current_with_value (
2024-01-09 15:27:03 +00:00
req_cx . with_method ( & req . method ) . with_size ( len ) ,
2023-08-29 12:04:53 +00:00
) ;
2023-11-20 19:08:07 +00:00
// Process the message
2023-08-29 12:04:53 +00:00
let res =
2023-11-20 19:08:07 +00:00
Connection ::process_message ( rpc . clone ( ) , & req . method , req . params ) . await ;
2023-08-29 12:04:53 +00:00
// Process the response
2024-01-09 15:27:03 +00:00
res . into_response ( req . id ) . send ( fmt , & chn ) . with_context ( otel_cx ) . await
2023-08-29 12:04:53 +00:00
}
Err ( err ) = > {
// Process the response
2024-01-09 15:27:03 +00:00
failure ( None , err ) . send ( fmt , & chn ) . with_context ( otel_cx ) . await
2023-08-29 12:04:53 +00:00
}
2023-08-16 12:27:53 +00:00
}
}
2023-08-29 12:04:53 +00:00
. instrument ( span )
. await ;
2023-11-20 19:08:07 +00:00
// Drop the rate limiter permit
drop ( permit ) ;
}
pub async fn process_message (
rpc : Arc < RwLock < Connection > > ,
method : & str ,
params : Array ,
) -> Result < Data , Failure > {
debug! ( " Process RPC request " ) ;
// Match the method to a function
match method {
// Handle a surrealdb ping message
//
// This is used to keep the WebSocket connection alive in environments where the WebSocket protocol is not enough.
// For example, some browsers will wait for the TCP protocol to timeout before triggering an on_close event. This may take several seconds or even minutes in certain scenarios.
// By sending a ping message every few seconds from the client, we can force a connection check and trigger an on_close event if the ping can't be sent.
//
" ping " = > Ok ( Value ::None . into ( ) ) ,
// Retrieve the current auth record
" info " = > match params . len ( ) {
0 = > rpc . read ( ) . await . info ( ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Switch to a specific namespace and database
" use " = > match params . needs_two ( ) {
Ok ( ( ns , db ) ) = > {
rpc . write ( ) . await . yuse ( ns , db ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Signup to a specific authentication scope
" signup " = > match params . needs_one ( ) {
Ok ( Value ::Object ( v ) ) = > {
rpc . write ( ) . await . signup ( v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Signin as a root, namespace, database or scope user
" signin " = > match params . needs_one ( ) {
Ok ( Value ::Object ( v ) ) = > {
rpc . write ( ) . await . signin ( v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Invalidate the current authentication session
" invalidate " = > match params . len ( ) {
0 = > rpc . write ( ) . await . invalidate ( ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Authenticate using an authentication token
" authenticate " = > match params . needs_one ( ) {
Ok ( Value ::Strand ( v ) ) = > {
rpc . write ( ) . await . authenticate ( v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Kill a live query using a query id
" kill " = > match params . needs_one ( ) {
Ok ( v ) = > rpc . read ( ) . await . kill ( v ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Setup a live query on a specific table
" live " = > match params . needs_one_or_two ( ) {
Ok ( ( v , d ) ) if v . is_table ( ) = > {
rpc . read ( ) . await . live ( v , d ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
Ok ( ( v , d ) ) if v . is_strand ( ) = > {
rpc . read ( ) . await . live ( v , d ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Specify a connection-wide parameter
" let " | " set " = > match params . needs_one_or_two ( ) {
Ok ( ( Value ::Strand ( s ) , v ) ) = > {
rpc . write ( ) . await . set ( s , v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Unset and clear a connection-wide parameter
" unset " = > match params . needs_one ( ) {
Ok ( Value ::Strand ( s ) ) = > {
rpc . write ( ) . await . unset ( s ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Select a value or values from the database
" select " = > match params . needs_one ( ) {
Ok ( v ) = > rpc . read ( ) . await . select ( v ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Insert a value or values in the database
" insert " = > match params . needs_one_or_two ( ) {
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . insert ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Create a value or values in the database
" create " = > match params . needs_one_or_two ( ) {
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . create ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Update a value or values in the database using `CONTENT`
" update " = > match params . needs_one_or_two ( ) {
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . update ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Update a value or values in the database using `MERGE`
" merge " = > match params . needs_one_or_two ( ) {
Ok ( ( v , o ) ) = > {
rpc . read ( ) . await . merge ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Update a value or values in the database using `PATCH`
" patch " = > match params . needs_one_two_or_three ( ) {
Ok ( ( v , o , d ) ) = > {
rpc . read ( ) . await . patch ( v , o , d ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Delete a value or values from the database
" delete " = > match params . needs_one ( ) {
Ok ( v ) = > rpc . read ( ) . await . delete ( v ) . await . map ( Into ::into ) . map_err ( Into ::into ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Get the current server version
" version " = > match params . len ( ) {
0 = > Ok ( format! ( " {PKG_NAME} - {} " , * PKG_VERSION ) . into ( ) ) ,
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
// Run a full SurrealQL query against the database
" query " = > match params . needs_one_or_two ( ) {
Ok ( ( v , o ) ) if ( v . is_strand ( ) | | v . is_query ( ) ) & & o . is_none_or_null ( ) = > {
rpc . read ( ) . await . query ( v ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
Ok ( ( v , Value ::Object ( o ) ) ) if v . is_strand ( ) | | v . is_query ( ) = > {
rpc . read ( ) . await . query_with ( v , o ) . await . map ( Into ::into ) . map_err ( Into ::into )
}
_ = > Err ( Failure ::INVALID_PARAMS ) ,
} ,
_ = > Err ( Failure ::METHOD_NOT_FOUND ) ,
}
}
// ------------------------------
// Methods for authentication
// ------------------------------
async fn yuse ( & mut self , ns : Value , db : Value ) -> Result < Value , Error > {
if let Value ::Strand ( ns ) = ns {
self . session . ns = Some ( ns . 0 ) ;
}
if let Value ::Strand ( db ) = db {
self . session . db = Some ( db . 0 ) ;
}
Ok ( Value ::None )
}
async fn signup ( & mut self , vars : Object ) -> Result < Value , Error > {
let kvs = DB . get ( ) . unwrap ( ) ;
surrealdb ::iam ::signup ::signup ( kvs , & mut self . session , vars )
. await
. map ( Into ::into )
. map_err ( Into ::into )
}
async fn signin ( & mut self , vars : Object ) -> Result < Value , Error > {
let kvs = DB . get ( ) . unwrap ( ) ;
surrealdb ::iam ::signin ::signin ( kvs , & mut self . session , vars )
. await
. map ( Into ::into )
. map_err ( Into ::into )
}
async fn invalidate ( & mut self ) -> Result < Value , Error > {
surrealdb ::iam ::clear ::clear ( & mut self . session ) ? ;
Ok ( Value ::None )
}
async fn authenticate ( & mut self , token : Strand ) -> Result < Value , Error > {
let kvs = DB . get ( ) . unwrap ( ) ;
surrealdb ::iam ::verify ::token ( kvs , & mut self . session , & token . 0 ) . await ? ;
Ok ( Value ::None )
}
// ------------------------------
// Methods for identification
// ------------------------------
async fn info ( & self ) -> Result < Value , Error > {
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " SELECT * FROM $auth " ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , None ) . await ? ;
// Extract the first value from the result
let res = res . remove ( 0 ) . result ? . first ( ) ;
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for setting variables
// ------------------------------
async fn set ( & mut self , key : Strand , val : Value ) -> Result < Value , Error > {
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the query parameters
2024-01-05 11:28:01 +00:00
let var = Some ( map! {
key . 0. clone ( ) = > Value ::None ,
= > & self . vars
} ) ;
2023-11-20 19:08:07 +00:00
// Compute the specified parameter
match kvs . compute ( val , & self . session , var ) . await ? {
// Remove the variable if undefined
Value ::None = > self . vars . remove ( & key . 0 ) ,
// Store the variable if defined
v = > self . vars . insert ( key . 0 , v ) ,
} ;
Ok ( Value ::Null )
}
async fn unset ( & mut self , key : Strand ) -> Result < Value , Error > {
self . vars . remove ( & key . 0 ) ;
Ok ( Value ::Null )
}
// ------------------------------
// Methods for live queries
// ------------------------------
async fn kill ( & self , id : Value ) -> Result < Value , Error > {
// Specify the SQL query string
let sql = " KILL $id " ;
// Specify the query parameters
let var = map! {
2024-01-09 15:27:03 +00:00
String ::from ( " id " ) = > id ,
2023-11-20 19:08:07 +00:00
= > & self . vars
} ;
// Execute the query on the database
let mut res = self . query_with ( Value ::from ( sql ) , Object ::from ( var ) ) . await ? ;
// Extract the first query result
let response = res . remove ( 0 ) ;
match response . result {
Ok ( v ) = > Ok ( v ) ,
Err ( e ) = > Err ( Error ::from ( e ) ) ,
}
}
async fn live ( & self , tb : Value , diff : Value ) -> Result < Value , Error > {
// Specify the SQL query string
let sql = match diff . is_true ( ) {
true = > " LIVE SELECT DIFF FROM $tb " ,
false = > " LIVE SELECT * FROM $tb " ,
} ;
// Specify the query parameters
let var = map! {
String ::from ( " tb " ) = > tb . could_be_table ( ) ,
= > & self . vars
} ;
// Execute the query on the database
let mut res = self . query_with ( Value ::from ( sql ) , Object ::from ( var ) ) . await ? ;
// Extract the first query result
let response = res . remove ( 0 ) ;
match response . result {
Ok ( v ) = > Ok ( v ) ,
Err ( e ) = > Err ( Error ::from ( e ) ) ,
}
}
// ------------------------------
// Methods for selecting
// ------------------------------
async fn select ( & self , what : Value ) -> Result < Value , Error > {
// Return a single result?
let one = what . is_thing ( ) ;
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " SELECT * FROM $what " ;
// Specify the query parameters
let var = Some ( map! {
String ::from ( " what " ) = > what . could_be_table ( ) ,
= > & self . vars
} ) ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
// Extract the first query result
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for inserting
// ------------------------------
async fn insert ( & self , what : Value , data : Value ) -> Result < Value , Error > {
// Return a single result?
let one = what . is_thing ( ) ;
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " INSERT INTO $what $data RETURN AFTER " ;
// Specify the query parameters
let var = Some ( map! {
String ::from ( " what " ) = > what . could_be_table ( ) ,
String ::from ( " data " ) = > data ,
= > & self . vars
} ) ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
// Extract the first query result
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for creating
// ------------------------------
async fn create ( & self , what : Value , data : Value ) -> Result < Value , Error > {
// Return a single result?
let one = what . is_thing ( ) ;
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = if data . is_none_or_null ( ) {
" CREATE $what RETURN AFTER "
} else {
" CREATE $what CONTENT $data RETURN AFTER "
} ;
// Specify the query parameters
let var = Some ( map! {
String ::from ( " what " ) = > what . could_be_table ( ) ,
String ::from ( " data " ) = > data ,
= > & self . vars
} ) ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
// Extract the first query result
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for updating
// ------------------------------
async fn update ( & self , what : Value , data : Value ) -> Result < Value , Error > {
// Return a single result?
let one = what . is_thing ( ) ;
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = if data . is_none_or_null ( ) {
" UPDATE $what RETURN AFTER "
} else {
" UPDATE $what CONTENT $data RETURN AFTER "
} ;
// Specify the query parameters
let var = Some ( map! {
String ::from ( " what " ) = > what . could_be_table ( ) ,
String ::from ( " data " ) = > data ,
= > & self . vars
} ) ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
// Extract the first query result
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for merging
// ------------------------------
async fn merge ( & self , what : Value , data : Value ) -> Result < Value , Error > {
// Return a single result?
let one = what . is_thing ( ) ;
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = if data . is_none_or_null ( ) {
" UPDATE $what RETURN AFTER "
} else {
" UPDATE $what MERGE $data RETURN AFTER "
} ;
// Specify the query parameters
let var = Some ( map! {
String ::from ( " what " ) = > what . could_be_table ( ) ,
String ::from ( " data " ) = > data ,
= > & self . vars
} ) ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
// Extract the first query result
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for patching
// ------------------------------
async fn patch ( & self , what : Value , data : Value , diff : Value ) -> Result < Value , Error > {
// Return a single result?
let one = what . is_thing ( ) ;
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = match diff . is_true ( ) {
true = > " UPDATE $what PATCH $data RETURN DIFF " ,
false = > " UPDATE $what PATCH $data RETURN AFTER " ,
} ;
// Specify the query parameters
let var = Some ( map! {
String ::from ( " what " ) = > what . could_be_table ( ) ,
String ::from ( " data " ) = > data ,
= > & self . vars
} ) ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
// Extract the first query result
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for deleting
// ------------------------------
async fn delete ( & self , what : Value ) -> Result < Value , Error > {
// Return a single result?
let one = what . is_thing ( ) ;
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the SQL query string
let sql = " DELETE $what RETURN BEFORE " ;
// Specify the query parameters
let var = Some ( map! {
String ::from ( " what " ) = > what . could_be_table ( ) ,
= > & self . vars
} ) ;
// Execute the query on the database
let mut res = kvs . execute ( sql , & self . session , var ) . await ? ;
// Extract the first query result
let res = match one {
true = > res . remove ( 0 ) . result ? . first ( ) ,
false = > res . remove ( 0 ) . result ? ,
} ;
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Methods for querying
// ------------------------------
async fn query ( & self , sql : Value ) -> Result < Vec < Response > , Error > {
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the query parameters
let var = Some ( self . vars . clone ( ) ) ;
// Execute the query on the database
let res = match sql {
Value ::Query ( sql ) = > kvs . process ( sql , & self . session , var ) . await ? ,
Value ::Strand ( sql ) = > kvs . execute ( & sql , & self . session , var ) . await ? ,
_ = > unreachable! ( ) ,
} ;
// Post-process hooks for web layer
for response in & res {
self . handle_live_query_results ( response ) . await ;
}
// Return the result to the client
Ok ( res )
}
async fn query_with ( & self , sql : Value , mut vars : Object ) -> Result < Vec < Response > , Error > {
// Get a database reference
let kvs = DB . get ( ) . unwrap ( ) ;
// Specify the query parameters
let var = Some ( mrg! { vars . 0 , & self . vars } ) ;
// Execute the query on the database
let res = match sql {
Value ::Query ( sql ) = > kvs . process ( sql , & self . session , var ) . await ? ,
Value ::Strand ( sql ) = > kvs . execute ( & sql , & self . session , var ) . await ? ,
_ = > unreachable! ( ) ,
} ;
// Post-process hooks for web layer
for response in & res {
self . handle_live_query_results ( response ) . await ;
}
// Return the result to the client
Ok ( res )
}
// ------------------------------
// Private methods
// ------------------------------
async fn handle_live_query_results ( & self , res : & Response ) {
match & res . query_type {
QueryType ::Live = > {
if let Ok ( Value ::Uuid ( lqid ) ) = & res . result {
// Match on Uuid type
2024-01-09 15:27:03 +00:00
LIVE_QUERIES . write ( ) . await . insert ( lqid . 0 , self . id ) ;
trace! ( " Registered live query {} on websocket {} " , lqid , self . id ) ;
2023-11-20 19:08:07 +00:00
}
}
QueryType ::Kill = > {
if let Ok ( Value ::Uuid ( lqid ) ) = & res . result {
2024-01-09 15:27:03 +00:00
if let Some ( id ) = LIVE_QUERIES . write ( ) . await . remove ( & lqid . 0 ) {
trace! ( " Unregistered live query {} on websocket {} " , lqid , id ) ;
2023-11-20 19:08:07 +00:00
}
}
}
_ = > { }
}
2023-08-16 12:27:53 +00:00
}
}