Insert with version (#4583)

This commit is contained in:
Sergii Glushchenko 2024-08-23 00:34:33 +02:00 committed by GitHub
parent da308cc8c6
commit f2e598379b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
51 changed files with 190 additions and 88 deletions

View file

@ -59,16 +59,16 @@ impl Document {
let (ref o, ref i) = (Dir::Out, Dir::In);
// Store the left pointer edge
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &l.tb, &l.id, o, rid);
txn.set(key, vec![]).await?;
txn.set(key, vec![], None).await?;
// Store the left inner edge
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &rid.tb, &rid.id, i, l);
txn.set(key, vec![]).await?;
txn.set(key, vec![], None).await?;
// Store the right inner edge
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &rid.tb, &rid.id, o, r);
txn.set(key, vec![]).await?;
txn.set(key, vec![], None).await?;
// Store the right pointer edge
let key = crate::key::graph::new(opt.ns()?, opt.db()?, &r.tb, &r.id, i, rid);
txn.set(key, vec![]).await?;
txn.set(key, vec![], None).await?;
// Store the edges on the record
self.current.doc.to_mut().put(&*EDGE, Value::Bool(true));
self.current.doc.to_mut().put(&*IN, l.clone().into());

View file

@ -38,8 +38,10 @@ impl Document {
// Record creation worked fine
Ok(v) => Ok(v),
},
// INSERT can be versioned
Statement::Insert(_) => txn.set(key, self, opt.version).await,
// This is not a CREATE statement, so update the key
_ => txn.set(key, self).await,
_ => txn.set(key, self, None).await,
}?;
// Carry on
Ok(())

View file

@ -124,7 +124,7 @@ mod tests {
async fn finish(txn: Transaction, mut d: U32) -> Result<(), Error> {
if let Some((key, val)) = d.finish() {
txn.set(key, val).await?;
txn.set(key, val, None).await?;
}
txn.commit().await
}

View file

@ -92,7 +92,7 @@ impl DocIds {
}
}
let doc_id = self.get_next_doc_id();
tx.set(self.index_key_base.new_bi_key(doc_id), doc_key.clone()).await?;
tx.set(self.index_key_base.new_bi_key(doc_id), doc_key.clone(), None).await?;
self.btree.insert(tx, &mut self.store, doc_key, doc_id).await?;
Ok(Resolved::New(doc_id))
}
@ -142,7 +142,7 @@ impl DocIds {
available_ids: self.available_ids.take(),
next_doc_id: self.next_doc_id,
};
tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?).await?;
tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?, None).await?;
self.ixs.advance_cache_btree_trie(new_cache);
}
Ok(())

View file

