From 15bfef4866e86677f4ca8620af7a2556ee7947c8 Mon Sep 17 00:00:00 2001 From: Finn Bear Date: Fri, 14 Apr 2023 04:43:25 -0700 Subject: [PATCH] Refactor - Clean-up and optimize KVs (#1807) --- lib/src/kvs/cache.rs | 10 +- lib/src/kvs/ds.rs | 34 ++- lib/src/kvs/tx.rs | 494 +++++++++++++++++++++---------------------- 3 files changed, 262 insertions(+), 276 deletions(-) diff --git a/lib/src/kvs/cache.rs b/lib/src/kvs/cache.rs index 57802395..322a9330 100644 --- a/lib/src/kvs/cache.rs +++ b/lib/src/kvs/cache.rs @@ -43,19 +43,15 @@ pub enum Entry { pub struct Cache(pub HashMap); impl Cache { - // Check if key exists - pub fn exi(&mut self, key: &Key) -> bool { - self.0.contains_key(key) - } - // Set a key in the cache + /// Set a key in the cache pub fn set(&mut self, key: Key, val: Entry) { self.0.insert(key, val); } - // Get a key from the cache + /// Get a key from the cache pub fn get(&mut self, key: &Key) -> Option { self.0.get(key).cloned() } - // Delete a key from the cache + /// Delete a key from the cache pub fn del(&mut self, key: &Key) -> Option { self.0.remove(key) } diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index 0e285592..2e8448e4 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -190,50 +190,40 @@ impl Datastore { /// ``` pub async fn transaction(&self, write: bool, lock: bool) -> Result { #![allow(unused_variables)] - match &self.inner { + let inner = match &self.inner { #[cfg(feature = "kv-mem")] Inner::Mem(v) => { let tx = v.transaction(write, lock).await?; - Ok(Transaction { - inner: super::tx::Inner::Mem(tx), - cache: super::cache::Cache::default(), - }) + super::tx::Inner::Mem(tx) } #[cfg(feature = "kv-rocksdb")] Inner::RocksDB(v) => { let tx = v.transaction(write, lock).await?; - Ok(Transaction { - inner: super::tx::Inner::RocksDB(tx), - cache: super::cache::Cache::default(), - }) + super::tx::Inner::RocksDB(tx) } #[cfg(feature = "kv-indxdb")] Inner::IndxDB(v) => { let tx = v.transaction(write, lock).await?; - Ok(Transaction { - inner: super::tx::Inner::IndxDB(tx), - cache: super::cache::Cache::default(), - }) + super::tx::Inner::IndxDB(tx) } #[cfg(feature = "kv-tikv")] Inner::TiKV(v) => { let tx = v.transaction(write, lock).await?; - Ok(Transaction { - inner: super::tx::Inner::TiKV(tx), - cache: super::cache::Cache::default(), - }) + super::tx::Inner::TiKV(tx) } #[cfg(feature = "kv-fdb")] Inner::FDB(v) => { let tx = v.transaction(write, lock).await?; - Ok(Transaction { - inner: super::tx::Inner::FDB(tx), - cache: super::cache::Cache::default(), - }) + super::tx::Inner::FDB(tx) } #[allow(unreachable_patterns)] _ => unreachable!(), - } + }; + + Ok(Transaction { + inner, + cache: super::cache::Cache::default(), + }) } /// Parse and execute an SQL query diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 8c24b3b4..2a506345 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -732,77 +732,77 @@ impl Transaction { /// Retrieve all namespace definitions in a datastore. pub async fn all_ns(&mut self) -> Result, Error> { let key = crate::key::ns::prefix(); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Nss(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::ns::prefix(); - let end = crate::key::ns::suffix(); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Nss(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Nss(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::ns::prefix(); + let end = crate::key::ns::suffix(); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Nss(Arc::clone(&val))); + val + }) } /// Retrieve all namespace login definitions for a specific namespace. pub async fn all_nl(&mut self, ns: &str) -> Result, Error> { let key = crate::key::nl::prefix(ns); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Nls(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::nl::prefix(ns); - let end = crate::key::nl::suffix(ns); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Nls(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Nls(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::nl::prefix(ns); + let end = crate::key::nl::suffix(ns); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Nls(Arc::clone(&val))); + val + }) } /// Retrieve all namespace token definitions for a specific namespace. pub async fn all_nt(&mut self, ns: &str) -> Result, Error> { let key = crate::key::nt::prefix(ns); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Nts(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::nt::prefix(ns); - let end = crate::key::nt::suffix(ns); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Nts(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Nts(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::nt::prefix(ns); + let end = crate::key::nt::suffix(ns); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Nts(Arc::clone(&val))); + val + }) } /// Retrieve all database definitions for a specific namespace. pub async fn all_db(&mut self, ns: &str) -> Result, Error> { let key = crate::key::db::prefix(ns); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Dbs(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::db::prefix(ns); - let end = crate::key::db::suffix(ns); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Dbs(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Dbs(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::db::prefix(ns); + let end = crate::key::db::suffix(ns); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Dbs(Arc::clone(&val))); + val + }) } /// Retrieve all database login definitions for a specific database. @@ -812,20 +812,20 @@ impl Transaction { db: &str, ) -> Result, Error> { let key = crate::key::dl::prefix(ns, db); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Dls(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::dl::prefix(ns, db); - let end = crate::key::dl::suffix(ns, db); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Dls(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Dls(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::dl::prefix(ns, db); + let end = crate::key::dl::suffix(ns, db); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Dls(Arc::clone(&val))); + val + }) } /// Retrieve all database token definitions for a specific database. @@ -835,20 +835,20 @@ impl Transaction { db: &str, ) -> Result, Error> { let key = crate::key::dt::prefix(ns, db); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Dts(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::dt::prefix(ns, db); - let end = crate::key::dt::suffix(ns, db); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Dts(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Dts(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::dt::prefix(ns, db); + let end = crate::key::dt::suffix(ns, db); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Dts(Arc::clone(&val))); + val + }) } /// Retrieve all function definitions for a specific database. @@ -858,20 +858,20 @@ impl Transaction { db: &str, ) -> Result, Error> { let key = crate::key::fc::prefix(ns, db); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Fcs(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::fc::prefix(ns, db); - let end = crate::key::fc::suffix(ns, db); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Fcs(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Fcs(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::fc::prefix(ns, db); + let end = crate::key::fc::suffix(ns, db); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Fcs(Arc::clone(&val))); + val + }) } /// Retrieve all scope definitions for a specific database. @@ -881,20 +881,20 @@ impl Transaction { db: &str, ) -> Result, Error> { let key = crate::key::sc::prefix(ns, db); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Scs(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::sc::prefix(ns, db); - let end = crate::key::sc::suffix(ns, db); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Scs(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Scs(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::sc::prefix(ns, db); + let end = crate::key::sc::suffix(ns, db); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Scs(Arc::clone(&val))); + val + }) } /// Retrieve all scope token definitions for a scope. @@ -905,20 +905,20 @@ impl Transaction { sc: &str, ) -> Result, Error> { let key = crate::key::st::prefix(ns, db, sc); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Sts(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::st::prefix(ns, db, sc); - let end = crate::key::st::suffix(ns, db, sc); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Sts(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Sts(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::st::prefix(ns, db, sc); + let end = crate::key::st::suffix(ns, db, sc); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Sts(Arc::clone(&val))); + val + }) } /// Retrieve all scope definitions for a specific database. @@ -928,20 +928,20 @@ impl Transaction { db: &str, ) -> Result, Error> { let key = crate::key::pa::prefix(ns, db); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Pas(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::pa::prefix(ns, db); - let end = crate::key::pa::suffix(ns, db); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Pas(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Pas(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::pa::prefix(ns, db); + let end = crate::key::pa::suffix(ns, db); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Pas(Arc::clone(&val))); + val + }) } /// Retrieve all table definitions for a specific database. @@ -951,20 +951,20 @@ impl Transaction { db: &str, ) -> Result, Error> { let key = crate::key::tb::prefix(ns, db); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Tbs(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::tb::prefix(ns, db); - let end = crate::key::tb::suffix(ns, db); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Tbs(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Tbs(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::tb::prefix(ns, db); + let end = crate::key::tb::suffix(ns, db); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Tbs(Arc::clone(&val))); + val + }) } /// Retrieve all event definitions for a specific table. @@ -975,20 +975,20 @@ impl Transaction { tb: &str, ) -> Result, Error> { let key = crate::key::ev::prefix(ns, db, tb); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Evs(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::ev::prefix(ns, db, tb); - let end = crate::key::ev::suffix(ns, db, tb); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Evs(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Evs(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::ev::prefix(ns, db, tb); + let end = crate::key::ev::suffix(ns, db, tb); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Evs(Arc::clone(&val))); + val + }) } /// Retrieve all field definitions for a specific table. @@ -999,20 +999,20 @@ impl Transaction { tb: &str, ) -> Result, Error> { let key = crate::key::fd::prefix(ns, db, tb); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Fds(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::fd::prefix(ns, db, tb); - let end = crate::key::fd::suffix(ns, db, tb); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Fds(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Fds(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::fd::prefix(ns, db, tb); + let end = crate::key::fd::suffix(ns, db, tb); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Fds(Arc::clone(&val))); + val + }) } /// Retrieve all index definitions for a specific table. @@ -1023,20 +1023,20 @@ impl Transaction { tb: &str, ) -> Result, Error> { let key = crate::key::ix::prefix(ns, db, tb); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Ixs(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::ix::prefix(ns, db, tb); - let end = crate::key::ix::suffix(ns, db, tb); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Ixs(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Ixs(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::ix::prefix(ns, db, tb); + let end = crate::key::ix::suffix(ns, db, tb); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Ixs(Arc::clone(&val))); + val + }) } /// Retrieve all view definitions for a specific table. @@ -1047,20 +1047,20 @@ impl Transaction { tb: &str, ) -> Result, Error> { let key = crate::key::ft::prefix(ns, db, tb); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Fts(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::ft::prefix(ns, db, tb); - let end = crate::key::ft::suffix(ns, db, tb); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Fts(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Fts(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::ft::prefix(ns, db, tb); + let end = crate::key::ft::suffix(ns, db, tb); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Fts(Arc::clone(&val))); + val + }) } /// Retrieve all live definitions for a specific table. @@ -1071,20 +1071,20 @@ impl Transaction { tb: &str, ) -> Result, Error> { let key = crate::key::lv::prefix(ns, db, tb); - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Lvs(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let beg = crate::key::lv::prefix(ns, db, tb); - let end = crate::key::lv::suffix(ns, db, tb); - let val = self.getr(beg..end, u32::MAX).await?; - let val = val.convert().into(); - self.cache.set(key, Entry::Lvs(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Lvs(v) = e { + v + } else { + unreachable!(); } - } + } else { + let beg = crate::key::lv::prefix(ns, db, tb); + let end = crate::key::lv::suffix(ns, db, tb); + let val = self.getr(beg..end, u32::MAX).await?; + let val = val.convert().into(); + self.cache.set(key, Entry::Lvs(Arc::clone(&val))); + val + }) } /// Retrieve a specific namespace definition. @@ -1344,20 +1344,20 @@ impl Transaction { ns: &str, ) -> Result, Error> { let key = crate::key::ns::new(ns).encode()?; - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Ns(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let val = self.get(key.clone()).await?.ok_or(Error::NsNotFound { - value: ns.to_owned(), - })?; - let val: Arc = Arc::new(val.into()); - self.cache.set(key, Entry::Ns(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Ns(v) = e { + v + } else { + unreachable!(); } - } + } else { + let val = self.get(key.clone()).await?.ok_or(Error::NsNotFound { + value: ns.to_owned(), + })?; + let val: Arc = Arc::new(val.into()); + self.cache.set(key, Entry::Ns(Arc::clone(&val))); + val + }) } /// Retrieve and cache a specific database definition. @@ -1367,20 +1367,20 @@ impl Transaction { db: &str, ) -> Result, Error> { let key = crate::key::db::new(ns, db).encode()?; - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Db(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let val = self.get(key.clone()).await?.ok_or(Error::DbNotFound { - value: db.to_owned(), - })?; - let val: Arc = Arc::new(val.into()); - self.cache.set(key, Entry::Db(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Db(v) = e { + v + } else { + unreachable!(); } - } + } else { + let val = self.get(key.clone()).await?.ok_or(Error::DbNotFound { + value: db.to_owned(), + })?; + let val: Arc = Arc::new(val.into()); + self.cache.set(key, Entry::Db(Arc::clone(&val))); + val + }) } /// Retrieve and cache a specific table definition. @@ -1391,20 +1391,20 @@ impl Transaction { tb: &str, ) -> Result, Error> { let key = crate::key::tb::new(ns, db, tb).encode()?; - match self.cache.exi(&key) { - true => match self.cache.get(&key) { - Some(Entry::Tb(v)) => Ok(v), - _ => unreachable!(), - }, - _ => { - let val = self.get(key.clone()).await?.ok_or(Error::TbNotFound { - value: tb.to_owned(), - })?; - let val: Arc = Arc::new(val.into()); - self.cache.set(key, Entry::Tb(Arc::clone(&val))); - Ok(val) + Ok(if let Some(e) = self.cache.get(&key) { + if let Entry::Tb(v) = e { + v + } else { + unreachable!(); } - } + } else { + let val = self.get(key.clone()).await?.ok_or(Error::TbNotFound { + value: tb.to_owned(), + })?; + let val: Arc = Arc::new(val.into()); + self.cache.set(key, Entry::Tb(Arc::clone(&val))); + val + }) } /// Add a namespace with a default configuration, only if we are in dynamic mode.