Implement WebSocket queries for /sql endpoint
This commit is contained in:
parent
d29961ee19
commit
e4619be89a
1 changed files with 38 additions and 12 deletions
|
@ -4,8 +4,9 @@ use crate::net::output;
|
||||||
use crate::net::session;
|
use crate::net::session;
|
||||||
use crate::net::DB;
|
use crate::net::DB;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
use surrealdb::Session;
|
use surrealdb::Session;
|
||||||
|
use warp::ws::{Message, WebSocket, Ws};
|
||||||
use warp::Filter;
|
use warp::Filter;
|
||||||
|
|
||||||
const MAX: u64 = 1024 * 1024; // 1 MiB
|
const MAX: u64 = 1024 * 1024; // 1 MiB
|
||||||
|
@ -24,17 +25,10 @@ pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejecti
|
||||||
.and(warp::body::bytes())
|
.and(warp::body::bytes())
|
||||||
.and_then(handler);
|
.and_then(handler);
|
||||||
// Set sock method
|
// Set sock method
|
||||||
let sock = base.and(warp::ws()).map(|ws: warp::ws::Ws| {
|
let sock = base
|
||||||
ws.on_upgrade(|websocket| {
|
.and(warp::ws())
|
||||||
// Just echo all messages back...
|
.and(session::build())
|
||||||
let (tx, rx) = websocket.split();
|
.map(|ws: Ws, session: Session| ws.on_upgrade(move |ws| socket(ws, session)));
|
||||||
rx.forward(tx).map(|result| {
|
|
||||||
if let Err(e) = result {
|
|
||||||
eprintln!("websocket error: {:?}", e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
});
|
|
||||||
// Specify route
|
// Specify route
|
||||||
opts.or(post).or(sock).with(head::cors())
|
opts.or(post).or(sock).with(head::cors())
|
||||||
}
|
}
|
||||||
|
@ -44,15 +38,47 @@ async fn handler(
|
||||||
output: String,
|
output: String,
|
||||||
sql: Bytes,
|
sql: Bytes,
|
||||||
) -> Result<impl warp::Reply, warp::Rejection> {
|
) -> Result<impl warp::Reply, warp::Rejection> {
|
||||||
|
// Get a database reference
|
||||||
let db = DB.get().unwrap();
|
let db = DB.get().unwrap();
|
||||||
|
// Convert the received sql query
|
||||||
let sql = std::str::from_utf8(&sql).unwrap();
|
let sql = std::str::from_utf8(&sql).unwrap();
|
||||||
|
// Execute the received sql query
|
||||||
match db.execute(sql, &session, None).await {
|
match db.execute(sql, &session, None).await {
|
||||||
|
// Convert the response to JSON
|
||||||
Ok(res) => match output.as_ref() {
|
Ok(res) => match output.as_ref() {
|
||||||
"application/json" => Ok(output::json(&res)),
|
"application/json" => Ok(output::json(&res)),
|
||||||
"application/cbor" => Ok(output::cbor(&res)),
|
"application/cbor" => Ok(output::cbor(&res)),
|
||||||
"application/msgpack" => Ok(output::pack(&res)),
|
"application/msgpack" => Ok(output::pack(&res)),
|
||||||
|
// An incorrect content-type was requested
|
||||||
_ => Err(warp::reject::not_found()),
|
_ => Err(warp::reject::not_found()),
|
||||||
},
|
},
|
||||||
|
// There was an error when executing the query
|
||||||
Err(err) => Err(warp::reject::custom(Error::from(err))),
|
Err(err) => Err(warp::reject::custom(Error::from(err))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn socket(ws: WebSocket, session: Session) {
|
||||||
|
// Split the WebSocket connection
|
||||||
|
let (mut tx, mut rx) = ws.split();
|
||||||
|
// Wait to receive the next message
|
||||||
|
while let Some(res) = rx.next().await {
|
||||||
|
if let Ok(msg) = res {
|
||||||
|
if let Ok(sql) = msg.to_str() {
|
||||||
|
// Get a database reference
|
||||||
|
let db = DB.get().unwrap();
|
||||||
|
// Execute the received sql query
|
||||||
|
let _ = match db.execute(sql, &session, None).await {
|
||||||
|
// Convert the response to JSON
|
||||||
|
Ok(v) => match serde_json::to_string(&v) {
|
||||||
|
// Send the JSON response to the client
|
||||||
|
Ok(v) => tx.send(Message::text(v)).await,
|
||||||
|
// There was an error converting to JSON
|
||||||
|
Err(e) => tx.send(Message::text(Error::from(e))).await,
|
||||||
|
},
|
||||||
|
// There was an error when executing the query
|
||||||
|
Err(e) => tx.send(Message::text(Error::from(e))).await,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue