diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 56ddd71f..3b517a84 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -14,6 +14,7 @@ use crate::key::root::hb::Hb; use crate::sql; use crate::sql::Value; use crate::sql::{Query, Uuid}; +use crate::vs; use channel::Receiver; use channel::Sender; use futures::lock::Mutex; @@ -47,6 +48,9 @@ pub struct Datastore { query_timeout: Option, // The maximum duration timeout for running multiple statements in a transaction transaction_timeout: Option, + // The versionstamp oracle for this datastore. + // Used only in some datastores, such as tikv. + vso: Arc>, // Whether this datastore enables live query notifications to subscribers notification_channel: Option<(Sender, Receiver)>, } @@ -244,6 +248,7 @@ impl Datastore { strict: false, query_timeout: None, transaction_timeout: None, + vso: Arc::new(Mutex::new(vs::Oracle::systime_counter())), notification_channel: None, }) } @@ -531,6 +536,7 @@ impl Datastore { inner, cache: super::cache::Cache::default(), cf: cf::Writer::new(), + vso: self.vso.clone(), }) } diff --git a/lib/src/kvs/tikv/mod.rs b/lib/src/kvs/tikv/mod.rs index b0e7224c..b445833e 100644 --- a/lib/src/kvs/tikv/mod.rs +++ b/lib/src/kvs/tikv/mod.rs @@ -137,6 +137,7 @@ impl Transaction { Ok(u64_to_versionstamp(ver)) } /// Obtain a new key that is suffixed with the change timestamp + #[allow(unused)] pub async fn get_versionstamped_key( &mut self, ts_key: K, diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index b1af53ed..3e80b475 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -16,8 +16,10 @@ use crate::sql::paths::OUT; use crate::sql::thing::Thing; use crate::sql::Strand; use crate::sql::Value; +use crate::vs::Oracle; use crate::vs::Versionstamp; use channel::Sender; +use futures::lock::Mutex; use sql::permission::Permissions; use sql::statements::DefineAnalyzerStatement; use sql::statements::DefineDatabaseStatement; @@ -46,6 +48,7 @@ pub struct Transaction { pub(super) inner: Inner, pub(super) cache: Cache, pub(super) cf: cf::Writer, + pub(super) vso: Arc>, } #[allow(clippy::large_enum_variant)] @@ -409,6 +412,21 @@ impl Transaction { { #[cfg(debug_assertions)] trace!("Get Timestamp {:?}", key); + let use_nonmonontonic = match self { + #[cfg(feature = "kv-tikv")] + Transaction { + inner: Inner::TiKV(v), + .. + } => true, + _ => false, + }; + let nonmonotonic_vs = if use_nonmonontonic { + self.get_non_monotonic_versionstamp().await + } else { + Err(Error::Internal( + "Non-monotonic versionstamps are only supported on TiKV".to_string(), + )) + }; match self { #[cfg(feature = "kv-mem")] Transaction { @@ -429,7 +447,11 @@ impl Transaction { Transaction { inner: Inner::TiKV(v), .. - } => v.get_timestamp(key, lock).await, + } => { + // TODO Make it configurable to use monotonic or non-monotonic versionstamps + // v.get_timestamp(key, lock).await + nonmonotonic_vs + } #[cfg(feature = "kv-fdb")] Transaction { inner: Inner::FoundationDB(v), @@ -445,6 +467,29 @@ impl Transaction { } } + #[allow(unused)] + async fn get_non_monotonic_versionstamp(&mut self) -> Result { + Ok(self.vso.lock().await.now()) + } + + #[allow(unused)] + async fn get_non_monotonic_versionstamped_key( + &mut self, + prefix: K, + suffix: K, + ) -> Result, Error> + where + K: Into, + { + let prefix: Key = prefix.into(); + let suffix: Key = suffix.into(); + let ts = self.get_non_monotonic_versionstamp().await?; + let mut k: Vec = prefix.clone(); + k.append(&mut ts.to_vec()); + k.append(&mut suffix.clone()); + Ok(k) + } + /// Insert or update a key in the datastore. #[allow(unused_variables)] pub async fn set_versionstamped_key( @@ -455,11 +500,25 @@ impl Transaction { val: V, ) -> Result<(), Error> where - K: Into + Debug, + K: Into + Debug + Clone, V: Into + Debug, { #[cfg(debug_assertions)] trace!("Set {:?} {:?} => {:?}", prefix, suffix, val); + let nonmonotonic_key: Result, Error> = match self { + #[cfg(feature = "kv-tikv")] + Transaction { + inner: Inner::TiKV(v), + .. + } => self.get_non_monotonic_versionstamped_key(prefix.clone(), suffix.clone()).await, + // We need this to make the compiler happy. + // The below is unreachable only when only the tikv feature is enabled. + // It's still reachable if we enabled more than one kv feature. + #[allow(unreachable_patterns)] + _ => Err(Error::Internal( + "Non-monotonic versionstamps are only supported on TiKV".to_string(), + )), + }; match self { #[cfg(feature = "kv-mem")] Transaction { @@ -490,7 +549,10 @@ impl Transaction { inner: Inner::TiKV(v), .. } => { - let k = v.get_versionstamped_key(ts_key, prefix, suffix).await?; + // TODO Maybe make it configurable to use monotonic or non-monotonic versionstamps + // at the database definition time? + // let k = v.get_versionstamped_key(ts_key, prefix, suffix).await?; + let k = nonmonotonic_key?; v.set(k, val).await } #[cfg(feature = "kv-fdb")] diff --git a/lib/src/vs/oracle.rs b/lib/src/vs/oracle.rs index c1ac5033..eaa8d7c5 100644 --- a/lib/src/vs/oracle.rs +++ b/lib/src/vs/oracle.rs @@ -52,6 +52,22 @@ pub enum Oracle { } impl Oracle { + #[allow(unused)] + pub fn systime_counter() -> Self { + Oracle::SysTimeCounter(SysTimeCounter { + state: Mutex::new((0, 0)), + stale: (0, 0), + }) + } + + #[allow(unused)] + pub fn epoch_counter() -> Self { + Oracle::EpochCounter(EpochCounter { + epoch: 0, + counter: AtomicU64::new(0), + }) + } + #[allow(unused)] pub fn now(&mut self) -> Versionstamp { match self { @@ -141,10 +157,7 @@ mod tests { #[test] fn systime_counter() { - let mut o = Oracle::SysTimeCounter(SysTimeCounter { - state: Mutex::new((0, 0)), - stale: (0, 0), - }); + let mut o = Oracle::systime_counter(); let a = to_u128_be(o.now()); let b = to_u128_be(o.now()); assert!(a < b, "a = {}, b = {}", a, b); @@ -152,10 +165,7 @@ mod tests { #[test] fn epoch_counter() { - let mut o1 = Oracle::EpochCounter(EpochCounter { - epoch: 0, - counter: AtomicU64::new(0), - }); + let mut o1 = Oracle::epoch_counter(); let a = to_u128_be(o1.now()); let b = to_u128_be(o1.now()); assert!(a < b, "a = {}, b = {}", a, b);