Select from individual record with version (#4510)

This commit is contained in:
Sergii Glushchenko 2024-08-15 07:29:53 +02:00 committed by GitHub
parent 212d5a9e5a
commit 6502b5ed8a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 154 additions and 100 deletions

4
Cargo.lock generated
View file

@ -6046,9 +6046,9 @@ dependencies = [
[[package]]
name = "surrealkv"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "079b1114b1bbe22e2738f0c6fccc0109d06be9be7f48d2e38d143baf935358e9"
checksum = "1ab634895ff2c9289cb854a56335e428338ba71074c9eff02d84b360f0210155"
dependencies = [
"ahash 0.8.11",
"async-channel 2.2.0",

View file

@ -143,7 +143,7 @@ sha2 = "0.10.8"
snap = "1.1.0"
storekey = "0.5.0"
subtle = "2.6"
surrealkv = { version = "0.3.2", optional = true }
surrealkv = { version = "0.3.3", optional = true }
surrealml = { version = "0.1.1", optional = true, package = "surrealml-core" }
tempfile = { version = "3.10.1", optional = true }
thiserror = "1.0.50"

View file

@ -211,7 +211,7 @@ impl<'a> Processor<'a> {
ctx.tx().check_ns_db_tb(opt.ns()?, opt.db()?, &v.tb, opt.strict).await?;
// Fetch the data from the store
let key = thing::new(opt.ns()?, opt.db()?, &v.tb, &v.id);
let val = ctx.tx().get(key).await?;
let val = ctx.tx().get(key, opt.version).await?;
// Parse the data from the store
let val = Operable::Value(match val {
Some(v) => Value::from(v),
@ -240,7 +240,7 @@ impl<'a> Processor<'a> {
ctx.tx().check_ns_db_tb(opt.ns()?, opt.db()?, &v.tb, opt.strict).await?;
// Fetch the data from the store
let key = thing::new(opt.ns()?, opt.db()?, &v.tb, &v.id);
let val = ctx.tx().get(key).await?;
let val = ctx.tx().get(key, None).await?;
// Parse the data from the store
let x = match val {
Some(v) => Value::from(v),
@ -271,7 +271,7 @@ impl<'a> Processor<'a> {
ctx.tx().check_ns_db_tb(opt.ns()?, opt.db()?, &v.tb, opt.strict).await?;
// Fetch the data from the store
let key = thing::new(opt.ns()?, opt.db()?, &v.tb, &v.id);
let val = ctx.tx().get(key).await?;
let val = ctx.tx().get(key, None).await?;
// Parse the data from the store
let x = match val {
Some(v) => Value::from(v),
@ -487,7 +487,7 @@ impl<'a> Processor<'a> {
let gra: graph::Graph = graph::Graph::decode(&key)?;
// Fetch the data from the store
let key = thing::new(opt.ns()?, opt.db()?, gra.ft, &gra.fk);
let val = txn.get(key).await?;
let val = txn.get(key, None).await?;
let rid = Thing::from((gra.ft, gra.fk));
// Parse the data from the store
let val = Operable::Value(match val {
@ -587,7 +587,7 @@ impl Iterable {
// Fetch the data from the store
let key = thing::new(opt.ns()?, opt.db()?, &thg.tb, &thg.id);
// Fetch and parse the data from the store
let val = txn.get(key).await?.map(Value::from).unwrap_or(Value::None);
let val = txn.get(key, None).await?.map(Value::from).unwrap_or(Value::None);
// Return the result
Ok(val)
}

View file

@ -52,7 +52,7 @@ impl<'a> Document<'a> {
Err(Error::RetryWithId(v)) => {
// Fetch the data from the store
let key = crate::key::thing::new(opt.ns()?, opt.db()?, &v.tb, &v.id);
let val = ctx.tx().get(key).await?;
let val = ctx.tx().get(key, None).await?;
// Parse the data from the store
let val = match val {
Some(v) => Value::from(v),

View file

@ -304,7 +304,7 @@ impl<'a> IndexOperation<'a> {
let key = self.get_unique_index_key(&n)?;
if txn.putc(key, self.rid, None).await.is_err() {
let key = self.get_unique_index_key(&n)?;
let val = txn.get(key).await?.unwrap();
let val = txn.get(key, None).await?.unwrap();
let rid: Thing = val.into();
return self.err_index_exists(rid, n);
}
@ -338,7 +338,7 @@ impl<'a> IndexOperation<'a> {
let key = self.get_non_unique_index_key(&n)?;
if txn.putc(key, self.rid, None).await.is_err() {
let key = self.get_non_unique_index_key(&n)?;
let val = txn.get(key).await?.unwrap();
let val = txn.get(key, None).await?.unwrap();
let rid: Thing = val.into();
return self.err_index_exists(rid, n);
}

View file

@ -46,7 +46,7 @@ impl<'a> Document<'a> {
Err(Error::RetryWithId(v)) => {
// Fetch the data from the store
let key = crate::key::thing::new(opt.ns()?, opt.db()?, &v.tb, &v.id);
let val = ctx.tx().get(key).await?;
let val = ctx.tx().get(key, None).await?;
// Parse the data from the store
let val = match val {
Some(v) => Value::from(v),

View file

@ -117,7 +117,7 @@ mod tests {
async fn get_ids(ds: &Datastore) -> (Transaction, U32) {
let txn = ds.transaction(Write, Optimistic).await.unwrap();
let key = "foo";
let v = txn.get(key).await.unwrap();
let v = txn.get(key, None).await.unwrap();
let d = U32::new(key.into(), v).await.unwrap();
(txn, d)
}

View file

@ -30,7 +30,7 @@ impl DocIds {
cache_size: u32,
) -> Result<Self, Error> {
let state_key: Key = ikb.new_bd_key(None);
let state: State = if let Some(val) = tx.get(state_key.clone()).await? {
let state: State = if let Some(val) = tx.get(state_key.clone(), None).await? {
State::try_from_val(val)?
} else {
State::new(default_btree_order)
@ -123,7 +123,7 @@ impl DocIds {
doc_id: DocId,
) -> Result<Option<Key>, Error> {
let doc_id_key = self.index_key_base.new_bi_key(doc_id);
if let Some(val) = tx.get(doc_id_key).await? {
if let Some(val) = tx.get(doc_id_key, None).await? {
Ok(Some(val))
} else {
Ok(None)

View file

@ -25,7 +25,7 @@ impl DocLengths {
cache_size: u32,
) -> Result<Self, Error> {
let state_key: Key = ikb.new_bl_key(None);
let state: BState = if let Some(val) = tx.get(state_key.clone()).await? {
let state: BState = if let Some(val) = tx.get(state_key.clone(), None).await? {
BState::try_from_val(val)?
} else {
BState::new(default_btree_order)

View file

@ -118,7 +118,7 @@ impl FtIndex {
tt: TransactionType,
) -> Result<Self, Error> {
let state_key: Key = index_key_base.new_bs_key();
let state: State = if let Some(val) = txn.get(state_key.clone()).await? {
let state: State = if let Some(val) = txn.get(state_key.clone(), None).await? {
State::try_from_val(val)?
} else {
State::default()
@ -208,7 +208,9 @@ impl FtIndex {
}
// Get the term list
if let Some(term_list_vec) = tx.get(self.index_key_base.new_bk_key(doc_id)).await? {
if let Some(term_list_vec) =
tx.get(self.index_key_base.new_bk_key(doc_id), None).await?
{
let term_list = RoaringTreemap::deserialize_from(&mut term_list_vec.as_slice())?;
// Remove the postings
let mut p = self.postings.write().await;
@ -280,7 +282,7 @@ impl FtIndex {
// Retrieve the existing terms for this document (if any)
let term_ids_key = self.index_key_base.new_bk_key(doc_id);
let mut old_term_ids = if let Some(val) = tx.get(term_ids_key.clone()).await? {
let mut old_term_ids = if let Some(val) = tx.get(term_ids_key.clone(), None).await? {
Some(RoaringTreemap::deserialize_from(&mut val.as_slice())?)
} else {
None

View file

@ -37,7 +37,7 @@ impl Offsets {
term_id: TermId,
) -> Result<Option<OffsetRecords>, Error> {
let key = self.index_key_base.new_bo_key(doc_id, term_id);
if let Some(val) = tx.get(key).await? {
if let Some(val) = tx.get(key, None).await? {
let offsets = val.try_into()?;
Ok(Some(offsets))
} else {

View file

@ -27,7 +27,7 @@ impl Postings {
cache_size: u32,
) -> Result<Self, Error> {
let state_key: Key = index_key_base.new_bp_key(None);
let state: BState = if let Some(val) = tx.get(state_key.clone()).await? {
let state: BState = if let Some(val) = tx.get(state_key.clone(), None).await? {
BState::try_from_val(val)?
} else {
BState::new(order)

View file

@ -42,7 +42,7 @@ impl TermDocs {
term_id: TermId,
) -> Result<Option<RoaringTreemap>, Error> {
let key = self.index_key_base.new_bc_key(term_id);
if let Some(val) = tx.get(key).await? {
if let Some(val) = tx.get(key, None).await? {
let docs = RoaringTreemap::deserialize_from(&mut val.as_slice())?;
Ok(Some(docs))
} else {

View file

@ -31,7 +31,7 @@ impl Terms {
cache_size: u32,
) -> Result<Self, Error> {
let state_key: Key = index_key_base.new_bt_key(None);
let state: State = if let Some(val) = tx.get(state_key.clone()).await? {
let state: State = if let Some(val) = tx.get(state_key.clone(), None).await? {
State::try_from_val(val)?
} else {
State::new(default_btree_order)
@ -103,7 +103,7 @@ impl Terms {
term_id: TermId,
) -> Result<(), Error> {
let term_id_key = self.index_key_base.new_bu_key(term_id);
if let Some(term_key) = tx.get(term_id_key.clone()).await? {
if let Some(term_key) = tx.get(term_id_key.clone(), None).await? {
self.btree.delete(tx, &mut self.store, term_key.clone()).await?;
tx.del(term_id_key).await?;
if let Some(available_ids) = &mut self.available_ids {

View file

@ -525,7 +525,7 @@ impl UniqueEqualThingIterator {
async fn next_batch<B: IteratorBatch>(&mut self, tx: &Transaction) -> Result<B, Error> {
if let Some(key) = self.key.take() {
if let Some(val) = tx.get(key).await? {
if let Some(val) = tx.get(key, None).await? {
let record = (val.into(), self.irf.into(), None);
return Ok(B::from_one(record));
}
@ -614,7 +614,7 @@ impl UniqueRangeThingIterator {
}
let end = self.r.end.clone();
if self.r.matches(&end) {
if let Some(v) = tx.get(end).await? {
if let Some(v) = tx.get(end, None).await? {
records.add((v.into(), self.irf.into(), None));
}
}
@ -662,7 +662,7 @@ impl UniqueUnionThingIterator {
if ctx.is_done() {
break;
}
if let Some(val) = tx.get(key).await? {
if let Some(val) = tx.get(key, None).await? {
results.add((val.into(), self.irf.into(), None));
if results.len() >= limit {
break;

View file

@ -57,7 +57,7 @@ impl MTreeIndex {
DocIds::new(ixs, txn, tt, ikb.clone(), p.doc_ids_order, p.doc_ids_cache).await?,
));
let state_key = ikb.new_vm_key(None);
let state: MState = if let Some(val) = txn.get(state_key.clone()).await? {
let state: MState = if let Some(val) = txn.get(state_key.clone(), None).await? {
MState::try_from_val(val)?
} else {
MState::new(p.capacity)

View file

@ -146,7 +146,7 @@ impl TreeNodeProvider {
N: TreeNode + Clone,
{
let key = self.get_key(id);
if let Some(val) = tx.get(key.clone()).await? {
if let Some(val) = tx.get(key.clone(), None).await? {
let size = val.len() as u32;
let node = N::try_from_val(val)?;
Ok(StoredNode::new(node, id, key, size))

View file

@ -54,7 +54,7 @@ pub trait Transaction {
K: Into<Key> + Sprintable + Debug;
/// Fetch a key from the datastore.
async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Sprintable + Debug;
@ -121,7 +121,7 @@ pub trait Transaction {
// Continue with function logic
let mut out = Vec::with_capacity(keys.len());
for key in keys.into_iter() {
if let Some(val) = self.get(key).await? {
if let Some(val) = self.get(key, None).await? {
out.push(val);
} else {
out.push(vec![]);
@ -293,7 +293,7 @@ pub trait Transaction {
// Calculate the version key
let key = key.into();
// Calculate the version number
let ver = match self.get(key.as_slice()).await? {
let ver = match self.get(key.as_slice(), None).await? {
Some(prev) => {
let res: Result<[u8; 10], Error> = match prev.as_slice().try_into() {
Ok(ba) => Ok(ba),

View file

@ -234,10 +234,15 @@ impl super::api::Transaction for Transaction {
/// Fetch a key from the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// FDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -153,10 +153,15 @@ impl super::api::Transaction for Transaction {
/// Fetch a key from the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// IndexDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -150,10 +150,15 @@ impl super::api::Transaction for Transaction {
/// Fetch a key from the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// MemDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -280,7 +280,7 @@ impl Datastore {
// Get the key for this node live query
let nlq = crate::key::node::lq::new(self.id(), id);
// Fetch the LIVE meta data node entry
if let Some(val) = catch!(txn, txn.get(nlq)) {
if let Some(val) = catch!(txn, txn.get(nlq, None)) {
// Decode the data for this live query
let lq: Live = val.into();
// Get the key for this node live query

View file

@ -234,10 +234,15 @@ impl super::api::Transaction for Transaction {
/// Fetch a key from the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// RocksDB does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);

View file

@ -159,7 +159,7 @@ impl super::api::Transaction for Transaction {
/// Fetch a key from the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
@ -167,8 +167,13 @@ impl super::api::Transaction for Transaction {
if self.done {
return Err(Error::TxFinished);
}
// Fetch the value from the database.
let res = self.inner.get(&key.into())?;
let res = match version {
Some(ts) => Some(self.inner.get_at_ts(&key.into(), ts)?),
None => self.inner.get(&key.into())?,
};
// Return result
Ok(res)
}

View file

@ -11,15 +11,15 @@ async fn multireader() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx1 = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx1.get("test").await.unwrap().unwrap();
let val = tx1.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Create a readonly transaction
let mut tx2 = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx2.get("test").await.unwrap().unwrap();
let val = tx2.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Create a readonly transaction
let mut tx3 = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx3.get("test").await.unwrap().unwrap();
let val = tx3.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Cancel both readonly transactions
tx1.cancel().await.unwrap();

View file

@ -24,13 +24,13 @@ async fn multiwriter_different_keys() {
tx3.commit().await.unwrap();
// Check that the key was updated ok
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap().unwrap();
let val = tx.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
let val = tx.get("test1").await.unwrap().unwrap();
let val = tx.get("test1", None).await.unwrap().unwrap();
assert_eq!(val, b"other text 1");
let val = tx.get("test2").await.unwrap().unwrap();
let val = tx.get("test2", None).await.unwrap().unwrap();
assert_eq!(val, b"other text 2");
let val = tx.get("test3").await.unwrap().unwrap();
let val = tx.get("test3", None).await.unwrap().unwrap();
assert_eq!(val, b"other text 3");
tx.cancel().await.unwrap();
}

View file

@ -24,7 +24,7 @@ async fn multiwriter_same_keys_allow() {
assert!(tx3.commit().await.is_ok());
// Check that the key was updated ok
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap().unwrap();
let val = tx.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"other text 3");
tx.cancel().await.unwrap();
// Create a writeable transaction
@ -33,7 +33,7 @@ async fn multiwriter_same_keys_allow() {
tx.commit().await.unwrap();
// Check that the key was updated ok
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap().unwrap();
let val = tx.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"original text");
tx.cancel().await.unwrap();
}

View file

@ -24,7 +24,7 @@ async fn multiwriter_same_keys_conflict() {
assert!(tx3.commit().await.is_err());
// Check that the key was updated ok
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap().unwrap();
let val = tx.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"other text 1");
tx.cancel().await.unwrap();
// Create a writeable transaction
@ -33,7 +33,7 @@ async fn multiwriter_same_keys_conflict() {
tx.commit().await.unwrap();
// Check that the key was updated ok
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap().unwrap();
let val = tx.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"original text");
tx.cancel().await.unwrap();
}

View file

@ -39,9 +39,9 @@ async fn get() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"ok")));
let val = tx.get("none").await.unwrap();
let val = tx.get("none", None).await.unwrap();
assert!(val.as_deref().is_none());
tx.cancel().await.unwrap();
}
@ -59,7 +59,7 @@ async fn set() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
// Create a writeable transaction
@ -68,7 +68,7 @@ async fn set() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"two")));
tx.cancel().await.unwrap();
}
@ -86,7 +86,7 @@ async fn put() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
// Create a writeable transaction
@ -95,7 +95,7 @@ async fn put() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
}
@ -113,7 +113,7 @@ async fn putc() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
// Create a writeable transaction
@ -122,7 +122,7 @@ async fn putc() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"two")));
tx.cancel().await.unwrap();
// Create a writeable transaction
@ -131,7 +131,7 @@ async fn putc() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"two")));
tx.cancel().await.unwrap();
}
@ -153,7 +153,7 @@ async fn del() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(val.as_deref().is_none());
tx.cancel().await.unwrap();
}
@ -175,7 +175,7 @@ async fn delc() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(matches!(val.as_deref(), Some(b"one")));
tx.cancel().await.unwrap();
// Create a writeable transaction
@ -184,7 +184,7 @@ async fn delc() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap();
let val = tx.get("test", None).await.unwrap();
assert!(val.as_deref().is_none());
tx.cancel().await.unwrap();
}

View file

@ -12,7 +12,7 @@ async fn snapshot() {
// Create a readonly transaction
let mut tx1 = ds.transaction(Read, Optimistic).await.unwrap().inner();
// Check that the key was inserted ok
let val = tx1.get("test").await.unwrap().unwrap();
let val = tx1.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Create a new writeable transaction
let mut txw = ds.transaction(Write, Optimistic).await.unwrap().inner();
@ -20,16 +20,16 @@ async fn snapshot() {
txw.set("test", "other text").await.unwrap();
// Create a readonly transaction
let mut tx2 = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx2.get("test").await.unwrap().unwrap();
let val = tx2.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Create a readonly transaction
let mut tx3 = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx3.get("test").await.unwrap().unwrap();
let val = tx3.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Update the test key content
txw.set("test", "extra text").await.unwrap();
// Check the key from the original transaction
let val = tx1.get("test").await.unwrap().unwrap();
let val = tx1.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"some text");
// Cancel both readonly transactions
tx1.cancel().await.unwrap();
@ -39,7 +39,7 @@ async fn snapshot() {
txw.commit().await.unwrap();
// Check that the key was updated ok
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let val = tx.get("test").await.unwrap().unwrap();
let val = tx.get("test", None).await.unwrap().unwrap();
assert_eq!(val, b"extra text");
tx.cancel().await.unwrap();
}

View file

@ -184,10 +184,15 @@ impl super::api::Transaction for Transaction {
/// Fetch a key from the database
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// TiKV does not support verisoned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
// Check to see if transaction is closed
if self.done {
return Err(Error::TxFinished);
@ -414,7 +419,7 @@ impl super::api::Transaction for Transaction {
// Get the transaction version
let ver = self.inner.current_timestamp().await?.version();
// Calculate the previous version value
if let Some(prev) = self.get(key.as_slice()).await? {
if let Some(prev) = self.get(key.as_slice(), None).await? {
let res: Result<[u8; 10], Error> = match prev.as_slice().try_into() {
Ok(ba) => Ok(ba),
Err(e) => Err(Error::Tx(e.to_string())),

View file

@ -187,12 +187,12 @@ impl Transactor {
/// Fetch a key from the datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
pub async fn get<K>(&mut self, key: K) -> Result<Option<Val>, Error>
pub async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Debug,
{
let key = key.into();
expand_inner!(&mut self.inner, v => { v.get(key).await })
expand_inner!(&mut self.inner, v => { v.get(key, version).await })
}
/// Fetch many keys from the datastore.
@ -437,7 +437,7 @@ impl Transactor {
Ok(if let Some(v) = self.stash.get(key) {
v
} else {
let val = self.get(key.clone()).await?;
let val = self.get(key.clone(), None).await?;
if let Some(val) = val {
U32::new(key.clone(), Some(val)).await?
} else {

View file

@ -110,11 +110,11 @@ impl Transaction {
/// Fetch a key from the datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
pub async fn get<K>(&self, key: K) -> Result<Option<Val>, Error>
pub async fn get<K>(&self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: Into<Key> + Debug,
{
self.lock().await.get(key).await
self.lock().await.get(key, version).await
}
/// Retrieve a batch set of keys from the datastore.
@ -790,7 +790,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::NdNotFound {
let val = self.get(key, None).await?.ok_or(Error::NdNotFound {
value: id.to_string(),
})?;
let val: Node = val.into();
@ -810,7 +810,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::UserRootNotFound {
let val = self.get(key, None).await?.ok_or(Error::UserRootNotFound {
value: us.to_owned(),
})?;
let val: DefineUserStatement = val.into();
@ -830,7 +830,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::AccessRootNotFound {
let val = self.get(key, None).await?.ok_or(Error::AccessRootNotFound {
ac: ra.to_owned(),
})?;
let val: DefineAccessStatement = val.into();
@ -854,7 +854,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::AccessGrantRootNotFound {
let val = self.get(key, None).await?.ok_or(Error::AccessGrantRootNotFound {
ac: ac.to_owned(),
gr: gr.to_owned(),
})?;
@ -875,7 +875,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::NsNotFound {
let val = self.get(key, None).await?.ok_or(Error::NsNotFound {
value: ns.to_owned(),
})?;
let val: DefineNamespaceStatement = val.into();
@ -895,7 +895,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::UserNsNotFound {
let val = self.get(key, None).await?.ok_or(Error::UserNsNotFound {
value: us.to_owned(),
ns: ns.to_owned(),
})?;
@ -920,7 +920,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::AccessNsNotFound {
let val = self.get(key, None).await?.ok_or(Error::AccessNsNotFound {
ac: na.to_owned(),
ns: ns.to_owned(),
})?;
@ -946,7 +946,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::AccessGrantNsNotFound {
let val = self.get(key, None).await?.ok_or(Error::AccessGrantNsNotFound {
ac: ac.to_owned(),
gr: gr.to_owned(),
ns: ns.to_owned(),
@ -968,7 +968,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::DbNotFound {
let val = self.get(key, None).await?.ok_or(Error::DbNotFound {
value: db.to_owned(),
})?;
let val: DefineDatabaseStatement = val.into();
@ -993,7 +993,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::UserDbNotFound {
let val = self.get(key, None).await?.ok_or(Error::UserDbNotFound {
value: us.to_owned(),
ns: ns.to_owned(),
db: db.to_owned(),
@ -1020,7 +1020,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::AccessDbNotFound {
let val = self.get(key, None).await?.ok_or(Error::AccessDbNotFound {
ac: da.to_owned(),
ns: ns.to_owned(),
db: db.to_owned(),
@ -1048,7 +1048,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::AccessGrantDbNotFound {
let val = self.get(key, None).await?.ok_or(Error::AccessGrantDbNotFound {
ac: ac.to_owned(),
gr: gr.to_owned(),
ns: ns.to_owned(),
@ -1077,7 +1077,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::MlNotFound {
let val = self.get(key, None).await?.ok_or(Error::MlNotFound {
value: format!("{ml}<{vn}>"),
})?;
let val: DefineModelStatement = val.into();
@ -1102,7 +1102,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::AzNotFound {
let val = self.get(key, None).await?.ok_or(Error::AzNotFound {
value: az.to_owned(),
})?;
let val: DefineAnalyzerStatement = val.into();
@ -1127,7 +1127,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::FcNotFound {
let val = self.get(key, None).await?.ok_or(Error::FcNotFound {
value: fc.to_owned(),
})?;
let val: DefineFunctionStatement = val.into();
@ -1152,7 +1152,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::PaNotFound {
let val = self.get(key, None).await?.ok_or(Error::PaNotFound {
value: pa.to_owned(),
})?;
let val: DefineParamStatement = val.into();
@ -1177,7 +1177,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::TbNotFound {
let val = self.get(key, None).await?.ok_or(Error::TbNotFound {
value: tb.to_owned(),
})?;
let val: DefineTableStatement = val.into();
@ -1203,7 +1203,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::EvNotFound {
let val = self.get(key, None).await?.ok_or(Error::EvNotFound {
value: ev.to_owned(),
})?;
let val: DefineEventStatement = val.into();
@ -1229,7 +1229,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::FdNotFound {
let val = self.get(key, None).await?.ok_or(Error::FdNotFound {
value: fd.to_owned(),
})?;
let val: DefineFieldStatement = val.into();
@ -1255,7 +1255,7 @@ impl Transaction {
Ok(match res {
Ok(val) => val,
Err(cache) => {
let val = self.get(key).await?.ok_or(Error::IxNotFound {
let val = self.get(key, None).await?.ok_or(Error::IxNotFound {
value: ix.to_owned(),
})?;
let val: DefineIndexStatement = val.into();
@ -1282,7 +1282,7 @@ impl Transaction {
// The entry is in the cache
Ok(val) => Ok(val.into_val()),
// The entry is not in the cache
Err(cache) => match self.get(key).await? {
Err(cache) => match self.get(key, None).await? {
// The value exists in the datastore
Some(val) => {
let val = Entry::Val(Arc::new(val.into()));
@ -1445,7 +1445,7 @@ impl Transaction {
// The entry is not in the cache
Err(cache) => {
// Try to fetch the value from the datastore
let res = self.get(&key).await?.ok_or(Error::NsNotFound {
let res = self.get(&key, None).await?.ok_or(Error::NsNotFound {
value: ns.to_owned(),
});
// Check whether the value exists in the datastore
@ -1498,7 +1498,7 @@ impl Transaction {
// The entry is not in the cache
Err(cache) => {
// Try to fetch the value from the datastore
let res = self.get(&key).await?.ok_or(Error::DbNotFound {
let res = self.get(&key, None).await?.ok_or(Error::DbNotFound {
value: db.to_owned(),
});
// Check whether the value exists in the datastore
@ -1566,7 +1566,7 @@ impl Transaction {
// The entry is not in the cache
Err(cache) => {
// Try to fetch the value from the datastore
let res = self.get(&key).await?.ok_or(Error::TbNotFound {
let res = self.get(&key, None).await?.ok_or(Error::TbNotFound {
value: tb.to_owned(),
});
// Check whether the value exists in the datastore

View file

@ -53,7 +53,7 @@ impl KillStatement {
// Fetch the live query key
let key = crate::key::node::lq::new(nid, lid);
// Fetch the live query key if it exists
match txn.get(key).await? {
match txn.get(key, None).await? {
Some(val) => {
// Decode the data for this live query
let val: Live = val.into();

View file

@ -468,6 +468,28 @@ mod api_integration {
panic!("query returned no record");
};
assert_eq!(name, "John v1");
let mut response = db
.query(format!("SELECT name FROM user VERSION d'{}'", version))
.await
.unwrap()
.check()
.unwrap();
let Some(name): Option<String> = response.take("name").unwrap() else {
panic!("query returned no record");
};
assert_eq!(name, "John v1");
let mut response = db
.query(format!("SELECT name FROM user:john VERSION d'{}'", version))
.await
.unwrap()
.check()
.unwrap();
let Some(name): Option<String> = response.take("name").unwrap() else {
panic!("query returned no record");
};
assert_eq!(name, "John v1");
}
include!("api/mod.rs");

View file

@ -25,7 +25,7 @@ async fn handler(Extension(state): Extension<AppState>) -> impl IntoResponse {
// Cancel the transaction
trace!("Health endpoint cancelling transaction");
// Attempt to fetch data
match tx.get(vec![0x00]).await {
match tx.get(vec![0x00], None).await {
Err(_) => {
// Ensure the transaction is cancelled
let _ = tx.cancel().await;