diff --git a/Cargo.lock b/Cargo.lock index 99ff4725..845549e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6045,9 +6045,9 @@ dependencies = [ [[package]] name = "surrealkv" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85cbebbb653d3a28eb67d371a1f2d09954e09834ef51cd6bab79d7958c95ca81" +checksum = "079b1114b1bbe22e2738f0c6fccc0109d06be9be7f48d2e38d143baf935358e9" dependencies = [ "ahash 0.8.11", "async-channel 2.2.0", diff --git a/core/Cargo.toml b/core/Cargo.toml index 25447010..be3c4c61 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -121,7 +121,14 @@ reqwest = { version = "0.12.5", default-features = false, features = [ "stream", "multipart", ], optional = true } -revision = { version = "0.8.0", features = ["chrono", "geo", "roaring", "regex", "rust_decimal", "uuid"] } +revision = { version = "0.8.0", features = [ + "chrono", + "geo", + "roaring", + "regex", + "rust_decimal", + "uuid", +] } rmpv = "1.0.1" roaring = { version = "0.10.2", features = ["serde"] } rocksdb = { version = "0.21.0", features = ["lz4", "snappy"], optional = true } @@ -135,7 +142,7 @@ sha1 = "0.10.6" sha2 = "0.10.8" snap = "1.1.0" storekey = "0.5.0" -surrealkv = { version = "0.3.1", optional = true } +surrealkv = { version = "0.3.2", optional = true } surrealml = { version = "0.1.1", optional = true, package = "surrealml-core" } tempfile = { version = "3.10.1", optional = true } thiserror = "1.0.50" diff --git a/core/src/cf/reader.rs b/core/src/cf/reader.rs index 25e6fb64..ea212d34 100644 --- a/core/src/cf/reader.rs +++ b/core/src/cf/reader.rs @@ -49,7 +49,7 @@ pub async fn read( // Create an empty buffer for the final changesets let mut res = Vec::::new(); // iterate over _x and put decoded elements to r - for (k, v) in tx.scan(beg..end, limit).await? { + for (k, v) in tx.scan(beg..end, limit, None).await? { #[cfg(debug_assertions)] trace!("Reading change feed entry: {}", k.sprint()); // Decode the changefeed entry key diff --git a/core/src/dbs/options.rs b/core/src/dbs/options.rs index 6014241b..d6bcd25f 100644 --- a/core/src/dbs/options.rs +++ b/core/src/dbs/options.rs @@ -46,6 +46,8 @@ pub struct Options { pub projections: bool, /// The channel over which we send notifications pub sender: Option>, + /// Version as nanosecond timestamp passed down to Datastore + pub version: Option, } #[derive(Clone, Debug)] @@ -81,6 +83,7 @@ impl Options { auth_enabled: true, sender: None, auth: Arc::new(Auth::default()), + version: None, } } @@ -202,6 +205,12 @@ impl Options { self } + // Set the version + pub fn with_version(mut self, version: Option) -> Self { + self.version = version; + self + } + // -------------------------------------------------- /// Create a new Options object for a subquery diff --git a/core/src/dbs/processor.rs b/core/src/dbs/processor.rs index 86603919..591107b3 100644 --- a/core/src/dbs/processor.rs +++ b/core/src/dbs/processor.rs @@ -306,7 +306,7 @@ impl<'a> Processor<'a> { let beg = thing::prefix(opt.ns()?, opt.db()?, v); let end = thing::suffix(opt.ns()?, opt.db()?, v); // Create a new iterable range - let mut stream = txn.stream(beg..end); + let mut stream = txn.stream(beg..end, opt.version); // Loop until no more entries while let Some(res) = stream.next().await { // Check if the context is finished @@ -365,7 +365,7 @@ impl<'a> Processor<'a> { } }; // Create a new iterable range - let mut stream = txn.stream(beg..end); + let mut stream = txn.stream(beg..end, None); // Loop until no more entries while let Some(res) = stream.next().await { // Check if the context is finished @@ -474,7 +474,7 @@ impl<'a> Processor<'a> { // Loop over the chosen edge types for (beg, end) in keys.into_iter() { // Create a new iterable range - let mut stream = txn.stream(beg..end); + let mut stream = txn.stream(beg..end, None); // Loop until no more entries while let Some(res) = stream.next().await { // Check if the context is finished diff --git a/core/src/err/mod.rs b/core/src/err/mod.rs index 7c3bae64..c9eb1901 100644 --- a/core/src/err/mod.rs +++ b/core/src/err/mod.rs @@ -1019,6 +1019,10 @@ pub enum Error { UnsupportedDestructure { variant: String, }, + + #[doc(hidden)] + #[error("The underlying datastore does not support versioned queries")] + UnsupportedVersionedQueries, } impl From for String { diff --git a/core/src/idx/planner/iterators.rs b/core/src/idx/planner/iterators.rs index e6869c52..2b9d6adb 100644 --- a/core/src/idx/planner/iterators.rs +++ b/core/src/idx/planner/iterators.rs @@ -172,7 +172,7 @@ impl IndexEqualThingIterator { ) -> Result { let min = beg.clone(); let max = end.to_owned(); - let res = tx.scan(min..max, limit).await?; + let res = tx.scan(min..max, limit, None).await?; if let Some((key, _)) = res.last() { let mut key = key.clone(); key.push(0x00); @@ -302,7 +302,7 @@ impl IndexRangeThingIterator { ) -> Result { let min = self.r.beg.clone(); let max = self.r.end.clone(); - let res = tx.scan(min..max, limit).await?; + let res = tx.scan(min..max, limit, None).await?; if let Some((key, _)) = res.last() { self.r.beg.clone_from(key); self.r.beg.push(0x00); @@ -600,7 +600,7 @@ impl UniqueRangeThingIterator { let min = self.r.beg.clone(); let max = self.r.end.clone(); limit += 1; - let res = tx.scan(min..max, limit).await?; + let res = tx.scan(min..max, limit, None).await?; let mut records = B::with_capacity(res.len()); for (k, v) in res { limit -= 1; diff --git a/core/src/idx/trees/btree.rs b/core/src/idx/trees/btree.rs index 810dbc72..0250b872 100644 --- a/core/src/idx/trees/btree.rs +++ b/core/src/idx/trees/btree.rs @@ -1432,7 +1432,7 @@ mod tests { assert_eq!(s.max_depth, 3); assert_eq!(s.nodes_count, 10); // There should be one record per node - assert_eq!(10, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len()); + assert_eq!(10, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len()); let nodes_count = t .inspect_nodes(&tx, &mut st, |count, depth, node_id, node| match count { @@ -1571,7 +1571,7 @@ mod tests { assert_eq!(s.max_depth, 2); assert_eq!(s.nodes_count, 7); // There should be one record per node - assert_eq!(7, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len()); + assert_eq!(7, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len()); let nodes_count = t .inspect_nodes(&tx, &mut st, |count, depth, node_id, node| match count { @@ -1695,7 +1695,7 @@ mod tests { assert_eq!(s.max_depth, 0); assert_eq!(s.nodes_count, 0); // There should not be any record in the database - assert_eq!(0, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len()); + assert_eq!(0, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len()); tx.cancel().await?; Ok(()) } diff --git a/core/src/kvs/api.rs b/core/src/kvs/api.rs index 91d887e3..a75b1b78 100644 --- a/core/src/kvs/api.rs +++ b/core/src/kvs/api.rs @@ -97,7 +97,12 @@ pub trait Transaction { /// Retrieve a specific range of keys from the datastore. /// /// This function fetches the full range of key-value pairs, in a single request to the underlying datastore. - async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + async fn scan( + &mut self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Sprintable + Debug; @@ -239,7 +244,7 @@ pub trait Transaction { let end: Key = rng.end.into(); // Scan for the next batch let res = if values { - self.scan(beg..end.clone(), batch).await? + self.scan(beg..end.clone(), batch, None).await? } else { self.keys(beg..end.clone(), batch) .await? diff --git a/core/src/kvs/fdb/mod.rs b/core/src/kvs/fdb/mod.rs index fed8b4b3..edad3368 100644 --- a/core/src/kvs/fdb/mod.rs +++ b/core/src/kvs/fdb/mod.rs @@ -443,10 +443,20 @@ impl super::api::Transaction for Transaction { /// Retrieve a range of keys from the databases #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))] - async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + async fn scan( + &mut self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Sprintable + Debug, { + // FDB does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/indxdb/mod.rs b/core/src/kvs/indxdb/mod.rs index 93631ccf..6b72b457 100644 --- a/core/src/kvs/indxdb/mod.rs +++ b/core/src/kvs/indxdb/mod.rs @@ -294,10 +294,20 @@ impl super::api::Transaction for Transaction { /// Retrieve a range of keys from the databases #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))] - async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + async fn scan( + &mut self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Sprintable + Debug, { + // IndexDB does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/mem/mod.rs b/core/src/kvs/mem/mod.rs index bb24bbcb..81ae9673 100644 --- a/core/src/kvs/mem/mod.rs +++ b/core/src/kvs/mem/mod.rs @@ -291,10 +291,20 @@ impl super::api::Transaction for Transaction { /// Retrieve a range of keys from the databases #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))] - async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + async fn scan( + &mut self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Sprintable + Debug, { + // MemDB does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/rocksdb/mod.rs b/core/src/kvs/rocksdb/mod.rs index d7bc9a8b..1611c4c2 100644 --- a/core/src/kvs/rocksdb/mod.rs +++ b/core/src/kvs/rocksdb/mod.rs @@ -430,10 +430,20 @@ impl super::api::Transaction for Transaction { /// Retrieve a range of keys from the databases #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))] - async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + async fn scan( + &mut self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Sprintable + Debug, { + // RocksDB does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/scanner.rs b/core/src/kvs/scanner.rs index 6d6b4f50..3d967b57 100644 --- a/core/src/kvs/scanner.rs +++ b/core/src/kvs/scanner.rs @@ -26,10 +26,17 @@ pub(super) struct Scanner<'a> { future: Option + 'a>>>, /// Whether this stream should try to fetch more exhausted: bool, + /// Version as timestamp, 0 means latest. + version: Option, } impl<'a> Scanner<'a> { - pub fn new(store: &'a Transaction, batch: u32, range: Range) -> Self { + pub fn new( + store: &'a Transaction, + batch: u32, + range: Range, + version: Option, + ) -> Self { Scanner { store, batch, @@ -37,6 +44,7 @@ impl<'a> Scanner<'a> { future: None, results: VecDeque::new(), exhausted: false, + version, } } } @@ -62,7 +70,7 @@ impl<'a> Stream for Scanner<'a> { // Clone the range to use when scanning let range = self.range.clone(); // Prepare a future to scan for results - self.future = Some(Box::pin(self.store.scan(range, num))); + self.future = Some(Box::pin(self.store.scan(range, num, self.version))); } // Try to resolve the future match self.future.as_mut().unwrap().poll_unpin(cx) { diff --git a/core/src/kvs/surrealkv/mod.rs b/core/src/kvs/surrealkv/mod.rs index a257edd3..7fd92b4e 100644 --- a/core/src/kvs/surrealkv/mod.rs +++ b/core/src/kvs/surrealkv/mod.rs @@ -321,7 +321,12 @@ impl super::api::Transaction for Transaction { /// Retrieves a range of key-value pairs from the database. #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))] - async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + async fn scan( + &mut self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Sprintable + Debug, { @@ -332,11 +337,19 @@ impl super::api::Transaction for Transaction { // Set the key range let beg = rng.start.into(); let end = rng.end.into(); + let range = beg.as_slice()..end.as_slice(); + // Retrieve the scan range - let res = self.inner.scan(beg.as_slice()..end.as_slice(), Some(limit as usize))?; - // Convert the keys and values - let res = res.into_iter().map(|kv| (Key::from(kv.0), kv.1)).collect(); - // Return result + let res = match version { + Some(ts) => self.inner.scan_at_ts(range, ts, Some(limit as usize))?, + None => self + .inner + .scan(range, Some(limit as usize))? + .into_iter() + .map(|kv| (kv.0, kv.1)) + .collect(), + }; + Ok(res) } } diff --git a/core/src/kvs/tests/raw.rs b/core/src/kvs/tests/raw.rs index ffc2d096..7de721d9 100644 --- a/core/src/kvs/tests/raw.rs +++ b/core/src/kvs/tests/raw.rs @@ -247,7 +247,7 @@ async fn scan() { tx.commit().await.unwrap(); // Create a readonly transaction let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner(); - let val = tx.scan("test1".."test9", u32::MAX).await.unwrap(); + let val = tx.scan("test1".."test9", u32::MAX, None).await.unwrap(); assert_eq!(val.len(), 5); assert_eq!(val[0].0, b"test1"); assert_eq!(val[0].1, b"1"); @@ -262,7 +262,7 @@ async fn scan() { tx.cancel().await.unwrap(); // Create a readonly transaction let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner(); - let val = tx.scan("test2".."test4", u32::MAX).await.unwrap(); + let val = tx.scan("test2".."test4", u32::MAX, None).await.unwrap(); assert_eq!(val.len(), 2); assert_eq!(val[0].0, b"test2"); assert_eq!(val[0].1, b"2"); @@ -271,7 +271,7 @@ async fn scan() { tx.cancel().await.unwrap(); // Create a readonly transaction let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner(); - let val = tx.scan("test1".."test9", 2).await.unwrap(); + let val = tx.scan("test1".."test9", 2, None).await.unwrap(); assert_eq!(val.len(), 2); assert_eq!(val[0].0, b"test1"); assert_eq!(val[0].1, b"1"); diff --git a/core/src/kvs/tikv/mod.rs b/core/src/kvs/tikv/mod.rs index d8736737..e976f19e 100644 --- a/core/src/kvs/tikv/mod.rs +++ b/core/src/kvs/tikv/mod.rs @@ -370,10 +370,20 @@ impl super::api::Transaction for Transaction { /// Retrieve a range of keys from the database #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))] - async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + async fn scan( + &mut self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Sprintable + Debug, { + // TiKV does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/tr.rs b/core/src/kvs/tr.rs index 5eb5c6f7..6fcb86ba 100644 --- a/core/src/kvs/tr.rs +++ b/core/src/kvs/tr.rs @@ -326,13 +326,18 @@ impl Transactor { /// /// This function fetches the full range of key-value pairs, in a single request to the underlying datastore. #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)] - pub async fn scan(&mut self, rng: Range, limit: u32) -> Result, Error> + pub async fn scan( + &mut self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Debug, { let beg: Key = rng.start.into(); let end: Key = rng.end.into(); - expand_inner!(&mut self.inner, v => { v.scan(beg..end, limit).await }) + expand_inner!(&mut self.inner, v => { v.scan(beg..end, limit, version).await }) } /// Retrieve a batched scan over a specific range of keys in the datastore. diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index 5d2ddd15..c4723922 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -233,11 +233,16 @@ impl Transaction { /// /// This function fetches the full range of key-value pairs, in a single request to the underlying datastore. #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)] - pub async fn scan(&self, rng: Range, limit: u32) -> Result, Error> + pub async fn scan( + &self, + rng: Range, + limit: u32, + version: Option, + ) -> Result, Error> where K: Into + Debug, { - self.lock().await.scan(rng, limit).await + self.lock().await.scan(rng, limit, version).await } /// Retrieve a batched scan over a specific range of keys in the datastore. @@ -255,7 +260,11 @@ impl Transaction { /// /// This function fetches the key-value pairs in batches, with multiple requests to the underlying datastore. #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)] - pub fn stream(&self, rng: Range) -> impl Stream> + '_ + pub fn stream( + &self, + rng: Range, + version: Option, + ) -> impl Stream> + '_ where K: Into + Debug, { @@ -266,6 +275,7 @@ impl Transaction { start: rng.start.into(), end: rng.end.into(), }, + version, ) } diff --git a/core/src/sql/statements/select.rs b/core/src/sql/statements/select.rs index 9992bd5c..2d6800be 100644 --- a/core/src/sql/statements/select.rs +++ b/core/src/sql/statements/select.rs @@ -69,8 +69,9 @@ impl SelectStatement { opt.valid_for_db()?; // Create a new iterator let mut i = Iterator::new(); - // Ensure futures are stored - let opt = &opt.new_with_futures(false).with_projections(true); + // Ensure futures are stored and the version is set if specified + let version = self.version.as_ref().map(|v| v.to_u64()); + let opt = &opt.new_with_futures(false).with_projections(true).with_version(version); // Get a query planner let mut planner = QueryPlanner::new(opt, &self.with, &self.cond); // Used for ONLY: is the limit 1? diff --git a/core/src/sql/version.rs b/core/src/sql/version.rs index 823697e1..5995cd7d 100644 --- a/core/src/sql/version.rs +++ b/core/src/sql/version.rs @@ -9,6 +9,13 @@ use std::fmt; #[non_exhaustive] pub struct Version(pub Datetime); +impl Version { + /// Convert to nanosecond timestamp. + pub fn to_u64(&self) -> u64 { + self.0.timestamp_nanos_opt().unwrap_or_default() as u64 + } +} + impl fmt::Display for Version { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "VERSION {}", self.0) diff --git a/lib/tests/api.rs b/lib/tests/api.rs index fa696086..d4ba393e 100644 --- a/lib/tests/api.rs +++ b/lib/tests/api.rs @@ -434,6 +434,42 @@ mod api_integration { tokio::fs::remove_dir_all(path).await.unwrap(); } + #[test_log::test(tokio::test)] + async fn select_with_version() { + let (permit, db) = new_db().await; + db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap(); + drop(permit); + + // Create the initial version and record its timestamp. + let _ = + db.query("CREATE user:john SET name = 'John v1'").await.unwrap().check().unwrap(); + let create_ts = chrono::Utc::now(); + + // Create a new version by updating the record. + let _ = + db.query("UPDATE user:john SET name = 'John v2'").await.unwrap().check().unwrap(); + + // Without VERSION, SELECT should return the latest update. + let mut response = db.query("SELECT * FROM user").await.unwrap().check().unwrap(); + let Some(name): Option = response.take("name").unwrap() else { + panic!("query returned no record"); + }; + assert_eq!(name, "John v2"); + + // SELECT with VERSION of `create_ts` should return the initial record. + let version = create_ts.to_rfc3339(); + let mut response = db + .query(format!("SELECT * FROM user VERSION d'{}'", version)) + .await + .unwrap() + .check() + .unwrap(); + let Some(name): Option = response.take("name").unwrap() else { + panic!("query returned no record"); + }; + assert_eq!(name, "John v1"); + } + include!("api/mod.rs"); include!("api/live.rs"); include!("api/backup.rs");