@ -87,7 +87,7 @@ impl DocLengths {
pub(super) async fn finish(&mut self, tx: &Transaction) -> Result<(), Error> {
if let Some(new_cache) = self.store.finish(tx).await? {
let state = self.btree.inc_generation();
tx.set(self.state_key.clone(), VersionedStore::try_into(state)?).await?;
tx.set(self.state_key.clone(), VersionedStore::try_into(state)?, None).await?;
self.ixs.advance_cache_btree_trie(new_cache);
}
Ok(())

View file

@ -334,7 +334,7 @@ impl FtIndex {
// Stores the term list for this doc_id
let mut val = Vec::new();
terms_ids.serialize_into(&mut val)?;
tx.set(term_ids_key, val).await?;
tx.set(term_ids_key, val, None).await?;
// Update the index state
self.state.total_docs_lengths += doc_length as u128;
@ -343,7 +343,7 @@ impl FtIndex {
}
// Update the states
tx.set(self.state_key.clone(), VersionedStore::try_into(&self.state)?).await?;
tx.set(self.state_key.clone(), VersionedStore::try_into(&self.state)?, None).await?;
drop(tx);
Ok(())
}

View file

@ -26,7 +26,7 @@ impl Offsets {
) -> Result<(), Error> {
let key = self.index_key_base.new_bo_key(doc_id, term_id);
let val: Val = offsets.try_into()?;
tx.set(key, val).await?;
tx.set(key, val, None).await?;
Ok(())
}

View file

@ -87,7 +87,7 @@ impl Postings {
pub(super) async fn finish(&mut self, tx: &Transaction) -> Result<(), Error> {
if let Some(new_cache) = self.store.finish(tx).await? {
let state = self.btree.inc_generation();
tx.set(self.state_key.clone(), VersionedStore::try_into(state)?).await?;
tx.set(self.state_key.clone(), VersionedStore::try_into(state)?, None).await?;
self.ixs.advance_cache_btree_trie(new_cache);
}
Ok(())

View file

@ -31,7 +31,7 @@ impl TermDocs {
let key = self.index_key_base.new_bc_key(term_id);
let mut val = Vec::new();
docs.serialize_into(&mut val)?;
tx.set(key, val).await?;
tx.set(key, val, None).await?;
}
Ok(())
}
@ -65,7 +65,7 @@ impl TermDocs {
} else {
let mut val = Vec::new();
docs.serialize_into(&mut val)?;
tx.set(key, val).await?;
tx.set(key, val, None).await?;
}
}
Ok(docs.len())

View file

@ -84,7 +84,7 @@ impl Terms {
}
}
let term_id = self.get_next_term_id();
tx.set(self.index_key_base.new_bu_key(term_id), term_key.clone()).await?;
tx.set(self.index_key_base.new_bu_key(term_id), term_key.clone(), None).await?;
self.btree.insert(tx, &mut self.store, term_key, term_id).await?;
Ok(term_id)
}
@ -129,7 +129,7 @@ impl Terms {
available_ids: self.available_ids.take(),
next_term_id: self.next_term_id,
};
tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?).await?;
tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?, None).await?;
self.ixs.advance_store_btree_fst(new_cache);
}
Ok(())

View file

@ -59,9 +59,9 @@ impl HnswDocs {
Ok(doc_id)
} else {
let doc_id = self.next_doc_id();
tx.set(id_key, doc_id.to_be_bytes()).await?;
tx.set(id_key, doc_id.to_be_bytes(), None).await?;
let doc_key = self.ikb.new_hd_key(Some(doc_id));
tx.set(doc_key, id).await?;
tx.set(doc_key, id, None).await?;
Ok(doc_id)
}
}
@ -112,7 +112,7 @@ impl HnswDocs {
pub(in crate::idx) async fn finish(&mut self, tx: &Transaction) -> Result<(), Error> {
if self.state_updated {
tx.set(self.state_key.clone(), VersionedStore::try_into(&self.state)?).await?;
tx.set(self.state_key.clone(), VersionedStore::try_into(&self.state)?, None).await?;
self.state_updated = true;
}
Ok(())
@ -183,7 +183,7 @@ impl VecDocs {
}
} {
let val: Val = VersionedStore::try_into(&ed)?;
tx.set(key, val).await?;
tx.set(key, val, None).await?;
}
Ok(())
}
@ -205,7 +205,7 @@ impl VecDocs {
} else {
ed.docs = new_docs;
let val: Val = VersionedStore::try_into(&ed)?;
tx.set(key, val).await?;
tx.set(key, val, None).await?;
}
}
};

View file

