Batched export (#4131)

Co-authored-by: Tobie Morgan Hitchcock <tobie@surrealdb.com>
This commit is contained in:
Micha de Vries 2024-06-04 17:04:07 +02:00 committed by GitHub
parent e9987f66de
commit bf218d9363
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 36 additions and 11 deletions

View file

@ -52,3 +52,6 @@ pub static INSECURE_FORWARD_RECORD_ACCESS_ERRORS: Lazy<bool> =
/// If the environment variable is not present or cannot be parsed, a default value of 50,000 is used. /// If the environment variable is not present or cannot be parsed, a default value of 50,000 is used.
pub static EXTERNAL_SORTING_BUFFER_LIMIT: Lazy<usize> = pub static EXTERNAL_SORTING_BUFFER_LIMIT: Lazy<usize> =
lazy_env_parse!("SURREAL_EXTERNAL_SORTING_BUFFER_LIMIT", usize, 50_000); lazy_env_parse!("SURREAL_EXTERNAL_SORTING_BUFFER_LIMIT", usize, 50_000);
/// The number of records that should be fetched and grouped together in an INSERT statement when exporting.
pub static EXPORT_BATCH_SIZE: Lazy<u32> = lazy_env_parse!("SURREAL_EXPORT_BATCH_SIZE", u32, 1000);

View file

@ -24,6 +24,7 @@ use sql::statements::DefineUserStatement;
use sql::statements::LiveStatement; use sql::statements::LiveStatement;
use crate::cf; use crate::cf;
use crate::cnf::EXPORT_BATCH_SIZE;
use crate::dbs::node::ClusterMembership; use crate::dbs::node::ClusterMembership;
use crate::dbs::node::Timestamp; use crate::dbs::node::Timestamp;
use crate::err::Error; use crate::err::Error;
@ -2543,6 +2544,11 @@ impl Transaction {
chn.send(bytes!("")).await?; chn.send(bytes!("")).await?;
chn.send(bytes!("BEGIN TRANSACTION;")).await?; chn.send(bytes!("BEGIN TRANSACTION;")).await?;
chn.send(bytes!("")).await?; chn.send(bytes!("")).await?;
// Records to be exported, categorised by the type of INSERT statement
let mut exported_normal: Vec<String> =
Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
let mut exported_relation: Vec<String> =
Vec::with_capacity(*EXPORT_BATCH_SIZE as usize);
// Output TABLE data // Output TABLE data
for tb in tbs.iter() { for tb in tbs.iter() {
// Start records // Start records
@ -2555,32 +2561,48 @@ impl Transaction {
let end = crate::key::thing::suffix(ns, db, &tb.name); let end = crate::key::thing::suffix(ns, db, &tb.name);
let mut nxt: Option<ScanPage<Vec<u8>>> = Some(ScanPage::from(beg..end)); let mut nxt: Option<ScanPage<Vec<u8>>> = Some(ScanPage::from(beg..end));
while nxt.is_some() { while nxt.is_some() {
let res = self.scan_paged(nxt.unwrap(), 1000).await?; let res = self.scan_paged(nxt.unwrap(), *EXPORT_BATCH_SIZE).await?;
nxt = res.next_page; nxt = res.next_page;
let res = res.values; let res = res.values;
if res.is_empty() { if res.is_empty() {
break; break;
} }
// Loop over results
for (k, v) in res.into_iter() { // Categorize results
for (_, v) in res.into_iter() {
// Parse the key and the value // Parse the key and the value
let k: crate::key::thing::Thing = (&k).into();
let v: Value = (&v).into(); let v: Value = (&v).into();
let t = Thing::from((k.tb, k.id));
// Check if this is a graph edge // Check if this is a graph edge
match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) { match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) {
// This is a graph edge record // This is a graph edge record
(Value::Bool(true), Value::Thing(l), Value::Thing(r)) => { (Value::Bool(true), Value::Thing(_), Value::Thing(_)) => {
let sql = format!("RELATE {l} -> {t} -> {r} CONTENT {v};",); exported_relation.push(v.to_string());
chn.send(bytes!(sql)).await?;
} }
// This is a normal record // This is a normal record
_ => { _ => {
let sql = format!("UPDATE {t} CONTENT {v};"); exported_normal.push(v.to_string());
}
}
}
// Add batches of INSERT statements
// No need to chunk here, the scan it limited to 1000
if !exported_normal.is_empty() {
let values = exported_normal.join(", ");
let sql = format!("INSERT [ {values} ];");
chn.send(bytes!(sql)).await?; chn.send(bytes!(sql)).await?;
exported_normal.clear();
} }
// Add batches of INSERT RELATION statements
// No need to chunk here, the scan it limited to 1000
if !exported_relation.is_empty() {
let values = exported_relation.join(", ");
let sql = format!("INSERT RELATION [ {values} ];");
chn.send(bytes!(sql)).await?;
exported_relation.clear()
} }
}
continue; continue;
} }
chn.send(bytes!("")).await?; chn.send(bytes!("")).await?;

View file

@ -101,7 +101,7 @@ mod cli_integration {
let args = format!("export --conn http://{addr} {creds} --ns {ns} --db {db} -"); let args = format!("export --conn http://{addr} {creds} --ns {ns} --db {db} -");
let output = common::run(&args).output().expect("failed to run stdout export: {args}"); let output = common::run(&args).output().expect("failed to run stdout export: {args}");
assert!(output.contains("DEFINE TABLE thing TYPE ANY SCHEMALESS PERMISSIONS NONE;")); assert!(output.contains("DEFINE TABLE thing TYPE ANY SCHEMALESS PERMISSIONS NONE;"));
assert!(output.contains("UPDATE thing:one CONTENT { id: thing:one };")); assert!(output.contains("INSERT [ { id: thing:one } ];"));
} }
info!("* Export to file"); info!("* Export to file");