From bf218d93632bd278e5852bb1dfb9deee9237d380 Mon Sep 17 00:00:00 2001 From: Micha de Vries Date: Tue, 4 Jun 2024 17:04:07 +0200 Subject: [PATCH] Batched export (#4131) Co-authored-by: Tobie Morgan Hitchcock --- core/src/cnf/mod.rs | 3 +++ core/src/kvs/tx.rs | 42 ++++++++++++++++++++++++++++++---------- tests/cli_integration.rs | 2 +- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/core/src/cnf/mod.rs b/core/src/cnf/mod.rs index ca99cc0f..589660a0 100644 --- a/core/src/cnf/mod.rs +++ b/core/src/cnf/mod.rs @@ -52,3 +52,6 @@ pub static INSECURE_FORWARD_RECORD_ACCESS_ERRORS: Lazy = /// If the environment variable is not present or cannot be parsed, a default value of 50,000 is used. pub static EXTERNAL_SORTING_BUFFER_LIMIT: Lazy = lazy_env_parse!("SURREAL_EXTERNAL_SORTING_BUFFER_LIMIT", usize, 50_000); + +/// The number of records that should be fetched and grouped together in an INSERT statement when exporting. +pub static EXPORT_BATCH_SIZE: Lazy = lazy_env_parse!("SURREAL_EXPORT_BATCH_SIZE", u32, 1000); diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index be17fa1d..ca692114 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -24,6 +24,7 @@ use sql::statements::DefineUserStatement; use sql::statements::LiveStatement; use crate::cf; +use crate::cnf::EXPORT_BATCH_SIZE; use crate::dbs::node::ClusterMembership; use crate::dbs::node::Timestamp; use crate::err::Error; @@ -2543,6 +2544,11 @@ impl Transaction { chn.send(bytes!("")).await?; chn.send(bytes!("BEGIN TRANSACTION;")).await?; chn.send(bytes!("")).await?; + // Records to be exported, categorised by the type of INSERT statement + let mut exported_normal: Vec = + Vec::with_capacity(*EXPORT_BATCH_SIZE as usize); + let mut exported_relation: Vec = + Vec::with_capacity(*EXPORT_BATCH_SIZE as usize); // Output TABLE data for tb in tbs.iter() { // Start records @@ -2555,32 +2561,48 @@ impl Transaction { let end = crate::key::thing::suffix(ns, db, &tb.name); let mut nxt: Option>> = Some(ScanPage::from(beg..end)); while nxt.is_some() { - let res = self.scan_paged(nxt.unwrap(), 1000).await?; + let res = self.scan_paged(nxt.unwrap(), *EXPORT_BATCH_SIZE).await?; nxt = res.next_page; let res = res.values; if res.is_empty() { break; } - // Loop over results - for (k, v) in res.into_iter() { + + // Categorize results + for (_, v) in res.into_iter() { // Parse the key and the value - let k: crate::key::thing::Thing = (&k).into(); let v: Value = (&v).into(); - let t = Thing::from((k.tb, k.id)); // Check if this is a graph edge match (v.pick(&*EDGE), v.pick(&*IN), v.pick(&*OUT)) { // This is a graph edge record - (Value::Bool(true), Value::Thing(l), Value::Thing(r)) => { - let sql = format!("RELATE {l} -> {t} -> {r} CONTENT {v};",); - chn.send(bytes!(sql)).await?; + (Value::Bool(true), Value::Thing(_), Value::Thing(_)) => { + exported_relation.push(v.to_string()); } // This is a normal record _ => { - let sql = format!("UPDATE {t} CONTENT {v};"); - chn.send(bytes!(sql)).await?; + exported_normal.push(v.to_string()); } } } + + // Add batches of INSERT statements + // No need to chunk here, the scan it limited to 1000 + if !exported_normal.is_empty() { + let values = exported_normal.join(", "); + let sql = format!("INSERT [ {values} ];"); + chn.send(bytes!(sql)).await?; + exported_normal.clear(); + } + + // Add batches of INSERT RELATION statements + // No need to chunk here, the scan it limited to 1000 + if !exported_relation.is_empty() { + let values = exported_relation.join(", "); + let sql = format!("INSERT RELATION [ {values} ];"); + chn.send(bytes!(sql)).await?; + exported_relation.clear() + } + continue; } chn.send(bytes!("")).await?; diff --git a/tests/cli_integration.rs b/tests/cli_integration.rs index e91137a5..142f88fd 100644 --- a/tests/cli_integration.rs +++ b/tests/cli_integration.rs @@ -101,7 +101,7 @@ mod cli_integration { let args = format!("export --conn http://{addr} {creds} --ns {ns} --db {db} -"); let output = common::run(&args).output().expect("failed to run stdout export: {args}"); assert!(output.contains("DEFINE TABLE thing TYPE ANY SCHEMALESS PERMISSIONS NONE;")); - assert!(output.contains("UPDATE thing:one CONTENT { id: thing:one };")); + assert!(output.contains("INSERT [ { id: thing:one } ];")); } info!("* Export to file");