@ -55,7 +55,7 @@ impl HnswElements {
) -> Result<SharedVector, Error> {
let key = self.ikb.new_he_key(id);
let val = VersionedStore::try_into(ser_vec)?;
tx.set(key, val).await?;
tx.set(key, val, None).await?;
let pt: SharedVector = vec.into();
self.elements.insert(id, pt.clone());
Ok(pt)

View file

@ -372,7 +372,7 @@ where
let old_chunks_len = mem::replace(&mut st.chunks, chunks.len() as u32);
for (i, chunk) in chunks.enumerate() {
let key = self.ikb.new_hl_key(self.level, i as u32);
tx.set(key, chunk).await?;
tx.set(key, chunk, None).await?;
}
// Delete larger chunks if they exists
for i in st.chunks..old_chunks_len {

View file

@ -273,7 +273,7 @@ where
async fn save_state(&self, tx: &Transaction) -> Result<(), Error> {
let val: Val = VersionedStore::try_into(&self.state)?;
tx.set(self.state_key.clone(), val).await?;
tx.set(self.state_key.clone(), val, None).await?;
Ok(())
}

View file

@ -175,7 +175,7 @@ impl MTreeIndex {
let mut mtree = self.mtree.write().await;
if let Some(new_cache) = self.store.finish(tx).await? {
mtree.state.generation += 1;
tx.set(self.state_key.clone(), VersionedStore::try_into(&mtree.state)?).await?;
tx.set(self.state_key.clone(), VersionedStore::try_into(&mtree.state)?, None).await?;
self.ixs.advance_store_mtree(new_cache);
}
drop(mtree);

View file

@ -161,7 +161,7 @@ impl TreeNodeProvider {
{
let val = node.n.try_into_val()?;
node.size = val.len() as u32;
tx.set(node.key.clone(), val).await?;
tx.set(node.key.clone(), val, None).await?;
Ok(())
}
}

View file

@ -60,7 +60,7 @@ pub trait Transaction {
K: Into<Key> + Sprintable + Debug;
/// Insert or update a key in the datastore.
async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug;
@ -307,7 +307,7 @@ pub trait Transaction {
// Convert the timestamp to a versionstamp
let verbytes = crate::vs::u64_to_versionstamp(ver);
// Store the timestamp to prevent other transactions from committing
self.set(key.as_slice(), verbytes.to_vec()).await?;
self.set(key.as_slice(), verbytes.to_vec(), None).await?;
// Return the uint64 representation of the timestamp as the result
Ok(verbytes)
}
@ -338,6 +338,6 @@ pub trait Transaction {
let mut k: Vec<u8> = prefix.into();
k.append(&mut ts.to_vec());
k.append(&mut suffix.into());
self.set(k, val).await
self.set(k, val, None).await
}
}

View file

@ -261,11 +261,16 @@ impl super::api::Transaction for Transaction {
/// Inserts or update a key in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// FDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -174,11 +174,16 @@ impl super::api::Transaction for Transaction {
/// Insert or update a key in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// IndexDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -171,11 +171,16 @@ impl super::api::Transaction for Transaction {
/// Insert or update a key in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// MemDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -51,7 +51,7 @@ impl Datastore {
let key = crate::key::root::nd::new(id);
let now = self.clock_now().await;
let val = Node::new(id, now, false);
run!(txn, txn.set(key, val))
run!(txn, txn.set(key, val, None))
}
/// Deletes a node from the cluster.
@ -70,7 +70,7 @@ impl Datastore {
let key = crate::key::root::nd::new(id);
let val = txn.get_node(id).await?;
let val = val.as_ref().archive();
run!(txn, txn.set(key, val))
run!(txn, txn.set(key, val, None))
}
/// Expires nodes which have timedout from the cluster.
@ -100,7 +100,7 @@ impl Datastore {
// Get the key for the node entry
let key = crate::key::root::nd::new(nd.id);
// Update the node entry
catch!(txn, txn.set(key, val));
catch!(txn, txn.set(key, val, None));
}
}
}

View file

@ -255,11 +255,16 @@ impl super::api::Transaction for Transaction {
/// Insert or update a key in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// RocksDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -180,7 +180,7 @@ impl super::api::Transaction for Transaction {
/// Insert or update a key in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
@ -194,7 +194,10 @@ impl super::api::Transaction for Transaction {
return Err(Error::TxReadonly);
}
// Set the key
self.inner.set(&key.into(), &val.into())?;
match version {
Some(ts) => self.inner.set_at_ts(&key.into(), &val.into(), ts)?,
None => self.inner.set(&key.into(), &val.into())?,
}
// Return result
Ok(())
}

View file

@ -7,7 +7,7 @@ async fn multireader() {
let (ds, _) = new_ds(node_id, clock).await;
// Insert an initial key
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx.set("test", "some text").await.unwrap();
tx.set("test", "some text", None).await.unwrap();
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx1 = ds.transaction(Read, Optimistic).await.unwrap().inner();

View file

@ -7,17 +7,17 @@ async fn multiwriter_different_keys() {
let (ds, _) = new_ds(node_id, clock).await;
// Insert an initial key
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx.set("test", "some text").await.unwrap();
tx.set("test", "some text", None).await.unwrap();
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx1 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx1.set("test1", "other text 1").await.unwrap();
tx1.set("test1", "other text 1", None).await.unwrap();
// Create a writeable transaction
let mut tx2 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx2.set("test2", "other text 2").await.unwrap();
tx2.set("test2", "other text 2", None).await.unwrap();
// Create a writeable transaction
let mut tx3 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx3.set("test3", "other text 3").await.unwrap();
tx3.set("test3", "other text 3", None).await.unwrap();
// Cancel both writeable transactions
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();

View file

@ -7,17 +7,17 @@ async fn multiwriter_same_keys_allow() {
let (ds, _) = new_ds(node_id, clock).await;
// Insert an initial key
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx.set("test", "some text").await.unwrap();
tx.set("test", "some text", None).await.unwrap();
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx1 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx1.set("test", "other text 1").await.unwrap();
tx1.set("test", "other text 1", None).await.unwrap();
// Create a writeable transaction
let mut tx2 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx2.set("test", "other text 2").await.unwrap();
tx2.set("test", "other text 2", None).await.unwrap();
// Create a writeable transaction
let mut tx3 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx3.set("test", "other text 3").await.unwrap();
tx3.set("test", "other text 3", None).await.unwrap();
// Cancel both writeable transactions
assert!(tx1.commit().await.is_ok());
assert!(tx2.commit().await.is_ok());
@ -29,7 +29,7 @@ async fn multiwriter_same_keys_allow() {
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx.set("test", "original text").await.unwrap();
tx.set("test", "original text", None).await.unwrap();
tx.commit().await.unwrap();
// Check that the key was updated ok
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();

View file

@ -7,17 +7,17 @@ async fn multiwriter_same_keys_conflict() {
let (ds, _) = new_ds(node_id, clock).await;
// Insert an initial key
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx.set("test", "some text").await.unwrap();
tx.set("test", "some text", None).await.unwrap();
tx.commit().await.unwrap();
// Create a writeable transaction
let mut tx1 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx1.set("test", "other text 1").await.unwrap();
tx1.set("test", "other text 1", None).await.unwrap();
// Create a writeable transaction
let mut tx2 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx2.set("test", "other text 2").await.unwrap();
tx2.set("test", "other text 2", None).await.unwrap();
// Create a writeable transaction
let mut tx3 = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx3.set("test", "other text 3").await.unwrap();
tx3.set("test", "other text 3", None).await.unwrap();
// Cancel both writeable transactions
assert!(tx1.commit().await.is_ok());
assert!(tx2.commit().await.is_err());
@ -29,7 +29,7 @@ async fn multiwriter_same_keys_conflict() {
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx.set("test", "original text").await.unwrap();
tx.set("test", "original text", None).await.unwrap();
tx.commit().await.unwrap();
// Check that the key was updated ok
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();

View file

@ -55,7 +55,7 @@ async fn set() {
let (ds, _) = new_ds(node_id, clock).await;
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.set("test", "one").await.is_ok());
assert!(tx.set("test", "one", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -64,7 +64,7 @@ async fn set() {
tx.cancel().await.unwrap();
// Create a writeable transaction
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
assert!(tx.set("test", "two").await.is_ok());
assert!(tx.set("test", "two", None).await.is_ok());
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();

View file

@ -7,7 +7,7 @@ async fn snapshot() {
let (ds, _) = new_ds(node_id, clock).await;
// Insert an initial key
let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();
tx.set("test", "some text").await.unwrap();
tx.set("test", "some text", None).await.unwrap();
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx1 = ds.transaction(Read, Optimistic).await.unwrap().inner();
@ -17,7 +17,7 @@ async fn snapshot() {
// Create a new writeable transaction
let mut txw = ds.transaction(Write, Optimistic).await.unwrap().inner();
// Update the test key content
txw.set("test", "other text").await.unwrap();
txw.set("test", "other text", None).await.unwrap();
// Create a readonly transaction
let mut tx2 = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx2.get("test", None).await.unwrap().unwrap();
@ -27,7 +27,7 @@ async fn snapshot() {
let val = tx3.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Update the test key content
txw.set("test", "extra text").await.unwrap();
txw.set("test", "extra text", None).await.unwrap();
// Check the key from the original transaction
let val = tx1.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");

View file

@ -205,11 +205,16 @@ impl super::api::Transaction for Transaction {
/// Insert or update a key in the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// TiKV does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);
@ -437,7 +442,7 @@ impl super::api::Transaction for Transaction {
// Convert the timestamp to a versionstamp
let verbytes = crate::vs::u64_to_versionstamp(ver);
// Store the timestamp to prevent other transactions from committing
self.set(key.as_slice(), verbytes.to_vec()).await?;
self.set(key.as_slice(), verbytes.to_vec(), None).await?;
// Return the uint64 representation of the timestamp as the result
Ok(verbytes)
}

View file

@ -247,13 +247,13 @@ impl Transactor {
/// Insert or update a key in the datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
pub async fn set<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
pub async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Debug,
V: Into<Val> + Debug,
{
let key = key.into();
expand_inner!(&mut self.inner, v => { v.set(key, val).await })
expand_inner!(&mut self.inner, v => { v.set(key, val, version).await })
}
/// Insert a key if it doesn't exist in the datastore.
@ -468,7 +468,7 @@ impl Transactor {
let nid = seq.get_next_id();
self.stash.set(key, seq.clone());
let (k, v) = seq.finish().unwrap();
self.set(k, v).await?;
self.set(k, v, None).await?;
Ok(nid)
}
@ -479,7 +479,7 @@ impl Transactor {
let nid = seq.get_next_id();
self.stash.set(key, seq.clone());
let (k, v) = seq.finish().unwrap();
self.set(k, v).await?;
self.set(k, v, None).await?;
Ok(nid)
}
@ -490,7 +490,7 @@ impl Transactor {
let nid = seq.get_next_id();
self.stash.set(key, seq.clone());
let (k, v) = seq.finish().unwrap();
self.set(k, v).await?;
self.set(k, v, None).await?;
Ok(nid)
}
@ -502,7 +502,7 @@ impl Transactor {
seq.remove_id(ns);
self.stash.set(key, seq.clone());
let (k, v) = seq.finish().unwrap();
self.set(k, v).await?;
self.set(k, v, None).await?;
Ok(())
}
@ -514,7 +514,7 @@ impl Transactor {
seq.remove_id(db);
self.stash.set(key, seq.clone());
let (k, v) = seq.finish().unwrap();
self.set(k, v).await?;
self.set(k, v, None).await?;
Ok(())
}
@ -526,7 +526,7 @@ impl Transactor {
seq.remove_id(tb);
self.stash.set(key, seq.clone());
let (k, v) = seq.finish().unwrap();
self.set(k, v).await?;
self.set(k, v, None).await?;
Ok(())
}
@ -599,7 +599,7 @@ impl Transactor {
));
}
}
self.set(ts_key, vst).await?;
self.set(ts_key, vst, None).await?;
Ok(vst)
}

View file

@ -191,12 +191,12 @@ impl Transaction {
/// Insert or update a key in the datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
pub async fn set<K, V>(&self, key: K, val: V) -> Result<(), Error>
pub async fn set<K, V>(&self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: Into<Key> + Debug,
V: Into<Val> + Debug,
{
self.lock().await.set(key, val).await
self.lock().await.set(key, val, version).await
}
/// Insert a key if it doesn't exist in the datastore.
@ -1310,7 +1310,7 @@ impl Transaction {
let key = crate::key::thing::new(ns, db, tb, id);
let enc = crate::key::thing::new(ns, db, tb, id).encode()?;
// Set the value in the datastore
self.set(&key, &val).await?;
self.set(&key, &val, None).await?;
// Set the value in the cache
self.cache.insert(enc, Entry::Val(Arc::new(val)));
// Return nothing

View file

@ -293,7 +293,7 @@ async fn compute_grant(
let gr_str = gr.id.to_raw();
// Process the statement
let key = crate::key::root::access::gr::new(&ac_str, &gr_str);
txn.set(key, &gr).await?;
txn.set(key, &gr, None).await?;
Ok(Value::Object(gr.into()))
}
_ => Err(Error::AccessMethodMismatch),
@ -350,7 +350,7 @@ async fn compute_grant(
// Process the statement
let key = crate::key::namespace::access::gr::new(opt.ns()?, &ac_str, &gr_str);
txn.get_or_add_ns(opt.ns()?, opt.strict).await?;
txn.set(key, &gr).await?;
txn.set(key, &gr, None).await?;
Ok(Value::Object(gr.into()))
}
_ => Err(Error::AccessMethodMismatch),
@ -418,7 +418,7 @@ async fn compute_grant(
);
txn.get_or_add_ns(opt.ns()?, opt.strict).await?;
txn.get_or_add_db(opt.ns()?, opt.db()?, opt.strict).await?;
txn.set(key, &gr).await?;
txn.set(key, &gr, None).await?;
Ok(Value::Object(gr.into()))
}
}
@ -523,7 +523,7 @@ async fn compute_revoke(
gr.revocation = Some(Datetime::default());
// Process the statement
let key = crate::key::root::access::gr::new(&ac_str, &gr_str);
txn.set(key, &gr).await?;
txn.set(key, &gr, None).await?;
Ok(Value::Object(gr.redacted().into()))
}
Base::Ns => {
@ -544,7 +544,7 @@ async fn compute_revoke(
// Process the statement
let key = crate::key::namespace::access::gr::new(opt.ns()?, &ac_str, &gr_str);
txn.get_or_add_ns(opt.ns()?, opt.strict).await?;
txn.set(key, &gr).await?;
txn.set(key, &gr, None).await?;
Ok(Value::Object(gr.redacted().into()))
}
Base::Db => {
@ -567,7 +567,7 @@ async fn compute_revoke(
let key = crate::key::database::access::gr::new(opt.ns()?, opt.db()?, &ac_str, &gr_str);
txn.get_or_add_ns(opt.ns()?, opt.strict).await?;
txn.get_or_add_db(opt.ns()?, opt.db()?, opt.strict).await?;
txn.set(key, &gr).await?;
txn.set(key, &gr, None).await?;
Ok(Value::Object(gr.redacted().into()))
}
_ => Err(Error::Unimplemented(

View file

@ -69,7 +69,7 @@ impl AlterTableStatement {
dt.kind = kind.clone();
}
txn.set(key, &dt).await?;
txn.set(key, &dt, None).await?;
// Add table relational fields
if matches!(self.kind, Some(TableType::Relation(_))) {
dt.add_in_out_fields(&txn, opt).await?;

View file

@ -92,6 +92,7 @@ impl DefineAccessStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache
@ -124,6 +125,7 @@ impl DefineAccessStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache
@ -158,6 +160,7 @@ impl DefineAccessStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -60,6 +60,7 @@ impl DefineAnalyzerStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -63,6 +63,7 @@ impl DefineDatabaseStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -61,6 +61,7 @@ impl DefineEventStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -76,6 +76,7 @@ impl DefineFieldStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
@ -113,7 +114,7 @@ impl DefineFieldStatement {
}
};
txn.set(key, statement).await?;
txn.set(key, statement, None).await?;
if let Some(new_kind) = new_kind {
cur_kind = new_kind;
@ -146,7 +147,7 @@ impl DefineFieldStatement {
}),
..tb.as_ref().to_owned()
};
txn.set(key, val).await?;
txn.set(key, val, None).await?;
}
}
}
@ -175,7 +176,7 @@ impl DefineFieldStatement {
}),
..tb.as_ref().to_owned()
};
txn.set(key, val).await?;
txn.set(key, val, None).await?;
}
}
}

View file

@ -63,6 +63,7 @@ impl DefineFunctionStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -87,6 +87,7 @@ impl DefineIndexStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -61,6 +61,7 @@ impl DefineModelStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -61,6 +61,7 @@ impl DefineNamespaceStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -64,6 +64,7 @@ impl DefineParamStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -78,7 +78,7 @@ impl DefineTableStatement {
overwrite: false,
..self.clone()
};
txn.set(key, &dt).await?;
txn.set(key, &dt, None).await?;
// Add table relational fields
self.add_in_out_fields(&txn, opt).await?;
// Clear the cache
@ -96,7 +96,7 @@ impl DefineTableStatement {
for v in view.what.0.iter() {
// Save the view config
let key = crate::key::table::ft::new(opt.ns()?, opt.db()?, v, &self.name);
txn.set(key, self).await?;
txn.set(key, self, None).await?;
}
// Force queries to run
let opt = &opt.new_with_force(Force::Table(Arc::new([dt])));
@ -147,6 +147,7 @@ impl DefineTableStatement {
kind: Some(val),
..Default::default()
},
None,
)
.await?;
}
@ -162,6 +163,7 @@ impl DefineTableStatement {
kind: Some(val),
..Default::default()
},
None,
)
.await?;
}

View file

@ -133,6 +133,7 @@ impl DefineUserStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache
@ -165,6 +166,7 @@ impl DefineUserStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache
@ -199,6 +201,7 @@ impl DefineUserStatement {
overwrite: false,
..self.clone()
},
None,
)
.await?;
// Clear the cache

View file

@ -4,14 +4,14 @@ use crate::doc::CursorDoc;
use crate::err::Error;
use crate::sql::paths::IN;
use crate::sql::paths::OUT;
use crate::sql::{Data, Id, Output, Table, Thing, Timeout, Value};
use crate::sql::{Data, Id, Output, Table, Thing, Timeout, Value, Version};
use derive::Store;
use reblessive::tree::Stk;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt;
#[revisioned(revision = 2)]
#[revisioned(revision = 3)]
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive]
@ -25,6 +25,8 @@ pub struct InsertStatement {
pub parallel: bool,
#[revision(start = 2)]
pub relation: bool,
#[revision(start = 3)]
pub version: Option<Version>,
}
impl InsertStatement {
@ -44,8 +46,10 @@ impl InsertStatement {
opt.valid_for_db()?;
// Create a new iterator
let mut i = Iterator::new();
// Propagate the version to the underlying datastore
let version = self.version.as_ref().map(|v| v.to_u64());
// Ensure futures are stored
let opt = &opt.new_with_futures(false).with_projections(false);
let opt = &opt.new_with_futures(false).with_projections(false).with_version(version);
// Parse the INTO expression
let into = match &self.into {
None => None,
@ -129,6 +133,9 @@ impl fmt::Display for InsertStatement {
if let Some(ref v) = self.output {
write!(f, " {v}")?
}
if let Some(ref v) = self.version {
write!(f, " {v}")?
}
if let Some(ref v) = self.timeout {
write!(f, " {v}")?
}

View file

@ -75,6 +75,7 @@ impl Parser<'_> {
None
};
let output = self.try_parse_output(ctx).await?;
let version = self.try_parse_version()?;
let timeout = self.try_parse_timeout()?;
let parallel = self.eat(t!("PARALLEL"));
Ok(InsertStatement {
@ -86,6 +87,7 @@ impl Parser<'_> {
timeout,
parallel,
relation,
version,
})
}

View file

@ -2084,6 +2084,7 @@ fn parse_insert() {
),
])),
output: Some(Output::After),
version: None,
timeout: None,
parallel: false,
relation: false,

View file

@ -608,6 +608,7 @@ fn statements() -> Vec<Statement> {
),
])),
output: Some(Output::After),
version: None,
timeout: None,
parallel: false,
relation: false,

