Implement SQL database exporting and importing
This commit is contained in:
parent
3498e57e04
commit
17e8ea55b5
6 changed files with 252 additions and 28 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -2374,6 +2374,7 @@ dependencies = [
|
||||||
"argon2",
|
"argon2",
|
||||||
"async-recursion",
|
"async-recursion",
|
||||||
"bigdecimal",
|
"bigdecimal",
|
||||||
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"dmp",
|
"dmp",
|
||||||
"echodb",
|
"echodb",
|
||||||
|
|
|
@ -17,6 +17,7 @@ kv-yokudb = []
|
||||||
argon2 = "0.4.0"
|
argon2 = "0.4.0"
|
||||||
async-recursion = "1.0.0"
|
async-recursion = "1.0.0"
|
||||||
bigdecimal = { version = "0.3.0", features = ["serde", "string-only"] }
|
bigdecimal = { version = "0.3.0", features = ["serde", "string-only"] }
|
||||||
|
bytes = "1.1.0"
|
||||||
chrono = { version = "0.4.19", features = ["serde"] }
|
chrono = { version = "0.4.19", features = ["serde"] }
|
||||||
derive = { version = "0.1.2", package = "surrealdb-derive" }
|
derive = { version = "0.1.2", package = "surrealdb-derive" }
|
||||||
dmp = "0.1.1"
|
dmp = "0.1.1"
|
||||||
|
|
|
@ -255,6 +255,13 @@ impl From<TiKVError> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "parallel")]
|
||||||
|
impl From<TokioError<bytes::Bytes>> for Error {
|
||||||
|
fn from(e: TokioError<bytes::Bytes>) -> Error {
|
||||||
|
Error::Channel(e.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "parallel")]
|
#[cfg(feature = "parallel")]
|
||||||
impl From<TokioError<(Option<Thing>, Value)>> for Error {
|
impl From<TokioError<(Option<Thing>, Value)>> for Error {
|
||||||
fn from(e: TokioError<(Option<Thing>, Value)>) -> Error {
|
fn from(e: TokioError<(Option<Thing>, Value)>) -> Error {
|
||||||
|
|
|
@ -7,8 +7,12 @@ use crate::dbs::Response;
|
||||||
use crate::dbs::Session;
|
use crate::dbs::Session;
|
||||||
use crate::dbs::Variables;
|
use crate::dbs::Variables;
|
||||||
use crate::err::Error;
|
use crate::err::Error;
|
||||||
|
use crate::key::thing;
|
||||||
use crate::sql;
|
use crate::sql;
|
||||||
use crate::sql::query::Query;
|
use crate::sql::query::Query;
|
||||||
|
use crate::sql::thing::Thing;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
/// The underlying datastore instance which stores the dataset.
|
/// The underlying datastore instance which stores the dataset.
|
||||||
pub struct Datastore {
|
pub struct Datastore {
|
||||||
|
@ -105,6 +109,7 @@ impl Datastore {
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new transaction on this datastore
|
/// Create a new transaction on this datastore
|
||||||
pub async fn transaction(&self, write: bool, lock: bool) -> Result<Transaction, Error> {
|
pub async fn transaction(&self, write: bool, lock: bool) -> Result<Transaction, Error> {
|
||||||
match &self.inner {
|
match &self.inner {
|
||||||
|
@ -138,6 +143,7 @@ impl Datastore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse and execute an SQL query
|
/// Parse and execute an SQL query
|
||||||
pub async fn execute(
|
pub async fn execute(
|
||||||
&self,
|
&self,
|
||||||
|
@ -165,6 +171,7 @@ impl Datastore {
|
||||||
opt.db = sess.db();
|
opt.db = sess.db();
|
||||||
exe.execute(ctx, opt, ast).await
|
exe.execute(ctx, opt, ast).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute a pre-parsed SQL query
|
/// Execute a pre-parsed SQL query
|
||||||
pub async fn process(
|
pub async fn process(
|
||||||
&self,
|
&self,
|
||||||
|
@ -190,4 +197,168 @@ impl Datastore {
|
||||||
opt.db = sess.db();
|
opt.db = sess.db();
|
||||||
exe.execute(ctx, opt, ast).await
|
exe.execute(ctx, opt, ast).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Performs a full database export as SQL
|
||||||
|
pub async fn export(&self, ns: String, db: String, chn: Sender<Bytes>) -> Result<(), Error> {
|
||||||
|
// Start a new transaction
|
||||||
|
let mut txn = self.transaction(false, false).await?;
|
||||||
|
// Output OPTIONS
|
||||||
|
{
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("-- OPTION")).await?;
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
chn.send(output!("OPTION IMPORT;")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
// Output LOGINS
|
||||||
|
{
|
||||||
|
let dls = txn.all_dl(&ns, &db).await?;
|
||||||
|
if !dls.is_empty() {
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("-- LOGINS")).await?;
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
for dl in dls {
|
||||||
|
chn.send(output!(format!("{};", dl))).await?;
|
||||||
|
}
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Output TOKENS
|
||||||
|
{
|
||||||
|
let dts = txn.all_dt(&ns, &db).await?;
|
||||||
|
if !dts.is_empty() {
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("-- TOKENS")).await?;
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
for dt in dts {
|
||||||
|
chn.send(output!(format!("{};", dt))).await?;
|
||||||
|
}
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Output SCOPES
|
||||||
|
{
|
||||||
|
let scs = txn.all_sc(&ns, &db).await?;
|
||||||
|
if !scs.is_empty() {
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("-- SCOPES")).await?;
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
for sc in scs {
|
||||||
|
chn.send(output!(format!("{};", sc))).await?;
|
||||||
|
}
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Output TABLES
|
||||||
|
{
|
||||||
|
let tbs = txn.all_tb(&ns, &db).await?;
|
||||||
|
if !tbs.is_empty() {
|
||||||
|
for tb in &tbs {
|
||||||
|
// Output TABLE
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!(format!("-- TABLE: {}", tb.name))).await?;
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
chn.send(output!(format!("{};", tb))).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
// Output FIELDS
|
||||||
|
{
|
||||||
|
let fds = txn.all_fd(&ns, &db, &tb.name).await?;
|
||||||
|
if !fds.is_empty() {
|
||||||
|
for fd in &fds {
|
||||||
|
chn.send(output!(format!("{};", fd))).await?;
|
||||||
|
}
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Output INDEXS
|
||||||
|
let ixs = txn.all_fd(&ns, &db, &tb.name).await?;
|
||||||
|
if !ixs.is_empty() {
|
||||||
|
for ix in &ixs {
|
||||||
|
chn.send(output!(format!("{};", ix))).await?;
|
||||||
|
}
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
// Output EVENTS
|
||||||
|
let evs = txn.all_ev(&ns, &db, &tb.name).await?;
|
||||||
|
if !evs.is_empty() {
|
||||||
|
for ev in &evs {
|
||||||
|
chn.send(output!(format!("{};", ev))).await?;
|
||||||
|
}
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Start transaction
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("-- TRANSACTION")).await?;
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
chn.send(output!("BEGIN TRANSACTION;")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
// Output TABLE data
|
||||||
|
for tb in &tbs {
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!(format!("-- TABLE DATA: {}", tb.name))).await?;
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
// Fetch records
|
||||||
|
let beg = thing::prefix(&ns, &db, &tb.name);
|
||||||
|
let end = thing::suffix(&ns, &db, &tb.name);
|
||||||
|
let mut nxt: Option<Vec<u8>> = None;
|
||||||
|
loop {
|
||||||
|
let res = match nxt {
|
||||||
|
None => {
|
||||||
|
let min = beg.clone();
|
||||||
|
let max = end.clone();
|
||||||
|
txn.scan(min..max, 1000).await?
|
||||||
|
}
|
||||||
|
Some(ref mut beg) => {
|
||||||
|
beg.push(0x00);
|
||||||
|
let min = beg.clone();
|
||||||
|
let max = end.clone();
|
||||||
|
txn.scan(min..max, 1000).await?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if !res.is_empty() {
|
||||||
|
// Get total results
|
||||||
|
let n = res.len();
|
||||||
|
// Exit when settled
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Loop over results
|
||||||
|
for (i, (k, v)) in res.into_iter().enumerate() {
|
||||||
|
// Ready the next
|
||||||
|
if n == i + 1 {
|
||||||
|
nxt = Some(k.clone());
|
||||||
|
}
|
||||||
|
// Parse the key-value
|
||||||
|
let k: crate::key::thing::Thing = (&k).into();
|
||||||
|
let v: crate::sql::value::Value = (&v).into();
|
||||||
|
let t = Thing::from((k.tb, k.id));
|
||||||
|
// Write record
|
||||||
|
chn.send(output!(format!("UPDATE {} CONTENT {};", t, v))).await?;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
// Commit transaction
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("-- TRANSACTION")).await?;
|
||||||
|
chn.send(output!("-- ------------------------------")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
chn.send(output!("COMMIT TRANSACTION;")).await?;
|
||||||
|
chn.send(output!("")).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Everything fine
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,25 +1,43 @@
|
||||||
// use crate::net::DB;
|
use crate::err::Error;
|
||||||
// use hyper::body::Body;
|
|
||||||
// use surrealdb::dbs::export;
|
|
||||||
use crate::net::session;
|
use crate::net::session;
|
||||||
|
use crate::net::DB;
|
||||||
|
use hyper::body::Body;
|
||||||
use surrealdb::Session;
|
use surrealdb::Session;
|
||||||
use warp::Filter;
|
use warp::Filter;
|
||||||
|
|
||||||
pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
|
pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
|
||||||
// Set base path
|
warp::path("export")
|
||||||
let base = warp::path("export").and(warp::path::end());
|
.and(warp::path::end())
|
||||||
// Set opts method
|
.and(warp::get())
|
||||||
let opts = base.and(warp::options()).map(warp::reply);
|
.and(session::build())
|
||||||
// Set get method
|
.and_then(handler)
|
||||||
let get = base.and(warp::get()).and(conf::build()).and_then(handler);
|
|
||||||
// Specify route
|
|
||||||
opts.or(get)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handler(_session: Session) -> Result<impl warp::Reply, warp::Rejection> {
|
async fn handler(session: Session) -> Result<impl warp::Reply, warp::Rejection> {
|
||||||
// let db = DB.get().unwrap().clone();
|
// Check the permissions
|
||||||
// let (chn, body) = Body::channel();
|
match session.au.is_db() {
|
||||||
// tokio::spawn(export(db, session, chn));
|
true => {
|
||||||
// Ok(warp::reply::Response::new(body))
|
// Get the datastore reference
|
||||||
Ok(warp::reply::reply())
|
let db = DB.get().unwrap();
|
||||||
|
// Extract the NS header value
|
||||||
|
let nsv = session.ns.clone().unwrap();
|
||||||
|
// Extract the DB header value
|
||||||
|
let dbv = session.db.clone().unwrap();
|
||||||
|
// Create a chunked response
|
||||||
|
let (mut chn, bdy) = Body::channel();
|
||||||
|
// Initiate a new async channel
|
||||||
|
let (snd, mut rcv) = tokio::sync::mpsc::channel(100);
|
||||||
|
// Spawn a new database export
|
||||||
|
tokio::spawn(db.export(nsv, dbv, snd));
|
||||||
|
// Process all processed values
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(v) = rcv.recv().await {
|
||||||
|
let _ = chn.send_data(v).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Return the chunked body
|
||||||
|
Ok(warp::reply::Response::new(bdy))
|
||||||
|
}
|
||||||
|
_ => Err(warp::reject::custom(Error::InvalidAuth)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,22 +1,48 @@
|
||||||
|
use crate::err::Error;
|
||||||
|
use crate::net::output;
|
||||||
|
use crate::net::session;
|
||||||
|
use crate::net::DB;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use surrealdb::Session;
|
||||||
use warp::http;
|
use warp::http;
|
||||||
use warp::Filter;
|
use warp::Filter;
|
||||||
|
|
||||||
const MAX: u64 = 1024 * 1024 * 1024 * 4; // 4 GiB
|
const MAX: u64 = 1024 * 1024 * 1024 * 4; // 4 GiB
|
||||||
|
|
||||||
pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
|
pub fn config() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
|
||||||
// Set base path
|
warp::path("import")
|
||||||
let base = warp::path("import").and(warp::path::end());
|
.and(warp::path::end())
|
||||||
// Set opts method
|
|
||||||
let opts = base.and(warp::options()).map(warp::reply);
|
|
||||||
// Set post method
|
|
||||||
let post = base
|
|
||||||
.and(warp::post())
|
.and(warp::post())
|
||||||
|
.and(session::build())
|
||||||
|
.and(warp::header::<String>(http::header::CONTENT_TYPE.as_str()))
|
||||||
.and(warp::body::content_length_limit(MAX))
|
.and(warp::body::content_length_limit(MAX))
|
||||||
.and_then(handler);
|
.and(warp::body::bytes())
|
||||||
// Specify route
|
.and_then(handler)
|
||||||
opts.or(post)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handler() -> Result<impl warp::Reply, warp::Rejection> {
|
async fn handler(
|
||||||
Ok(warp::reply::with_status("Ok", http::StatusCode::OK))
|
session: Session,
|
||||||
|
output: String,
|
||||||
|
sql: Bytes,
|
||||||
|
) -> Result<impl warp::Reply, warp::Rejection> {
|
||||||
|
// Check the permissions
|
||||||
|
match session.au.is_db() {
|
||||||
|
true => {
|
||||||
|
// Get the datastore reference
|
||||||
|
let db = DB.get().unwrap();
|
||||||
|
// Convert the body to a byte slice
|
||||||
|
let sql = std::str::from_utf8(&sql).unwrap();
|
||||||
|
// Execute the sql query in the database
|
||||||
|
match db.execute(sql, &session, None).await {
|
||||||
|
Ok(res) => match output.as_ref() {
|
||||||
|
"application/json" => Ok(output::json(&res)),
|
||||||
|
"application/cbor" => Ok(output::cbor(&res)),
|
||||||
|
"application/msgpack" => Ok(output::pack(&res)),
|
||||||
|
_ => Err(warp::reject::not_found()),
|
||||||
|
},
|
||||||
|
Err(err) => Err(warp::reject::custom(Error::from(err))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => Err(warp::reject::custom(Error::InvalidAuth)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue