Create with version ()

This commit is contained in:
Sergii Glushchenko 2024-08-21 15:54:58 +02:00 committed by GitHub
parent 95f9f8dacd
commit 13b6788540
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 143 additions and 56 deletions

4
Cargo.lock generated
View file

@ -6297,9 +6297,9 @@ dependencies = [
[[package]]
name = "surrealkv"
version = "0.3.3"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ab634895ff2c9289cb854a56335e428338ba71074c9eff02d84b360f0210155"
checksum = "7d5fe193207896e6eeb445b88b810830097a9940058174bbbe8ec5597b1f52e1"
dependencies = [
"ahash 0.8.11",
"async-channel 2.2.0",

View file

@ -143,7 +143,7 @@ sha2 = "0.10.8"
snap = "1.1.0"
storekey = "0.5.0"
subtle = "2.6"
surrealkv = { version = "0.3.3", optional = true }
surrealkv = { version = "0.3.4", optional = true }
surrealml = { version = "0.1.1", optional = true, package = "surrealml-core" }
tempfile = { version = "3.10.1", optional = true }
thiserror = "1.0.50"

View file

@ -580,11 +580,11 @@ mod tests {
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
let ns_root = crate::key::root::ns::new(NS);
tx.put(&ns_root, dns).await.unwrap();
tx.put(&ns_root, dns, None).await.unwrap();
let db_root = crate::key::namespace::db::new(NS, DB);
tx.put(&db_root, ddb).await.unwrap();
tx.put(&db_root, ddb, None).await.unwrap();
let tb_root = crate::key::database::tb::new(NS, DB, TB);
tx.put(&tb_root, dtb.clone()).await.unwrap();
tx.put(&tb_root, dtb.clone(), None).await.unwrap();
tx.commit().await.unwrap();
ds
}

View file

@ -28,7 +28,7 @@ impl Document {
// Match the statement type
match stm {
// This is a CREATE statement so try to insert the key
Statement::Create(_) => match txn.put(key, self).await {
Statement::Create(_) => match txn.put(key, self, opt.version).await {
// The key already exists, so return an error
Err(Error::TxKeyAlreadyExists) => Err(Error::RecordExists {
thing: rid.to_string(),

View file

@ -66,7 +66,7 @@ pub trait Transaction {
V: Into<Val> + Debug;
/// Insert a key if it doesn't exist in the datastore.
async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug;

View file

@ -282,11 +282,16 @@ impl super::api::Transaction for Transaction {
/// Insert a key if it doesn't exist in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// FDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -195,11 +195,16 @@ impl super::api::Transaction for Transaction {
/// Insert a key if it doesn't exist in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// IndexDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -192,11 +192,16 @@ impl super::api::Transaction for Transaction {
/// Insert a key if it doesn't exist in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// MemDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -27,7 +27,7 @@ impl Datastore {
let key = crate::key::root::nd::Nd::new(id);
let now = self.clock_now().await;
let val = Node::new(id, now, false);
match run!(txn, txn.put(key, val)) {
match run!(txn, txn.put(key, val, None)) {
Err(Error::TxKeyAlreadyExists) => Err(Error::ClAlreadyExists {
value: id.to_string(),
}),

View file

@ -276,11 +276,16 @@ impl super::api::Transaction for Transaction {
/// Insert a key if it doesn't exist in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// RocksDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -170,7 +170,7 @@ impl super::api::Transaction for Transaction {
// Fetch the value from the database.
let res = match version {
Some(ts) => Some(self.inner.get_at_ts(&key.into(), ts)?),
Some(ts) => self.inner.get_at_ts(&key.into(), ts)?,
None => self.inner.get(&key.into())?,
};
@ -201,7 +201,7 @@ impl super::api::Transaction for Transaction {
/// Insert a key if it doesn't exist in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
@ -218,10 +218,14 @@ impl super::api::Transaction for Transaction {
let key = key.into();
let val = val.into();
// Set the key if empty
match self.inner.get(&key)? {
None => self.inner.set(&key, &val)?,
_ => return Err(Error::TxKeyAlreadyExists),
};
if let Some(ts) = version {
self.inner.set_at_ts(&key, &val, ts)?;
} else {
match self.inner.get(&key)? {
None => self.inner.set(&key, &val)?,
_ => return Err(Error::TxKeyAlreadyExists),
};
}
// Return result
Ok(())
}

View file

@ -2,7 +2,7 @@
#[serial]
async fn initialise() {
let mut tx = new_tx(Write, Optimistic).await.inner();
assert!(tx.put("test", "ok").await.is_ok());
assert!(tx.put("test", "ok", None).await.is_ok());
tx.commit().await.unwrap();
}
@ -15,7 +15,7 @@ async fn exists() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test", "ok").await.is_ok());
assert!(tx.put("test", "ok", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -35,7 +35,7 @@ async fn get() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test", "ok").await.is_ok());
assert!(tx.put("test", "ok", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -82,7 +82,7 @@ async fn put() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test", "one").await.is_ok());
assert!(tx.put("test", "one", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -91,7 +91,7 @@ async fn put() {
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test", "two").await.is_err());
assert!(tx.put("test", "two", None).await.is_err());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -109,7 +109,7 @@ async fn putc() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test", "one").await.is_ok());
assert!(tx.put("test", "one", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -145,7 +145,7 @@ async fn del() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test", "one").await.is_ok());
assert!(tx.put("test", "one", None).await.is_ok());
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
@ -167,7 +167,7 @@ async fn delc() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test", "one").await.is_ok());
assert!(tx.put("test", "one", None).await.is_ok());
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
@ -198,11 +198,11 @@ async fn keys() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test1", "1").await.is_ok());
assert!(tx.put("test2", "2").await.is_ok());
assert!(tx.put("test3", "3").await.is_ok());
assert!(tx.put("test4", "4").await.is_ok());
assert!(tx.put("test5", "5").await.is_ok());
assert!(tx.put("test1", "1", None).await.is_ok());
assert!(tx.put("test2", "2", None).await.is_ok());
assert!(tx.put("test3", "3", None).await.is_ok());
assert!(tx.put("test4", "4", None).await.is_ok());
assert!(tx.put("test5", "5", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -239,11 +239,11 @@ async fn scan() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test1", "1").await.is_ok());
assert!(tx.put("test2", "2").await.is_ok());
assert!(tx.put("test3", "3").await.is_ok());
assert!(tx.put("test4", "4").await.is_ok());
assert!(tx.put("test5", "5").await.is_ok());
assert!(tx.put("test1", "1", None).await.is_ok());
assert!(tx.put("test2", "2", None).await.is_ok());
assert!(tx.put("test3", "3", None).await.is_ok());
assert!(tx.put("test4", "4", None).await.is_ok());
assert!(tx.put("test5", "5", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -289,11 +289,11 @@ async fn batch() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.put("test1", "1").await.is_ok());
assert!(tx.put("test2", "2").await.is_ok());
assert!(tx.put("test3", "3").await.is_ok());
assert!(tx.put("test4", "4").await.is_ok());
assert!(tx.put("test5", "5").await.is_ok());
assert!(tx.put("test1", "1", None).await.is_ok());
assert!(tx.put("test2", "2", None).await.is_ok());
assert!(tx.put("test3", "3", None).await.is_ok());
assert!(tx.put("test4", "4", None).await.is_ok());
assert!(tx.put("test5", "5", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();

View file

@ -226,11 +226,16 @@ impl super::api::Transaction for Transaction {
/// Insert a key if it doesn't exist in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// TiKV does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -258,13 +258,13 @@ impl Transactor {
/// Insert a key if it doesn't exist in the datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
pub async fn put<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
pub async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Debug,
V: Into<Val> + Debug,
{
let key = key.into();
expand_inner!(&mut self.inner, v => { v.put(key, val).await })
expand_inner!(&mut self.inner, v => { v.put(key, val, version).await })
}
/// Update a key in the datastore if the current value matches a condition.

View file

@ -201,12 +201,12 @@ impl Transaction {
/// Insert a key if it doesn't exist in the datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
pub async fn put<K, V>(&self, key: K, val: V) -> Result<(), Error>
pub async fn put<K, V>(&self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Debug,
V: Into<Val> + Debug,
{
self.lock().await.put(key, val).await
self.lock().await.put(key, val, version).await
}
/// Update a key in the datastore if the current value matches a condition.
@ -1459,7 +1459,7 @@ impl Transaction {
..Default::default()
};
let val = {
self.put(&key, &val).await?;
self.put(&key, &val, None).await?;
Entry::Any(Arc::new(val))
};
let _ = cache.insert(val.clone());
@ -1517,7 +1517,7 @@ impl Transaction {
..Default::default()
};
let val = {
self.put(&key, &val).await?;
self.put(&key, &val, None).await?;
Entry::Any(Arc::new(val))
};
let _ = cache.insert(val.clone());
@ -1586,7 +1586,7 @@ impl Transaction {
..Default::default()
};
let val = {
self.put(&key, &val).await?;
self.put(&key, &val, None).await?;
Entry::Any(Arc::new(val))
};
let _ = cache.insert(val.clone());

View file

@ -2,14 +2,14 @@ use crate::ctx::Context;
use crate::dbs::{Iterator, Options, Statement};
use crate::doc::CursorDoc;
use crate::err::Error;
use crate::sql::{Data, Output, Timeout, Value, Values};
use crate::sql::{Data, Output, Timeout, Value, Values, Version};
use derive::Store;
use reblessive::tree::Stk;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt;
#[revisioned(revision = 2)]
#[revisioned(revision = 3)]
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive]
@ -27,6 +27,9 @@ pub struct CreateStatement {
pub timeout: Option<Timeout>,
// If the statement should be run in parallel
pub parallel: bool,
// Version as nanosecond timestamp passed down to Datastore
#[revision(start = 3)]
pub version: Option<Version>,
}
impl CreateStatement {
@ -48,8 +51,10 @@ impl CreateStatement {
let mut i = Iterator::new();
// Assign the statement
let stm = Statement::from(self);
// Propagate the version to the underlying datastore
let version = self.version.as_ref().map(|v| v.to_u64());
// Ensure futures are stored
let opt = &opt.new_with_futures(false);
let opt = &opt.new_with_futures(false).with_version(version);
// Loop over the create targets
for w in self.what.0.iter() {
let v = w.compute(stk, ctx, opt, doc).await?;
@ -90,6 +95,9 @@ impl fmt::Display for CreateStatement {
if let Some(ref v) = self.output {
write!(f, " {v}")?
}
if let Some(ref v) = self.version {
write!(f, " {v}")?
}
if let Some(ref v) = self.timeout {
write!(f, " {v}")?
}

View file

@ -114,10 +114,10 @@ impl LiveStatement {
let mut txn = txn.lock().await;
// Insert the node live query
let key = crate::key::node::lq::new(nid, id);
txn.put(key, lq).await?;
txn.put(key, lq, None).await?;
// Insert the table live query
let key = crate::key::table::lq::new(ns, db, &tb, id);
txn.put(key, stm).await?;
txn.put(key, stm, None).await?;
}
v => {
return Err(Error::LiveStatement {

View file

@ -14,6 +14,7 @@ impl Parser<'_> {
let what = Values(self.parse_what_list(ctx).await?);
let data = self.try_parse_data(ctx).await?;
let output = self.try_parse_output(ctx).await?;
let version = self.try_parse_version()?;
let timeout = self.try_parse_timeout()?;
let parallel = self.eat(t!("PARALLEL"));
@ -24,6 +25,7 @@ impl Parser<'_> {
output,
timeout,
parallel,
version,
})
}
}

View file

@ -228,7 +228,7 @@ impl Parser<'_> {
Ok(Some(Start(value)))
}
fn try_parse_version(&mut self) -> ParseResult<Option<Version>> {
pub(crate) fn try_parse_version(&mut self) -> ParseResult<Option<Version>> {
if !self.eat(t!("VERSION")) {
return Ok(None);
}

View file

@ -119,6 +119,7 @@ fn parse_create() {
))),
timeout: Some(Timeout(Duration(std::time::Duration::from_secs(1)))),
parallel: true,
version: None,
}),
);
}
@ -2204,6 +2205,7 @@ fn parse_relate() {
output: None,
timeout: None,
parallel: false,
version: None,
}))),
uniq: true,
data: Some(Data::SetExpression(vec![(

View file

@ -150,6 +150,7 @@ fn statements() -> Vec<Statement> {
))),
timeout: Some(Timeout(Duration(std::time::Duration::from_secs(1)))),
parallel: true,
version: None,
}),
Statement::Define(DefineStatement::Namespace(DefineNamespaceStatement {
id: None,
@ -638,6 +639,7 @@ fn statements() -> Vec<Statement> {
output: None,
timeout: None,
parallel: false,
version: None,
}))),
uniq: true,
data: Some(Data::SetExpression(vec![(

View file

@ -493,6 +493,50 @@ mod api_integration {
assert_eq!(name, "John v1");
}
#[test_log::test(tokio::test)]
async fn create_with_version() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
// Create a record in the past.
let _ = db
.query("CREATE user:john SET name = 'John' VERSION d'2024-08-19T08:00:00Z'")
.await
.unwrap()
.check()
.unwrap();
// Without VERSION, SELECT should return the record.
let mut response = db.query("SELECT * FROM user:john").await.unwrap().check().unwrap();
let Some(name): Option<String> = response.take("name").unwrap() else {
panic!("query returned no record");
};
assert_eq!(name, "John");
// SELECT with the VERSION set to the creation timestamp or later should return the record.
let mut response = db
.query("SELECT * FROM user:john VERSION d'2024-08-19T08:00:00Z'")
.await
.unwrap()
.check()
.unwrap();
let Some(name): Option<String> = response.take("name").unwrap() else {
panic!("query returned no record");
};
assert_eq!(name, "John");
// SELECT with the VERSION set before the creation timestamp should return nothing.
let mut response = db
.query("SELECT * FROM user:john VERSION d'2024-08-19T07:00:00Z'")
.await
.unwrap()
.check()
.unwrap();
let response: Option<String> = response.take("name").unwrap();
assert!(response.is_none());
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");