View file

@ -537,6 +537,50 @@ mod api_integration {
assert!(response.is_none());
}
#[test_log::test(tokio::test)]
async fn insert_with_version() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
// Create a record in the past.
let _ = db
.query("INSERT INTO user { id: user:john, name: 'John' } VERSION d'2024-08-19T08:00:00Z'")
.await
.unwrap()
.check()
.unwrap();
// Without VERSION, SELECT should return the record.
let mut response = db.query("SELECT * FROM user:john").await.unwrap().check().unwrap();
let Some(name): Option<String> = response.take("name").unwrap() else {
panic!("query returned no record");
};
assert_eq!(name, "John");
// SELECT with the VERSION set to the creation timestamp or later should return the record.
let mut response = db
.query("SELECT * FROM user:john VERSION d'2024-08-19T08:00:00Z'")
.await
.unwrap()
.check()
.unwrap();
let Some(name): Option<String> = response.take("name").unwrap() else {
panic!("query returned no record");
};
assert_eq!(name, "John");
// SELECT with the VERSION set before the creation timestamp should return nothing.
let mut response = db
.query("SELECT * FROM user:john VERSION d'2024-08-19T07:00:00Z'")
.await
.unwrap()
.check()
.unwrap();
let response: Option<String> = response.take("name").unwrap();
assert!(response.is_none());
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");