diff --git a/core/src/doc/edges.rs b/core/src/doc/edges.rs index 73e49abe..365c553d 100644 --- a/core/src/doc/edges.rs +++ b/core/src/doc/edges.rs @@ -59,16 +59,16 @@ impl Document { let (ref o, ref i) = (Dir::Out, Dir::In); // Store the left pointer edge let key = crate::key::graph::new(opt.ns()?, opt.db()?, &l.tb, &l.id, o, rid); - txn.set(key, vec![]).await?; + txn.set(key, vec![], None).await?; // Store the left inner edge let key = crate::key::graph::new(opt.ns()?, opt.db()?, &rid.tb, &rid.id, i, l); - txn.set(key, vec![]).await?; + txn.set(key, vec![], None).await?; // Store the right inner edge let key = crate::key::graph::new(opt.ns()?, opt.db()?, &rid.tb, &rid.id, o, r); - txn.set(key, vec![]).await?; + txn.set(key, vec![], None).await?; // Store the right pointer edge let key = crate::key::graph::new(opt.ns()?, opt.db()?, &r.tb, &r.id, i, rid); - txn.set(key, vec![]).await?; + txn.set(key, vec![], None).await?; // Store the edges on the record self.current.doc.to_mut().put(&*EDGE, Value::Bool(true)); self.current.doc.to_mut().put(&*IN, l.clone().into()); diff --git a/core/src/doc/store.rs b/core/src/doc/store.rs index 527bf024..04c996f7 100644 --- a/core/src/doc/store.rs +++ b/core/src/doc/store.rs @@ -38,8 +38,10 @@ impl Document { // Record creation worked fine Ok(v) => Ok(v), }, + // INSERT can be versioned + Statement::Insert(_) => txn.set(key, self, opt.version).await, // This is not a CREATE statement, so update the key - _ => txn.set(key, self).await, + _ => txn.set(key, self, None).await, }?; // Carry on Ok(()) diff --git a/core/src/idg/u32.rs b/core/src/idg/u32.rs index 68458264..b9b9c936 100644 --- a/core/src/idg/u32.rs +++ b/core/src/idg/u32.rs @@ -124,7 +124,7 @@ mod tests { async fn finish(txn: Transaction, mut d: U32) -> Result<(), Error> { if let Some((key, val)) = d.finish() { - txn.set(key, val).await?; + txn.set(key, val, None).await?; } txn.commit().await } diff --git a/core/src/idx/docids.rs b/core/src/idx/docids.rs index 8c80a578..3db1083e 100644 --- a/core/src/idx/docids.rs +++ b/core/src/idx/docids.rs @@ -92,7 +92,7 @@ impl DocIds { } } let doc_id = self.get_next_doc_id(); - tx.set(self.index_key_base.new_bi_key(doc_id), doc_key.clone()).await?; + tx.set(self.index_key_base.new_bi_key(doc_id), doc_key.clone(), None).await?; self.btree.insert(tx, &mut self.store, doc_key, doc_id).await?; Ok(Resolved::New(doc_id)) } @@ -142,7 +142,7 @@ impl DocIds { available_ids: self.available_ids.take(), next_doc_id: self.next_doc_id, }; - tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?).await?; + tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?, None).await?; self.ixs.advance_cache_btree_trie(new_cache); } Ok(()) diff --git a/core/src/idx/ft/doclength.rs b/core/src/idx/ft/doclength.rs index ed72199c..38c5a991 100644 --- a/core/src/idx/ft/doclength.rs +++ b/core/src/idx/ft/doclength.rs @@ -87,7 +87,7 @@ impl DocLengths { pub(super) async fn finish(&mut self, tx: &Transaction) -> Result<(), Error> { if let Some(new_cache) = self.store.finish(tx).await? { let state = self.btree.inc_generation(); - tx.set(self.state_key.clone(), VersionedStore::try_into(state)?).await?; + tx.set(self.state_key.clone(), VersionedStore::try_into(state)?, None).await?; self.ixs.advance_cache_btree_trie(new_cache); } Ok(()) diff --git a/core/src/idx/ft/mod.rs b/core/src/idx/ft/mod.rs index 16875dba..ead6d29c 100644 --- a/core/src/idx/ft/mod.rs +++ b/core/src/idx/ft/mod.rs @@ -334,7 +334,7 @@ impl FtIndex { // Stores the term list for this doc_id let mut val = Vec::new(); terms_ids.serialize_into(&mut val)?; - tx.set(term_ids_key, val).await?; + tx.set(term_ids_key, val, None).await?; // Update the index state self.state.total_docs_lengths += doc_length as u128; @@ -343,7 +343,7 @@ impl FtIndex { } // Update the states - tx.set(self.state_key.clone(), VersionedStore::try_into(&self.state)?).await?; + tx.set(self.state_key.clone(), VersionedStore::try_into(&self.state)?, None).await?; drop(tx); Ok(()) } diff --git a/core/src/idx/ft/offsets.rs b/core/src/idx/ft/offsets.rs index b3166785..3f4cf0e3 100644 --- a/core/src/idx/ft/offsets.rs +++ b/core/src/idx/ft/offsets.rs @@ -26,7 +26,7 @@ impl Offsets { ) -> Result<(), Error> { let key = self.index_key_base.new_bo_key(doc_id, term_id); let val: Val = offsets.try_into()?; - tx.set(key, val).await?; + tx.set(key, val, None).await?; Ok(()) } diff --git a/core/src/idx/ft/postings.rs b/core/src/idx/ft/postings.rs index 02e951b3..aa9ff10a 100644 --- a/core/src/idx/ft/postings.rs +++ b/core/src/idx/ft/postings.rs @@ -87,7 +87,7 @@ impl Postings { pub(super) async fn finish(&mut self, tx: &Transaction) -> Result<(), Error> { if let Some(new_cache) = self.store.finish(tx).await? { let state = self.btree.inc_generation(); - tx.set(self.state_key.clone(), VersionedStore::try_into(state)?).await?; + tx.set(self.state_key.clone(), VersionedStore::try_into(state)?, None).await?; self.ixs.advance_cache_btree_trie(new_cache); } Ok(()) diff --git a/core/src/idx/ft/termdocs.rs b/core/src/idx/ft/termdocs.rs index 34cc27be..23bb4b81 100644 --- a/core/src/idx/ft/termdocs.rs +++ b/core/src/idx/ft/termdocs.rs @@ -31,7 +31,7 @@ impl TermDocs { let key = self.index_key_base.new_bc_key(term_id); let mut val = Vec::new(); docs.serialize_into(&mut val)?; - tx.set(key, val).await?; + tx.set(key, val, None).await?; } Ok(()) } @@ -65,7 +65,7 @@ impl TermDocs { } else { let mut val = Vec::new(); docs.serialize_into(&mut val)?; - tx.set(key, val).await?; + tx.set(key, val, None).await?; } } Ok(docs.len()) diff --git a/core/src/idx/ft/terms.rs b/core/src/idx/ft/terms.rs index c1f65c69..2bf8964a 100644 --- a/core/src/idx/ft/terms.rs +++ b/core/src/idx/ft/terms.rs @@ -84,7 +84,7 @@ impl Terms { } } let term_id = self.get_next_term_id(); - tx.set(self.index_key_base.new_bu_key(term_id), term_key.clone()).await?; + tx.set(self.index_key_base.new_bu_key(term_id), term_key.clone(), None).await?; self.btree.insert(tx, &mut self.store, term_key, term_id).await?; Ok(term_id) } @@ -129,7 +129,7 @@ impl Terms { available_ids: self.available_ids.take(), next_term_id: self.next_term_id, }; - tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?).await?; + tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?, None).await?; self.ixs.advance_store_btree_fst(new_cache); } Ok(()) diff --git a/core/src/idx/trees/hnsw/docs.rs b/core/src/idx/trees/hnsw/docs.rs index 239b118f..601c7987 100644 --- a/core/src/idx/trees/hnsw/docs.rs +++ b/core/src/idx/trees/hnsw/docs.rs @@ -59,9 +59,9 @@ impl HnswDocs { Ok(doc_id) } else { let doc_id = self.next_doc_id(); - tx.set(id_key, doc_id.to_be_bytes()).await?; + tx.set(id_key, doc_id.to_be_bytes(), None).await?; let doc_key = self.ikb.new_hd_key(Some(doc_id)); - tx.set(doc_key, id).await?; + tx.set(doc_key, id, None).await?; Ok(doc_id) } } @@ -112,7 +112,7 @@ impl HnswDocs { pub(in crate::idx) async fn finish(&mut self, tx: &Transaction) -> Result<(), Error> { if self.state_updated { - tx.set(self.state_key.clone(), VersionedStore::try_into(&self.state)?).await?; + tx.set(self.state_key.clone(), VersionedStore::try_into(&self.state)?, None).await?; self.state_updated = true; } Ok(()) @@ -183,7 +183,7 @@ impl VecDocs { } } { let val: Val = VersionedStore::try_into(&ed)?; - tx.set(key, val).await?; + tx.set(key, val, None).await?; } Ok(()) } @@ -205,7 +205,7 @@ impl VecDocs { } else { ed.docs = new_docs; let val: Val = VersionedStore::try_into(&ed)?; - tx.set(key, val).await?; + tx.set(key, val, None).await?; } } }; diff --git a/core/src/idx/trees/hnsw/elements.rs b/core/src/idx/trees/hnsw/elements.rs index 95d599b5..f360a88d 100644 --- a/core/src/idx/trees/hnsw/elements.rs +++ b/core/src/idx/trees/hnsw/elements.rs @@ -55,7 +55,7 @@ impl HnswElements { ) -> Result { let key = self.ikb.new_he_key(id); let val = VersionedStore::try_into(ser_vec)?; - tx.set(key, val).await?; + tx.set(key, val, None).await?; let pt: SharedVector = vec.into(); self.elements.insert(id, pt.clone()); Ok(pt) diff --git a/core/src/idx/trees/hnsw/layer.rs b/core/src/idx/trees/hnsw/layer.rs index 55e00c85..35773e32 100644 --- a/core/src/idx/trees/hnsw/layer.rs +++ b/core/src/idx/trees/hnsw/layer.rs @@ -372,7 +372,7 @@ where let old_chunks_len = mem::replace(&mut st.chunks, chunks.len() as u32); for (i, chunk) in chunks.enumerate() { let key = self.ikb.new_hl_key(self.level, i as u32); - tx.set(key, chunk).await?; + tx.set(key, chunk, None).await?; } // Delete larger chunks if they exists for i in st.chunks..old_chunks_len { diff --git a/core/src/idx/trees/hnsw/mod.rs b/core/src/idx/trees/hnsw/mod.rs index f4224155..f852b6c1 100644 --- a/core/src/idx/trees/hnsw/mod.rs +++ b/core/src/idx/trees/hnsw/mod.rs @@ -273,7 +273,7 @@ where async fn save_state(&self, tx: &Transaction) -> Result<(), Error> { let val: Val = VersionedStore::try_into(&self.state)?; - tx.set(self.state_key.clone(), val).await?; + tx.set(self.state_key.clone(), val, None).await?; Ok(()) } diff --git a/core/src/idx/trees/mtree.rs b/core/src/idx/trees/mtree.rs index 892deb0c..962e76e4 100644 --- a/core/src/idx/trees/mtree.rs +++ b/core/src/idx/trees/mtree.rs @@ -175,7 +175,7 @@ impl MTreeIndex { let mut mtree = self.mtree.write().await; if let Some(new_cache) = self.store.finish(tx).await? { mtree.state.generation += 1; - tx.set(self.state_key.clone(), VersionedStore::try_into(&mtree.state)?).await?; + tx.set(self.state_key.clone(), VersionedStore::try_into(&mtree.state)?, None).await?; self.ixs.advance_store_mtree(new_cache); } drop(mtree); diff --git a/core/src/idx/trees/store/mod.rs b/core/src/idx/trees/store/mod.rs index 8bdf0633..be2e2d65 100644 --- a/core/src/idx/trees/store/mod.rs +++ b/core/src/idx/trees/store/mod.rs @@ -161,7 +161,7 @@ impl TreeNodeProvider { { let val = node.n.try_into_val()?; node.size = val.len() as u32; - tx.set(node.key.clone(), val).await?; + tx.set(node.key.clone(), val, None).await?; Ok(()) } } diff --git a/core/src/kvs/api.rs b/core/src/kvs/api.rs index d1d73bd3..1c347aee 100644 --- a/core/src/kvs/api.rs +++ b/core/src/kvs/api.rs @@ -60,7 +60,7 @@ pub trait Transaction { K: Into + Sprintable + Debug; /// Insert or update a key in the datastore. - async fn set(&mut self, key: K, val: V) -> Result<(), Error> + async fn set(&mut self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Sprintable + Debug, V: Into + Debug; @@ -307,7 +307,7 @@ pub trait Transaction { // Convert the timestamp to a versionstamp let verbytes = crate::vs::u64_to_versionstamp(ver); // Store the timestamp to prevent other transactions from committing - self.set(key.as_slice(), verbytes.to_vec()).await?; + self.set(key.as_slice(), verbytes.to_vec(), None).await?; // Return the uint64 representation of the timestamp as the result Ok(verbytes) } @@ -338,6 +338,6 @@ pub trait Transaction { let mut k: Vec = prefix.into(); k.append(&mut ts.to_vec()); k.append(&mut suffix.into()); - self.set(k, val).await + self.set(k, val, None).await } } diff --git a/core/src/kvs/fdb/mod.rs b/core/src/kvs/fdb/mod.rs index d1d0dd37..0ce09f40 100644 --- a/core/src/kvs/fdb/mod.rs +++ b/core/src/kvs/fdb/mod.rs @@ -261,11 +261,16 @@ impl super::api::Transaction for Transaction { /// Inserts or update a key in the database #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))] - async fn set(&mut self, key: K, val: V) -> Result<(), Error> + async fn set(&mut self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Sprintable + Debug, V: Into + Debug, { + // FDB does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/indxdb/mod.rs b/core/src/kvs/indxdb/mod.rs index 67411fb0..2da04bdb 100644 --- a/core/src/kvs/indxdb/mod.rs +++ b/core/src/kvs/indxdb/mod.rs @@ -174,11 +174,16 @@ impl super::api::Transaction for Transaction { /// Insert or update a key in the database #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))] - async fn set(&mut self, key: K, val: V) -> Result<(), Error> + async fn set(&mut self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Sprintable + Debug, V: Into + Debug, { + // IndexDB does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/mem/mod.rs b/core/src/kvs/mem/mod.rs index 2ed8a72d..e2172a40 100644 --- a/core/src/kvs/mem/mod.rs +++ b/core/src/kvs/mem/mod.rs @@ -171,11 +171,16 @@ impl super::api::Transaction for Transaction { /// Insert or update a key in the database #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))] - async fn set(&mut self, key: K, val: V) -> Result<(), Error> + async fn set(&mut self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Sprintable + Debug, V: Into + Debug, { + // MemDB does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/node.rs b/core/src/kvs/node.rs index baba653e..6c13f678 100644 --- a/core/src/kvs/node.rs +++ b/core/src/kvs/node.rs @@ -51,7 +51,7 @@ impl Datastore { let key = crate::key::root::nd::new(id); let now = self.clock_now().await; let val = Node::new(id, now, false); - run!(txn, txn.set(key, val)) + run!(txn, txn.set(key, val, None)) } /// Deletes a node from the cluster. @@ -70,7 +70,7 @@ impl Datastore { let key = crate::key::root::nd::new(id); let val = txn.get_node(id).await?; let val = val.as_ref().archive(); - run!(txn, txn.set(key, val)) + run!(txn, txn.set(key, val, None)) } /// Expires nodes which have timedout from the cluster. @@ -100,7 +100,7 @@ impl Datastore { // Get the key for the node entry let key = crate::key::root::nd::new(nd.id); // Update the node entry - catch!(txn, txn.set(key, val)); + catch!(txn, txn.set(key, val, None)); } } } diff --git a/core/src/kvs/rocksdb/mod.rs b/core/src/kvs/rocksdb/mod.rs index 8becb7bd..4637716e 100644 --- a/core/src/kvs/rocksdb/mod.rs +++ b/core/src/kvs/rocksdb/mod.rs @@ -255,11 +255,16 @@ impl super::api::Transaction for Transaction { /// Insert or update a key in the database #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))] - async fn set(&mut self, key: K, val: V) -> Result<(), Error> + async fn set(&mut self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Sprintable + Debug, V: Into + Debug, { + // RocksDB does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); diff --git a/core/src/kvs/surrealkv/mod.rs b/core/src/kvs/surrealkv/mod.rs index a9979ac2..75bb9483 100644 --- a/core/src/kvs/surrealkv/mod.rs +++ b/core/src/kvs/surrealkv/mod.rs @@ -180,7 +180,7 @@ impl super::api::Transaction for Transaction { /// Insert or update a key in the database #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))] - async fn set(&mut self, key: K, val: V) -> Result<(), Error> + async fn set(&mut self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Sprintable + Debug, V: Into + Debug, @@ -194,7 +194,10 @@ impl super::api::Transaction for Transaction { return Err(Error::TxReadonly); } // Set the key - self.inner.set(&key.into(), &val.into())?; + match version { + Some(ts) => self.inner.set_at_ts(&key.into(), &val.into(), ts)?, + None => self.inner.set(&key.into(), &val.into())?, + } // Return result Ok(()) } diff --git a/core/src/kvs/tests/multireader.rs b/core/src/kvs/tests/multireader.rs index c6d7f703..d2a2ba86 100644 --- a/core/src/kvs/tests/multireader.rs +++ b/core/src/kvs/tests/multireader.rs @@ -7,7 +7,7 @@ async fn multireader() { let (ds, _) = new_ds(node_id, clock).await; // Insert an initial key let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx.set("test", "some text").await.unwrap(); + tx.set("test", "some text", None).await.unwrap(); tx.commit().await.unwrap(); // Create a readonly transaction let mut tx1 = ds.transaction(Read, Optimistic).await.unwrap().inner(); diff --git a/core/src/kvs/tests/multiwriter_different_keys.rs b/core/src/kvs/tests/multiwriter_different_keys.rs index c0eeb5ad..d3993ec4 100644 --- a/core/src/kvs/tests/multiwriter_different_keys.rs +++ b/core/src/kvs/tests/multiwriter_different_keys.rs @@ -7,17 +7,17 @@ async fn multiwriter_different_keys() { let (ds, _) = new_ds(node_id, clock).await; // Insert an initial key let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx.set("test", "some text").await.unwrap(); + tx.set("test", "some text", None).await.unwrap(); tx.commit().await.unwrap(); // Create a writeable transaction let mut tx1 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx1.set("test1", "other text 1").await.unwrap(); + tx1.set("test1", "other text 1", None).await.unwrap(); // Create a writeable transaction let mut tx2 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx2.set("test2", "other text 2").await.unwrap(); + tx2.set("test2", "other text 2", None).await.unwrap(); // Create a writeable transaction let mut tx3 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx3.set("test3", "other text 3").await.unwrap(); + tx3.set("test3", "other text 3", None).await.unwrap(); // Cancel both writeable transactions tx1.commit().await.unwrap(); tx2.commit().await.unwrap(); diff --git a/core/src/kvs/tests/multiwriter_same_keys_allow.rs b/core/src/kvs/tests/multiwriter_same_keys_allow.rs index 7ace0429..d3b0545a 100644 --- a/core/src/kvs/tests/multiwriter_same_keys_allow.rs +++ b/core/src/kvs/tests/multiwriter_same_keys_allow.rs @@ -7,17 +7,17 @@ async fn multiwriter_same_keys_allow() { let (ds, _) = new_ds(node_id, clock).await; // Insert an initial key let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx.set("test", "some text").await.unwrap(); + tx.set("test", "some text", None).await.unwrap(); tx.commit().await.unwrap(); // Create a writeable transaction let mut tx1 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx1.set("test", "other text 1").await.unwrap(); + tx1.set("test", "other text 1", None).await.unwrap(); // Create a writeable transaction let mut tx2 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx2.set("test", "other text 2").await.unwrap(); + tx2.set("test", "other text 2", None).await.unwrap(); // Create a writeable transaction let mut tx3 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx3.set("test", "other text 3").await.unwrap(); + tx3.set("test", "other text 3", None).await.unwrap(); // Cancel both writeable transactions assert!(tx1.commit().await.is_ok()); assert!(tx2.commit().await.is_ok()); @@ -29,7 +29,7 @@ async fn multiwriter_same_keys_allow() { tx.cancel().await.unwrap(); // Create a writeable transaction let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx.set("test", "original text").await.unwrap(); + tx.set("test", "original text", None).await.unwrap(); tx.commit().await.unwrap(); // Check that the key was updated ok let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner(); diff --git a/core/src/kvs/tests/multiwriter_same_keys_conflict.rs b/core/src/kvs/tests/multiwriter_same_keys_conflict.rs index 46744948..3d2aeeef 100644 --- a/core/src/kvs/tests/multiwriter_same_keys_conflict.rs +++ b/core/src/kvs/tests/multiwriter_same_keys_conflict.rs @@ -7,17 +7,17 @@ async fn multiwriter_same_keys_conflict() { let (ds, _) = new_ds(node_id, clock).await; // Insert an initial key let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx.set("test", "some text").await.unwrap(); + tx.set("test", "some text", None).await.unwrap(); tx.commit().await.unwrap(); // Create a writeable transaction let mut tx1 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx1.set("test", "other text 1").await.unwrap(); + tx1.set("test", "other text 1", None).await.unwrap(); // Create a writeable transaction let mut tx2 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx2.set("test", "other text 2").await.unwrap(); + tx2.set("test", "other text 2", None).await.unwrap(); // Create a writeable transaction let mut tx3 = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx3.set("test", "other text 3").await.unwrap(); + tx3.set("test", "other text 3", None).await.unwrap(); // Cancel both writeable transactions assert!(tx1.commit().await.is_ok()); assert!(tx2.commit().await.is_err()); @@ -29,7 +29,7 @@ async fn multiwriter_same_keys_conflict() { tx.cancel().await.unwrap(); // Create a writeable transaction let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx.set("test", "original text").await.unwrap(); + tx.set("test", "original text", None).await.unwrap(); tx.commit().await.unwrap(); // Check that the key was updated ok let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner(); diff --git a/core/src/kvs/tests/raw.rs b/core/src/kvs/tests/raw.rs index e846d173..92613118 100644 --- a/core/src/kvs/tests/raw.rs +++ b/core/src/kvs/tests/raw.rs @@ -55,7 +55,7 @@ async fn set() { let (ds, _) = new_ds(node_id, clock).await; // Create a writeable transaction let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - assert!(tx.set("test", "one").await.is_ok()); + assert!(tx.set("test", "one", None).await.is_ok()); tx.commit().await.unwrap(); // Create a readonly transaction let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner(); @@ -64,7 +64,7 @@ async fn set() { tx.cancel().await.unwrap(); // Create a writeable transaction let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - assert!(tx.set("test", "two").await.is_ok()); + assert!(tx.set("test", "two", None).await.is_ok()); tx.commit().await.unwrap(); // Create a readonly transaction let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner(); diff --git a/core/src/kvs/tests/snapshot.rs b/core/src/kvs/tests/snapshot.rs index 1dd256ca..4c8e22bd 100644 --- a/core/src/kvs/tests/snapshot.rs +++ b/core/src/kvs/tests/snapshot.rs @@ -7,7 +7,7 @@ async fn snapshot() { let (ds, _) = new_ds(node_id, clock).await; // Insert an initial key let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner(); - tx.set("test", "some text").await.unwrap(); + tx.set("test", "some text", None).await.unwrap(); tx.commit().await.unwrap(); // Create a readonly transaction let mut tx1 = ds.transaction(Read, Optimistic).await.unwrap().inner(); @@ -17,7 +17,7 @@ async fn snapshot() { // Create a new writeable transaction let mut txw = ds.transaction(Write, Optimistic).await.unwrap().inner(); // Update the test key content - txw.set("test", "other text").await.unwrap(); + txw.set("test", "other text", None).await.unwrap(); // Create a readonly transaction let mut tx2 = ds.transaction(Read, Optimistic).await.unwrap().inner(); let val = tx2.get("test", None).await.unwrap().unwrap(); @@ -27,7 +27,7 @@ async fn snapshot() { let val = tx3.get("test", None).await.unwrap().unwrap(); assert_eq!(val, b"some text"); // Update the test key content - txw.set("test", "extra text").await.unwrap(); + txw.set("test", "extra text", None).await.unwrap(); // Check the key from the original transaction let val = tx1.get("test", None).await.unwrap().unwrap(); assert_eq!(val, b"some text"); diff --git a/core/src/kvs/tikv/mod.rs b/core/src/kvs/tikv/mod.rs index 01b44ab6..a4dbd5a7 100644 --- a/core/src/kvs/tikv/mod.rs +++ b/core/src/kvs/tikv/mod.rs @@ -205,11 +205,16 @@ impl super::api::Transaction for Transaction { /// Insert or update a key in the database #[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))] - async fn set(&mut self, key: K, val: V) -> Result<(), Error> + async fn set(&mut self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Sprintable + Debug, V: Into + Debug, { + // TiKV does not support verisoned queries. + if version.is_some() { + return Err(Error::UnsupportedVersionedQueries); + } + // Check to see if transaction is closed if self.done { return Err(Error::TxFinished); @@ -437,7 +442,7 @@ impl super::api::Transaction for Transaction { // Convert the timestamp to a versionstamp let verbytes = crate::vs::u64_to_versionstamp(ver); // Store the timestamp to prevent other transactions from committing - self.set(key.as_slice(), verbytes.to_vec()).await?; + self.set(key.as_slice(), verbytes.to_vec(), None).await?; // Return the uint64 representation of the timestamp as the result Ok(verbytes) } diff --git a/core/src/kvs/tr.rs b/core/src/kvs/tr.rs index b8e6d941..d9a1287d 100644 --- a/core/src/kvs/tr.rs +++ b/core/src/kvs/tr.rs @@ -247,13 +247,13 @@ impl Transactor { /// Insert or update a key in the datastore. #[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)] - pub async fn set(&mut self, key: K, val: V) -> Result<(), Error> + pub async fn set(&mut self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Debug, V: Into + Debug, { let key = key.into(); - expand_inner!(&mut self.inner, v => { v.set(key, val).await }) + expand_inner!(&mut self.inner, v => { v.set(key, val, version).await }) } /// Insert a key if it doesn't exist in the datastore. @@ -468,7 +468,7 @@ impl Transactor { let nid = seq.get_next_id(); self.stash.set(key, seq.clone()); let (k, v) = seq.finish().unwrap(); - self.set(k, v).await?; + self.set(k, v, None).await?; Ok(nid) } @@ -479,7 +479,7 @@ impl Transactor { let nid = seq.get_next_id(); self.stash.set(key, seq.clone()); let (k, v) = seq.finish().unwrap(); - self.set(k, v).await?; + self.set(k, v, None).await?; Ok(nid) } @@ -490,7 +490,7 @@ impl Transactor { let nid = seq.get_next_id(); self.stash.set(key, seq.clone()); let (k, v) = seq.finish().unwrap(); - self.set(k, v).await?; + self.set(k, v, None).await?; Ok(nid) } @@ -502,7 +502,7 @@ impl Transactor { seq.remove_id(ns); self.stash.set(key, seq.clone()); let (k, v) = seq.finish().unwrap(); - self.set(k, v).await?; + self.set(k, v, None).await?; Ok(()) } @@ -514,7 +514,7 @@ impl Transactor { seq.remove_id(db); self.stash.set(key, seq.clone()); let (k, v) = seq.finish().unwrap(); - self.set(k, v).await?; + self.set(k, v, None).await?; Ok(()) } @@ -526,7 +526,7 @@ impl Transactor { seq.remove_id(tb); self.stash.set(key, seq.clone()); let (k, v) = seq.finish().unwrap(); - self.set(k, v).await?; + self.set(k, v, None).await?; Ok(()) } @@ -599,7 +599,7 @@ impl Transactor { )); } } - self.set(ts_key, vst).await?; + self.set(ts_key, vst, None).await?; Ok(vst) } diff --git a/core/src/kvs/tx.rs b/core/src/kvs/tx.rs index 7248ee34..2fb61b1b 100644 --- a/core/src/kvs/tx.rs +++ b/core/src/kvs/tx.rs @@ -191,12 +191,12 @@ impl Transaction { /// Insert or update a key in the datastore. #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)] - pub async fn set(&self, key: K, val: V) -> Result<(), Error> + pub async fn set(&self, key: K, val: V, version: Option) -> Result<(), Error> where K: Into + Debug, V: Into + Debug, { - self.lock().await.set(key, val).await + self.lock().await.set(key, val, version).await } /// Insert a key if it doesn't exist in the datastore. @@ -1310,7 +1310,7 @@ impl Transaction { let key = crate::key::thing::new(ns, db, tb, id); let enc = crate::key::thing::new(ns, db, tb, id).encode()?; // Set the value in the datastore - self.set(&key, &val).await?; + self.set(&key, &val, None).await?; // Set the value in the cache self.cache.insert(enc, Entry::Val(Arc::new(val))); // Return nothing diff --git a/core/src/sql/statements/access.rs b/core/src/sql/statements/access.rs index 573486f9..823e1d85 100644 --- a/core/src/sql/statements/access.rs +++ b/core/src/sql/statements/access.rs @@ -293,7 +293,7 @@ async fn compute_grant( let gr_str = gr.id.to_raw(); // Process the statement let key = crate::key::root::access::gr::new(&ac_str, &gr_str); - txn.set(key, &gr).await?; + txn.set(key, &gr, None).await?; Ok(Value::Object(gr.into())) } _ => Err(Error::AccessMethodMismatch), @@ -350,7 +350,7 @@ async fn compute_grant( // Process the statement let key = crate::key::namespace::access::gr::new(opt.ns()?, &ac_str, &gr_str); txn.get_or_add_ns(opt.ns()?, opt.strict).await?; - txn.set(key, &gr).await?; + txn.set(key, &gr, None).await?; Ok(Value::Object(gr.into())) } _ => Err(Error::AccessMethodMismatch), @@ -418,7 +418,7 @@ async fn compute_grant( ); txn.get_or_add_ns(opt.ns()?, opt.strict).await?; txn.get_or_add_db(opt.ns()?, opt.db()?, opt.strict).await?; - txn.set(key, &gr).await?; + txn.set(key, &gr, None).await?; Ok(Value::Object(gr.into())) } } @@ -523,7 +523,7 @@ async fn compute_revoke( gr.revocation = Some(Datetime::default()); // Process the statement let key = crate::key::root::access::gr::new(&ac_str, &gr_str); - txn.set(key, &gr).await?; + txn.set(key, &gr, None).await?; Ok(Value::Object(gr.redacted().into())) } Base::Ns => { @@ -544,7 +544,7 @@ async fn compute_revoke( // Process the statement let key = crate::key::namespace::access::gr::new(opt.ns()?, &ac_str, &gr_str); txn.get_or_add_ns(opt.ns()?, opt.strict).await?; - txn.set(key, &gr).await?; + txn.set(key, &gr, None).await?; Ok(Value::Object(gr.redacted().into())) } Base::Db => { @@ -567,7 +567,7 @@ async fn compute_revoke( let key = crate::key::database::access::gr::new(opt.ns()?, opt.db()?, &ac_str, &gr_str); txn.get_or_add_ns(opt.ns()?, opt.strict).await?; txn.get_or_add_db(opt.ns()?, opt.db()?, opt.strict).await?; - txn.set(key, &gr).await?; + txn.set(key, &gr, None).await?; Ok(Value::Object(gr.redacted().into())) } _ => Err(Error::Unimplemented( diff --git a/core/src/sql/statements/alter/table.rs b/core/src/sql/statements/alter/table.rs index ed8825ad..f722cf95 100644 --- a/core/src/sql/statements/alter/table.rs +++ b/core/src/sql/statements/alter/table.rs @@ -69,7 +69,7 @@ impl AlterTableStatement { dt.kind = kind.clone(); } - txn.set(key, &dt).await?; + txn.set(key, &dt, None).await?; // Add table relational fields if matches!(self.kind, Some(TableType::Relation(_))) { dt.add_in_out_fields(&txn, opt).await?; diff --git a/core/src/sql/statements/define/access.rs b/core/src/sql/statements/define/access.rs index d7951a35..366d9b30 100644 --- a/core/src/sql/statements/define/access.rs +++ b/core/src/sql/statements/define/access.rs @@ -92,6 +92,7 @@ impl DefineAccessStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache @@ -124,6 +125,7 @@ impl DefineAccessStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache @@ -158,6 +160,7 @@ impl DefineAccessStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/analyzer.rs b/core/src/sql/statements/define/analyzer.rs index 01222bcb..2b3e0f2f 100644 --- a/core/src/sql/statements/define/analyzer.rs +++ b/core/src/sql/statements/define/analyzer.rs @@ -60,6 +60,7 @@ impl DefineAnalyzerStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/database.rs b/core/src/sql/statements/define/database.rs index 7354f330..2f10745b 100644 --- a/core/src/sql/statements/define/database.rs +++ b/core/src/sql/statements/define/database.rs @@ -63,6 +63,7 @@ impl DefineDatabaseStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/event.rs b/core/src/sql/statements/define/event.rs index d2d09e78..7386b08a 100644 --- a/core/src/sql/statements/define/event.rs +++ b/core/src/sql/statements/define/event.rs @@ -61,6 +61,7 @@ impl DefineEventStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/field.rs b/core/src/sql/statements/define/field.rs index 70958231..4cb5c06a 100644 --- a/core/src/sql/statements/define/field.rs +++ b/core/src/sql/statements/define/field.rs @@ -76,6 +76,7 @@ impl DefineFieldStatement { overwrite: false, ..self.clone() }, + None, ) .await?; @@ -113,7 +114,7 @@ impl DefineFieldStatement { } }; - txn.set(key, statement).await?; + txn.set(key, statement, None).await?; if let Some(new_kind) = new_kind { cur_kind = new_kind; @@ -146,7 +147,7 @@ impl DefineFieldStatement { }), ..tb.as_ref().to_owned() }; - txn.set(key, val).await?; + txn.set(key, val, None).await?; } } } @@ -175,7 +176,7 @@ impl DefineFieldStatement { }), ..tb.as_ref().to_owned() }; - txn.set(key, val).await?; + txn.set(key, val, None).await?; } } } diff --git a/core/src/sql/statements/define/function.rs b/core/src/sql/statements/define/function.rs index de1c0186..0b2e6b09 100644 --- a/core/src/sql/statements/define/function.rs +++ b/core/src/sql/statements/define/function.rs @@ -63,6 +63,7 @@ impl DefineFunctionStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/index.rs b/core/src/sql/statements/define/index.rs index 96a474d1..d8ac49aa 100644 --- a/core/src/sql/statements/define/index.rs +++ b/core/src/sql/statements/define/index.rs @@ -87,6 +87,7 @@ impl DefineIndexStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/model.rs b/core/src/sql/statements/define/model.rs index 89190014..2a4d63dc 100644 --- a/core/src/sql/statements/define/model.rs +++ b/core/src/sql/statements/define/model.rs @@ -61,6 +61,7 @@ impl DefineModelStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/namespace.rs b/core/src/sql/statements/define/namespace.rs index 45a85b81..287d58c2 100644 --- a/core/src/sql/statements/define/namespace.rs +++ b/core/src/sql/statements/define/namespace.rs @@ -61,6 +61,7 @@ impl DefineNamespaceStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/param.rs b/core/src/sql/statements/define/param.rs index 014e5e3b..d6cc4f69 100644 --- a/core/src/sql/statements/define/param.rs +++ b/core/src/sql/statements/define/param.rs @@ -64,6 +64,7 @@ impl DefineParamStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/define/table.rs b/core/src/sql/statements/define/table.rs index eef7b07e..51c6db63 100644 --- a/core/src/sql/statements/define/table.rs +++ b/core/src/sql/statements/define/table.rs @@ -78,7 +78,7 @@ impl DefineTableStatement { overwrite: false, ..self.clone() }; - txn.set(key, &dt).await?; + txn.set(key, &dt, None).await?; // Add table relational fields self.add_in_out_fields(&txn, opt).await?; // Clear the cache @@ -96,7 +96,7 @@ impl DefineTableStatement { for v in view.what.0.iter() { // Save the view config let key = crate::key::table::ft::new(opt.ns()?, opt.db()?, v, &self.name); - txn.set(key, self).await?; + txn.set(key, self, None).await?; } // Force queries to run let opt = &opt.new_with_force(Force::Table(Arc::new([dt]))); @@ -147,6 +147,7 @@ impl DefineTableStatement { kind: Some(val), ..Default::default() }, + None, ) .await?; } @@ -162,6 +163,7 @@ impl DefineTableStatement { kind: Some(val), ..Default::default() }, + None, ) .await?; } diff --git a/core/src/sql/statements/define/user.rs b/core/src/sql/statements/define/user.rs index 773a85bc..f4336759 100644 --- a/core/src/sql/statements/define/user.rs +++ b/core/src/sql/statements/define/user.rs @@ -133,6 +133,7 @@ impl DefineUserStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache @@ -165,6 +166,7 @@ impl DefineUserStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache @@ -199,6 +201,7 @@ impl DefineUserStatement { overwrite: false, ..self.clone() }, + None, ) .await?; // Clear the cache diff --git a/core/src/sql/statements/insert.rs b/core/src/sql/statements/insert.rs index 580532a9..b559af4d 100644 --- a/core/src/sql/statements/insert.rs +++ b/core/src/sql/statements/insert.rs @@ -4,14 +4,14 @@ use crate::doc::CursorDoc; use crate::err::Error; use crate::sql::paths::IN; use crate::sql::paths::OUT; -use crate::sql::{Data, Id, Output, Table, Thing, Timeout, Value}; +use crate::sql::{Data, Id, Output, Table, Thing, Timeout, Value, Version}; use derive::Store; use reblessive::tree::Stk; use revision::revisioned; use serde::{Deserialize, Serialize}; use std::fmt; -#[revisioned(revision = 2)] +#[revisioned(revision = 3)] #[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[non_exhaustive] @@ -25,6 +25,8 @@ pub struct InsertStatement { pub parallel: bool, #[revision(start = 2)] pub relation: bool, + #[revision(start = 3)] + pub version: Option, } impl InsertStatement { @@ -44,8 +46,10 @@ impl InsertStatement { opt.valid_for_db()?; // Create a new iterator let mut i = Iterator::new(); + // Propagate the version to the underlying datastore + let version = self.version.as_ref().map(|v| v.to_u64()); // Ensure futures are stored - let opt = &opt.new_with_futures(false).with_projections(false); + let opt = &opt.new_with_futures(false).with_projections(false).with_version(version); // Parse the INTO expression let into = match &self.into { None => None, @@ -129,6 +133,9 @@ impl fmt::Display for InsertStatement { if let Some(ref v) = self.output { write!(f, " {v}")? } + if let Some(ref v) = self.version { + write!(f, " {v}")? + } if let Some(ref v) = self.timeout { write!(f, " {v}")? } diff --git a/core/src/syn/parser/stmt/insert.rs b/core/src/syn/parser/stmt/insert.rs index 721077a5..908193d5 100644 --- a/core/src/syn/parser/stmt/insert.rs +++ b/core/src/syn/parser/stmt/insert.rs @@ -75,6 +75,7 @@ impl Parser<'_> { None }; let output = self.try_parse_output(ctx).await?; + let version = self.try_parse_version()?; let timeout = self.try_parse_timeout()?; let parallel = self.eat(t!("PARALLEL")); Ok(InsertStatement { @@ -86,6 +87,7 @@ impl Parser<'_> { timeout, parallel, relation, + version, }) } diff --git a/core/src/syn/parser/test/stmt.rs b/core/src/syn/parser/test/stmt.rs index 916d4eb0..4f1711aa 100644 --- a/core/src/syn/parser/test/stmt.rs +++ b/core/src/syn/parser/test/stmt.rs @@ -2084,6 +2084,7 @@ fn parse_insert() { ), ])), output: Some(Output::After), + version: None, timeout: None, parallel: false, relation: false, diff --git a/core/src/syn/parser/test/streaming.rs b/core/src/syn/parser/test/streaming.rs index 0d031c94..4a11d0ab 100644 --- a/core/src/syn/parser/test/streaming.rs +++ b/core/src/syn/parser/test/streaming.rs @@ -608,6 +608,7 @@ fn statements() -> Vec { ), ])), output: Some(Output::After), + version: None, timeout: None, parallel: false, relation: false, diff --git a/sdk/tests/api.rs b/sdk/tests/api.rs index bc010ab6..13a0234f 100644 --- a/sdk/tests/api.rs +++ b/sdk/tests/api.rs @@ -537,6 +537,50 @@ mod api_integration { assert!(response.is_none()); } + #[test_log::test(tokio::test)] + async fn insert_with_version() { + let (permit, db) = new_db().await; + db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap(); + drop(permit); + + // Create a record in the past. + let _ = db + .query("INSERT INTO user { id: user:john, name: 'John' } VERSION d'2024-08-19T08:00:00Z'") + .await + .unwrap() + .check() + .unwrap(); + + // Without VERSION, SELECT should return the record. + let mut response = db.query("SELECT * FROM user:john").await.unwrap().check().unwrap(); + let Some(name): Option = response.take("name").unwrap() else { + panic!("query returned no record"); + }; + assert_eq!(name, "John"); + + // SELECT with the VERSION set to the creation timestamp or later should return the record. + let mut response = db + .query("SELECT * FROM user:john VERSION d'2024-08-19T08:00:00Z'") + .await + .unwrap() + .check() + .unwrap(); + let Some(name): Option = response.take("name").unwrap() else { + panic!("query returned no record"); + }; + assert_eq!(name, "John"); + + // SELECT with the VERSION set before the creation timestamp should return nothing. + let mut response = db + .query("SELECT * FROM user:john VERSION d'2024-08-19T07:00:00Z'") + .await + .unwrap() + .check() + .unwrap(); + let response: Option = response.take("name").unwrap(); + assert!(response.is_none()); + } + include!("api/mod.rs"); include!("api/live.rs"); include!("api/backup.rs");