From 17e8ea55b588d579e19b2ffdc32fb02006f35da5 Mon Sep 17 00:00:00 2001 From: Tobie Morgan Hitchcock Date: Tue, 10 May 2022 08:36:48 +0100 Subject: [PATCH] Implement SQL database exporting and importing --- Cargo.lock | 1 + lib/Cargo.toml | 1 + lib/src/err/mod.rs | 7 ++ lib/src/kvs/ds.rs | 171 +++++++++++++++++++++++++++++++++++++++++++++ src/net/export.rs | 52 +++++++++----- src/net/import.rs | 48 ++++++++++--- 6 files changed, 252 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0586d78..6afe7e5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2374,6 +2374,7 @@ dependencies = [ "argon2", "async-recursion", "bigdecimal", + "bytes", "chrono", "dmp", "echodb", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 7589b091..5cc06d7a 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -17,6 +17,7 @@ kv-yokudb = [] argon2 = "0.4.0" async-recursion = "1.0.0" bigdecimal = { version = "0.3.0", features = ["serde", "string-only"] } +bytes = "1.1.0" chrono = { version = "0.4.19", features = ["serde"] } derive = { version = "0.1.2", package = "surrealdb-derive" } dmp = "0.1.1" diff --git a/lib/src/err/mod.rs b/lib/src/err/mod.rs index 81ff4e30..9d8d3545 100644 --- a/lib/src/err/mod.rs +++ b/lib/src/err/mod.rs @@ -255,6 +255,13 @@ impl From for Error { } } +#[cfg(feature = "parallel")] +impl From> for Error { + fn from(e: TokioError) -> Error { + Error::Channel(e.to_string()) + } +} + #[cfg(feature = "parallel")] impl From, Value)>> for Error { fn from(e: TokioError<(Option, Value)>) -> Error { diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 4536c550..ed72b6a9 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -7,8 +7,12 @@ use crate::dbs::Response; use crate::dbs::Session; use crate::dbs::Variables; use crate::err::Error; +use crate::key::thing; use crate::sql; use crate::sql::query::Query; +use crate::sql::thing::Thing; +use bytes::Bytes; +use tokio::sync::mpsc::Sender; /// The underlying datastore instance which stores the dataset. pub struct Datastore { @@ -105,6 +109,7 @@ impl Datastore { _ => unreachable!(), } } + /// Create a new transaction on this datastore pub async fn transaction(&self, write: bool, lock: bool) -> Result { match &self.inner { @@ -138,6 +143,7 @@ impl Datastore { } } } + /// Parse and execute an SQL query pub async fn execute( &self, @@ -165,6 +171,7 @@ impl Datastore { opt.db = sess.db(); exe.execute(ctx, opt, ast).await } + /// Execute a pre-parsed SQL query pub async fn process( &self, @@ -190,4 +197,168 @@ impl Datastore { opt.db = sess.db(); exe.execute(ctx, opt, ast).await } + + /// Performs a full database export as SQL + pub async fn export(&self, ns: String, db: String, chn: Sender) -> Result<(), Error> { + // Start a new transaction + let mut txn = self.transaction(false, false).await?; + // Output OPTIONS + { + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- OPTION")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + chn.send(output!("OPTION IMPORT;")).await?; + chn.send(output!("")).await?; + } + // Output LOGINS + { + let dls = txn.all_dl(&ns, &db).await?; + if !dls.is_empty() { + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- LOGINS")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + for dl in dls { + chn.send(output!(format!("{};", dl))).await?; + } + chn.send(output!("")).await?; + } + } + // Output TOKENS + { + let dts = txn.all_dt(&ns, &db).await?; + if !dts.is_empty() { + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- TOKENS")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + for dt in dts { + chn.send(output!(format!("{};", dt))).await?; + } + chn.send(output!("")).await?; + } + } + // Output SCOPES + { + let scs = txn.all_sc(&ns, &db).await?; + if !scs.is_empty() { + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- SCOPES")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + for sc in scs { + chn.send(output!(format!("{};", sc))).await?; + } + chn.send(output!("")).await?; + } + } + // Output TABLES + { + let tbs = txn.all_tb(&ns, &db).await?; + if !tbs.is_empty() { + for tb in &tbs { + // Output TABLE + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!(format!("-- TABLE: {}", tb.name))).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + chn.send(output!(format!("{};", tb))).await?; + chn.send(output!("")).await?; + // Output FIELDS + { + let fds = txn.all_fd(&ns, &db, &tb.name).await?; + if !fds.is_empty() { + for fd in &fds { + chn.send(output!(format!("{};", fd))).await?; + } + chn.send(output!("")).await?; + } + } + // Output INDEXS + let ixs = txn.all_fd(&ns, &db, &tb.name).await?; + if !ixs.is_empty() { + for ix in &ixs { + chn.send(output!(format!("{};", ix))).await?; + } + chn.send(output!("")).await?; + } + // Output EVENTS + let evs = txn.all_ev(&ns, &db, &tb.name).await?; + if !evs.is_empty() { + for ev in &evs { + chn.send(output!(format!("{};", ev))).await?; + } + chn.send(output!("")).await?; + } + } + // Start transaction + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- TRANSACTION")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + chn.send(output!("BEGIN TRANSACTION;")).await?; + chn.send(output!("")).await?; + // Output TABLE data + for tb in &tbs { + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!(format!("-- TABLE DATA: {}", tb.name))).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + // Fetch records + let beg = thing::prefix(&ns, &db, &tb.name); + let end = thing::suffix(&ns, &db, &tb.name); + let mut nxt: Option> = None; + loop { + let res = match nxt { + None => { + let min = beg.clone(); + let max = end.clone(); + txn.scan(min..max, 1000).await? + } + Some(ref mut beg) => { + beg.push(0x00); + let min = beg.clone(); + let max = end.clone(); + txn.scan(min..max, 1000).await? + } + }; + if !res.is_empty() { + // Get total results + let n = res.len(); + // Exit when settled + if n == 0 { + break; + } + // Loop over results + for (i, (k, v)) in res.into_iter().enumerate() { + // Ready the next + if n == i + 1 { + nxt = Some(k.clone()); + } + // Parse the key-value + let k: crate::key::thing::Thing = (&k).into(); + let v: crate::sql::value::Value = (&v).into(); + let t = Thing::from((k.tb, k.id)); + // Write record + chn.send(output!(format!("UPDATE {} CONTENT {};", t, v))).await?; + } + continue; + } + break; + } + chn.send(output!("")).await?; + } + // Commit transaction + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("-- TRANSACTION")).await?; + chn.send(output!("-- ------------------------------")).await?; + chn.send(output!("")).await?; + chn.send(output!("COMMIT TRANSACTION;")).await?; + chn.send(output!("")).await?; + } + } + // Everything fine + Ok(()) + } } diff --git a/src/net/export.rs b/src/net/export.rs index 22ca7704..200ec531 100644 --- a/src/net/export.rs +++ b/src/net/export.rs @@ -1,25 +1,43 @@ -// use crate::net::DB; -// use hyper::body::Body; -// use surrealdb::dbs::export; +use crate::err::Error; use crate::net::session; +use crate::net::DB; +use hyper::body::Body; use surrealdb::Session; use warp::Filter; pub fn config() -> impl Filter + Clone { - // Set base path - let base = warp::path("export").and(warp::path::end()); - // Set opts method - let opts = base.and(warp::options()).map(warp::reply); - // Set get method - let get = base.and(warp::get()).and(conf::build()).and_then(handler); - // Specify route - opts.or(get) + warp::path("export") + .and(warp::path::end()) + .and(warp::get()) + .and(session::build()) + .and_then(handler) } -async fn handler(_session: Session) -> Result { - // let db = DB.get().unwrap().clone(); - // let (chn, body) = Body::channel(); - // tokio::spawn(export(db, session, chn)); - // Ok(warp::reply::Response::new(body)) - Ok(warp::reply::reply()) +async fn handler(session: Session) -> Result { + // Check the permissions + match session.au.is_db() { + true => { + // Get the datastore reference + let db = DB.get().unwrap(); + // Extract the NS header value + let nsv = session.ns.clone().unwrap(); + // Extract the DB header value + let dbv = session.db.clone().unwrap(); + // Create a chunked response + let (mut chn, bdy) = Body::channel(); + // Initiate a new async channel + let (snd, mut rcv) = tokio::sync::mpsc::channel(100); + // Spawn a new database export + tokio::spawn(db.export(nsv, dbv, snd)); + // Process all processed values + tokio::spawn(async move { + while let Some(v) = rcv.recv().await { + let _ = chn.send_data(v).await; + } + }); + // Return the chunked body + Ok(warp::reply::Response::new(bdy)) + } + _ => Err(warp::reject::custom(Error::InvalidAuth)), + } } diff --git a/src/net/import.rs b/src/net/import.rs index 34c7e5fa..51627444 100644 --- a/src/net/import.rs +++ b/src/net/import.rs @@ -1,22 +1,48 @@ +use crate::err::Error; +use crate::net::output; +use crate::net::session; +use crate::net::DB; +use bytes::Bytes; +use surrealdb::Session; use warp::http; use warp::Filter; const MAX: u64 = 1024 * 1024 * 1024 * 4; // 4 GiB pub fn config() -> impl Filter + Clone { - // Set base path - let base = warp::path("import").and(warp::path::end()); - // Set opts method - let opts = base.and(warp::options()).map(warp::reply); - // Set post method - let post = base + warp::path("import") + .and(warp::path::end()) .and(warp::post()) + .and(session::build()) + .and(warp::header::(http::header::CONTENT_TYPE.as_str())) .and(warp::body::content_length_limit(MAX)) - .and_then(handler); - // Specify route - opts.or(post) + .and(warp::body::bytes()) + .and_then(handler) } -async fn handler() -> Result { - Ok(warp::reply::with_status("Ok", http::StatusCode::OK)) +async fn handler( + session: Session, + output: String, + sql: Bytes, +) -> Result { + // Check the permissions + match session.au.is_db() { + true => { + // Get the datastore reference + let db = DB.get().unwrap(); + // Convert the body to a byte slice + let sql = std::str::from_utf8(&sql).unwrap(); + // Execute the sql query in the database + match db.execute(sql, &session, None).await { + Ok(res) => match output.as_ref() { + "application/json" => Ok(output::json(&res)), + "application/cbor" => Ok(output::cbor(&res)), + "application/msgpack" => Ok(output::pack(&res)), + _ => Err(warp::reject::not_found()), + }, + Err(err) => Err(warp::reject::custom(Error::from(err))), + } + } + _ => Err(warp::reject::custom(Error::InvalidAuth)), + } }