Experimental: select all from table with version (#4494)

This commit is contained in:
Sergii Glushchenko 2024-08-13 17:35:30 +02:00 committed by GitHub
parent a8b96936f4
commit a87433c4d3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 193 additions and 38 deletions

4
Cargo.lock generated
View file

@ -6045,9 +6045,9 @@ dependencies = [
[[package]]
name = "surrealkv"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85cbebbb653d3a28eb67d371a1f2d09954e09834ef51cd6bab79d7958c95ca81"
checksum = "079b1114b1bbe22e2738f0c6fccc0109d06be9be7f48d2e38d143baf935358e9"
dependencies = [
"ahash 0.8.11",
"async-channel 2.2.0",

View file

@ -121,7 +121,14 @@ reqwest = { version = "0.12.5", default-features = false, features = [
"stream",
"multipart",
], optional = true }
revision = { version = "0.8.0", features = ["chrono", "geo", "roaring", "regex", "rust_decimal", "uuid"] }
revision = { version = "0.8.0", features = [
"chrono",
"geo",
"roaring",
"regex",
"rust_decimal",
"uuid",
] }
rmpv = "1.0.1"
roaring = { version = "0.10.2", features = ["serde"] }
rocksdb = { version = "0.21.0", features = ["lz4", "snappy"], optional = true }
@ -135,7 +142,7 @@ sha1 = "0.10.6"
sha2 = "0.10.8"
snap = "1.1.0"
storekey = "0.5.0"
surrealkv = { version = "0.3.1", optional = true }
surrealkv = { version = "0.3.2", optional = true }
surrealml = { version = "0.1.1", optional = true, package = "surrealml-core" }
tempfile = { version = "3.10.1", optional = true }
thiserror = "1.0.50"

View file

@ -49,7 +49,7 @@ pub async fn read(
// Create an empty buffer for the final changesets
let mut res = Vec::<ChangeSet>::new();
// iterate over _x and put decoded elements to r
for (k, v) in tx.scan(beg..end, limit).await? {
for (k, v) in tx.scan(beg..end, limit, None).await? {
#[cfg(debug_assertions)]
trace!("Reading change feed entry: {}", k.sprint());
// Decode the changefeed entry key

View file

@ -46,6 +46,8 @@ pub struct Options {
pub projections: bool,
/// The channel over which we send notifications
pub sender: Option<Sender<Notification>>,
/// Version as nanosecond timestamp passed down to Datastore
pub version: Option<u64>,
}
#[derive(Clone, Debug)]
@ -81,6 +83,7 @@ impl Options {
auth_enabled: true,
sender: None,
auth: Arc::new(Auth::default()),
version: None,
}
}
@ -202,6 +205,12 @@ impl Options {
self
}
// Set the version
pub fn with_version(mut self, version: Option<u64>) -> Self {
self.version = version;
self
}
// --------------------------------------------------
/// Create a new Options object for a subquery

View file

@ -306,7 +306,7 @@ impl<'a> Processor<'a> {
let beg = thing::prefix(opt.ns()?, opt.db()?, v);
let end = thing::suffix(opt.ns()?, opt.db()?, v);
// Create a new iterable range
let mut stream = txn.stream(beg..end);
let mut stream = txn.stream(beg..end, opt.version);
// Loop until no more entries
while let Some(res) = stream.next().await {
// Check if the context is finished
@ -365,7 +365,7 @@ impl<'a> Processor<'a> {
}
};
// Create a new iterable range
let mut stream = txn.stream(beg..end);
let mut stream = txn.stream(beg..end, None);
// Loop until no more entries
while let Some(res) = stream.next().await {
// Check if the context is finished
@ -474,7 +474,7 @@ impl<'a> Processor<'a> {
// Loop over the chosen edge types
for (beg, end) in keys.into_iter() {
// Create a new iterable range
let mut stream = txn.stream(beg..end);
let mut stream = txn.stream(beg..end, None);
// Loop until no more entries
while let Some(res) = stream.next().await {
// Check if the context is finished

View file

@ -1019,6 +1019,10 @@ pub enum Error {
UnsupportedDestructure {
variant: String,
},
#[doc(hidden)]
#[error("The underlying datastore does not support versioned queries")]
UnsupportedVersionedQueries,
}
impl From<Error> for String {

View file

@ -172,7 +172,7 @@ impl IndexEqualThingIterator {
) -> Result<B, Error> {
let min = beg.clone();
let max = end.to_owned();
let res = tx.scan(min..max, limit).await?;
let res = tx.scan(min..max, limit, None).await?;
if let Some((key, _)) = res.last() {
let mut key = key.clone();
key.push(0x00);
@ -302,7 +302,7 @@ impl IndexRangeThingIterator {
) -> Result<B, Error> {
let min = self.r.beg.clone();
let max = self.r.end.clone();
let res = tx.scan(min..max, limit).await?;
let res = tx.scan(min..max, limit, None).await?;
if let Some((key, _)) = res.last() {
self.r.beg.clone_from(key);
self.r.beg.push(0x00);
@ -600,7 +600,7 @@ impl UniqueRangeThingIterator {
let min = self.r.beg.clone();
let max = self.r.end.clone();
limit += 1;
let res = tx.scan(min..max, limit).await?;
let res = tx.scan(min..max, limit, None).await?;
let mut records = B::with_capacity(res.len());
for (k, v) in res {
limit -= 1;

View file

@ -1432,7 +1432,7 @@ mod tests {
assert_eq!(s.max_depth, 3);
assert_eq!(s.nodes_count, 10);
// There should be one record per node
assert_eq!(10, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len());
assert_eq!(10, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len());
let nodes_count = t
.inspect_nodes(&tx, &mut st, |count, depth, node_id, node| match count {
@ -1571,7 +1571,7 @@ mod tests {
assert_eq!(s.max_depth, 2);
assert_eq!(s.nodes_count, 7);
// There should be one record per node
assert_eq!(7, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len());
assert_eq!(7, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len());
let nodes_count = t
.inspect_nodes(&tx, &mut st, |count, depth, node_id, node| match count {
@ -1695,7 +1695,7 @@ mod tests {
assert_eq!(s.max_depth, 0);
assert_eq!(s.nodes_count, 0);
// There should not be any record in the database
assert_eq!(0, tx.scan(vec![]..vec![0xf], 100).await.unwrap().len());
assert_eq!(0, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len());
tx.cancel().await?;
Ok(())
}

View file

@ -97,7 +97,12 @@ pub trait Transaction {
/// Retrieve a specific range of keys from the datastore.
///
/// This function fetches the full range of key-value pairs, in a single request to the underlying datastore.
async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug;
@ -239,7 +244,7 @@ pub trait Transaction {
let end: Key = rng.end.into();
// Scan for the next batch
let res = if values {
self.scan(beg..end.clone(), batch).await?
self.scan(beg..end.clone(), batch, None).await?
} else {
self.keys(beg..end.clone(), batch)
.await?

View file

@ -443,10 +443,20 @@ impl super::api::Transaction for Transaction {
/// Retrieve a range of keys from the databases
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// FDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -294,10 +294,20 @@ impl super::api::Transaction for Transaction {
/// Retrieve a range of keys from the databases
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// IndexedDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -291,10 +291,20 @@ impl super::api::Transaction for Transaction {
/// Retrieve a range of keys from the databases
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// MemDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -430,10 +430,20 @@ impl super::api::Transaction for Transaction {
/// Retrieve a range of keys from the databases
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// RocksDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -26,10 +26,17 @@ pub(super) struct Scanner<'a> {
future: Option<Pin<Box<dyn Future<Output = Output> + 'a>>>,
/// Whether this stream should try to fetch more
exhausted: bool,
/// Version as timestamp, 0 means latest.
version: Option<u64>,
}
impl<'a> Scanner<'a> {
pub fn new(store: &'a Transaction, batch: u32, range: Range<Key>) -> Self {
pub fn new(
store: &'a Transaction,
batch: u32,
range: Range<Key>,
version: Option<u64>,
) -> Self {
Scanner {
store,
batch,
@ -37,6 +44,7 @@ impl<'a> Scanner<'a> {
future: None,
results: VecDeque::new(),
exhausted: false,
version,
}
}
}
@ -62,7 +70,7 @@ impl<'a> Stream for Scanner<'a> {
// Clone the range to use when scanning
let range = self.range.clone();
// Prepare a future to scan for results
self.future = Some(Box::pin(self.store.scan(range, num)));
self.future = Some(Box::pin(self.store.scan(range, num, self.version)));
}
// Try to resolve the future
match self.future.as_mut().unwrap().poll_unpin(cx) {

View file

@ -321,7 +321,12 @@ impl super::api::Transaction for Transaction {
/// Retrieves a range of key-value pairs from the database.
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
@ -332,11 +337,19 @@ impl super::api::Transaction for Transaction {
// Set the key range
let beg = rng.start.into();
let end = rng.end.into();
let range = beg.as_slice()..end.as_slice();
// Retrieve the scan range
let res = self.inner.scan(beg.as_slice()..end.as_slice(), Some(limit as usize))?;
// Convert the keys and values
let res = res.into_iter().map(|kv| (Key::from(kv.0), kv.1)).collect();
// Return result
let res = match version {
Some(ts) => self.inner.scan_at_ts(range, ts, Some(limit as usize))?,
None => self
.inner
.scan(range, Some(limit as usize))?
.into_iter()
.map(|kv| (kv.0, kv.1))
.collect(),
};
Ok(res)
}
}

View file

@ -247,7 +247,7 @@ async fn scan() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.scan("test1".."test9", u32::MAX).await.unwrap();
let val = tx.scan("test1".."test9", u32::MAX, None).await.unwrap();
assert_eq!(val.len(), 5);
assert_eq!(val[0].0, b"test1");
assert_eq!(val[0].1, b"1");
@ -262,7 +262,7 @@ async fn scan() {
tx.cancel().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.scan("test2".."test4", u32::MAX).await.unwrap();
let val = tx.scan("test2".."test4", u32::MAX, None).await.unwrap();
assert_eq!(val.len(), 2);
assert_eq!(val[0].0, b"test2");
assert_eq!(val[0].1, b"2");
@ -271,7 +271,7 @@ async fn scan() {
tx.cancel().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.scan("test1".."test9", 2).await.unwrap();
let val = tx.scan("test1".."test9", 2, None).await.unwrap();
assert_eq!(val.len(), 2);
assert_eq!(val[0].0, b"test1");
assert_eq!(val[0].1, b"1");

View file

@ -370,10 +370,20 @@ impl super::api::Transaction for Transaction {
/// Retrieve a range of keys from the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// TiKV does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -326,13 +326,18 @@ impl Transactor {
///
/// This function fetches the full range of key-value pairs, in a single request to the underlying datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
pub async fn scan<K>(&mut self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
pub async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Debug,
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
expand_inner!(&mut self.inner, v => { v.scan(beg..end, limit).await })
expand_inner!(&mut self.inner, v => { v.scan(beg..end, limit, version).await })
}
/// Retrieve a batched scan over a specific range of keys in the datastore.

View file

@ -233,11 +233,16 @@ impl Transaction {
///
/// This function fetches the full range of key-value pairs, in a single request to the underlying datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
pub async fn scan<K>(&self, rng: Range<K>, limit: u32) -> Result<Vec<(Key, Val)>, Error>
pub async fn scan<K>(
&self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Debug,
{
self.lock().await.scan(rng, limit).await
self.lock().await.scan(rng, limit, version).await
}
/// Retrieve a batched scan over a specific range of keys in the datastore.
@ -255,7 +260,11 @@ impl Transaction {
///
/// This function fetches the key-value pairs in batches, with multiple requests to the underlying datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
pub fn stream<K>(&self, rng: Range<K>) -> impl Stream<Item = Result<(Key, Val), Error>> + '_
pub fn stream<K>(
&self,
rng: Range<K>,
version: Option<u64>,
) -> impl Stream<Item = Result<(Key, Val), Error>> + '_
where
K: Into<Key> + Debug,
{
@ -266,6 +275,7 @@ impl Transaction {
start: rng.start.into(),
end: rng.end.into(),
},
version,
)
}

View file

@ -69,8 +69,9 @@ impl SelectStatement {
opt.valid_for_db()?;
// Create a new iterator
let mut i = Iterator::new();
// Ensure futures are stored
let opt = &opt.new_with_futures(false).with_projections(true);
// Ensure futures are stored and the version is set if specified
let version = self.version.as_ref().map(|v| v.to_u64());
let opt = &opt.new_with_futures(false).with_projections(true).with_version(version);
// Get a query planner
let mut planner = QueryPlanner::new(opt, &self.with, &self.cond);
// Used for ONLY: is the limit 1?

View file

@ -9,6 +9,13 @@ use std::fmt;
#[non_exhaustive]
pub struct Version(pub Datetime);
impl Version {
	/// Convert to nanosecond timestamp.
	///
	/// Returns `0` when `timestamp_nanos_opt` yields `None`. Negative
	/// nanosecond values are clamped to `0` as well, instead of being
	/// wrapped into a very large number by the `as u64` cast.
	pub fn to_u64(&self) -> u64 {
		// `max(0)` guarantees the value is non-negative, so the cast below is lossless.
		self.0.timestamp_nanos_opt().map_or(0, |nanos| nanos.max(0) as u64)
	}
}
impl fmt::Display for Version {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "VERSION {}", self.0)

View file

@ -434,6 +434,42 @@ mod api_integration {
tokio::fs::remove_dir_all(path).await.unwrap();
}
/// Checks that `SELECT ... VERSION` returns the record as it was at the given
/// timestamp, while a plain `SELECT` returns the most recent update.
#[test_log::test(tokio::test)]
async fn select_with_version() {
	// Obtain a database handle, select a database named by a new ULID,
	// then release the permit.
	let (permit, db) = new_db().await;
	db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
	drop(permit);

	// First version: create the record.
	let _ = db.query("CREATE user:john SET name = 'John v1'").await.unwrap().check().unwrap();
	// Capture a timestamp after the create and before the update below.
	let created_at = chrono::Utc::now();

	// Second version: update the same record.
	let _ = db.query("UPDATE user:john SET name = 'John v2'").await.unwrap().check().unwrap();

	// A plain SELECT must observe the updated name.
	let mut latest = db.query("SELECT * FROM user").await.unwrap().check().unwrap();
	let current: Option<String> = latest.take("name").unwrap();
	match current {
		Some(name) => assert_eq!(name, "John v2"),
		None => panic!("query returned no record"),
	}

	// A SELECT pinned to the captured timestamp must observe the original name.
	let sql = format!("SELECT * FROM user VERSION d'{}'", created_at.to_rfc3339());
	let mut historical = db.query(sql).await.unwrap().check().unwrap();
	let original: Option<String> = historical.take("name").unwrap();
	match original {
		Some(name) => assert_eq!(name, "John v1"),
		None => panic!("query returned no record"),
	}
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");