Refactor - Clean-up and optimize KVs (#1807)

This commit is contained in:
Finn Bear 2023-04-14 04:43:25 -07:00 committed by GitHub
parent d87836fc84
commit 15bfef4866
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 262 additions and 276 deletions

View file

@ -43,19 +43,15 @@ pub enum Entry {
pub struct Cache(pub HashMap<Key, Entry>);
impl Cache {
// Check if key exists
pub fn exi(&mut self, key: &Key) -> bool {
self.0.contains_key(key)
}
// Set a key in the cache
/// Set a key in the cache
pub fn set(&mut self, key: Key, val: Entry) {
self.0.insert(key, val);
}
// Get a key from the cache
/// Get a key from the cache
pub fn get(&mut self, key: &Key) -> Option<Entry> {
self.0.get(key).cloned()
}
// Delete a key from the cache
/// Delete a key from the cache
pub fn del(&mut self, key: &Key) -> Option<Entry> {
self.0.remove(key)
}

View file

@ -190,50 +190,40 @@ impl Datastore {
/// ```
pub async fn transaction(&self, write: bool, lock: bool) -> Result<Transaction, Error> {
#![allow(unused_variables)]
match &self.inner {
let inner = match &self.inner {
#[cfg(feature = "kv-mem")]
Inner::Mem(v) => {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::Mem(tx),
cache: super::cache::Cache::default(),
})
super::tx::Inner::Mem(tx)
}
#[cfg(feature = "kv-rocksdb")]
Inner::RocksDB(v) => {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::RocksDB(tx),
cache: super::cache::Cache::default(),
})
super::tx::Inner::RocksDB(tx)
}
#[cfg(feature = "kv-indxdb")]
Inner::IndxDB(v) => {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::IndxDB(tx),
cache: super::cache::Cache::default(),
})
super::tx::Inner::IndxDB(tx)
}
#[cfg(feature = "kv-tikv")]
Inner::TiKV(v) => {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::TiKV(tx),
cache: super::cache::Cache::default(),
})
super::tx::Inner::TiKV(tx)
}
#[cfg(feature = "kv-fdb")]
Inner::FDB(v) => {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::FDB(tx),
cache: super::cache::Cache::default(),
})
super::tx::Inner::FDB(tx)
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
};
Ok(Transaction {
inner,
cache: super::cache::Cache::default(),
})
}
/// Parse and execute an SQL query

View file

@ -732,77 +732,77 @@ impl Transaction {
/// Retrieve all namespace definitions in a datastore.
pub async fn all_ns(&mut self) -> Result<Arc<[DefineNamespaceStatement]>, Error> {
let key = crate::key::ns::prefix();
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Nss(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::ns::prefix();
let end = crate::key::ns::suffix();
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Nss(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Nss(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::ns::prefix();
let end = crate::key::ns::suffix();
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Nss(Arc::clone(&val)));
val
})
}
/// Retrieve all namespace login definitions for a specific namespace.
pub async fn all_nl(&mut self, ns: &str) -> Result<Arc<[DefineLoginStatement]>, Error> {
let key = crate::key::nl::prefix(ns);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Nls(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::nl::prefix(ns);
let end = crate::key::nl::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Nls(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Nls(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::nl::prefix(ns);
let end = crate::key::nl::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Nls(Arc::clone(&val)));
val
})
}
/// Retrieve all namespace token definitions for a specific namespace.
pub async fn all_nt(&mut self, ns: &str) -> Result<Arc<[DefineTokenStatement]>, Error> {
let key = crate::key::nt::prefix(ns);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Nts(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::nt::prefix(ns);
let end = crate::key::nt::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Nts(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Nts(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::nt::prefix(ns);
let end = crate::key::nt::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Nts(Arc::clone(&val)));
val
})
}
/// Retrieve all database definitions for a specific namespace.
pub async fn all_db(&mut self, ns: &str) -> Result<Arc<[DefineDatabaseStatement]>, Error> {
let key = crate::key::db::prefix(ns);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Dbs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::db::prefix(ns);
let end = crate::key::db::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Dbs(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Dbs(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::db::prefix(ns);
let end = crate::key::db::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Dbs(Arc::clone(&val)));
val
})
}
/// Retrieve all database login definitions for a specific database.
@ -812,20 +812,20 @@ impl Transaction {
db: &str,
) -> Result<Arc<[DefineLoginStatement]>, Error> {
let key = crate::key::dl::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Dls(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::dl::prefix(ns, db);
let end = crate::key::dl::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Dls(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Dls(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::dl::prefix(ns, db);
let end = crate::key::dl::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Dls(Arc::clone(&val)));
val
})
}
/// Retrieve all database token definitions for a specific database.
@ -835,20 +835,20 @@ impl Transaction {
db: &str,
) -> Result<Arc<[DefineTokenStatement]>, Error> {
let key = crate::key::dt::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Dts(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::dt::prefix(ns, db);
let end = crate::key::dt::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Dts(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Dts(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::dt::prefix(ns, db);
let end = crate::key::dt::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Dts(Arc::clone(&val)));
val
})
}
/// Retrieve all function definitions for a specific database.
@ -858,20 +858,20 @@ impl Transaction {
db: &str,
) -> Result<Arc<[DefineFunctionStatement]>, Error> {
let key = crate::key::fc::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Fcs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::fc::prefix(ns, db);
let end = crate::key::fc::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Fcs(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Fcs(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::fc::prefix(ns, db);
let end = crate::key::fc::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Fcs(Arc::clone(&val)));
val
})
}
/// Retrieve all scope definitions for a specific database.
@ -881,20 +881,20 @@ impl Transaction {
db: &str,
) -> Result<Arc<[DefineScopeStatement]>, Error> {
let key = crate::key::sc::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Scs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::sc::prefix(ns, db);
let end = crate::key::sc::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Scs(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Scs(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::sc::prefix(ns, db);
let end = crate::key::sc::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Scs(Arc::clone(&val)));
val
})
}
/// Retrieve all scope token definitions for a scope.
@ -905,20 +905,20 @@ impl Transaction {
sc: &str,
) -> Result<Arc<[DefineTokenStatement]>, Error> {
let key = crate::key::st::prefix(ns, db, sc);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Sts(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::st::prefix(ns, db, sc);
let end = crate::key::st::suffix(ns, db, sc);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Sts(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Sts(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::st::prefix(ns, db, sc);
let end = crate::key::st::suffix(ns, db, sc);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Sts(Arc::clone(&val)));
val
})
}
/// Retrieve all scope definitions for a specific database.
@ -928,20 +928,20 @@ impl Transaction {
db: &str,
) -> Result<Arc<[DefineParamStatement]>, Error> {
let key = crate::key::pa::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Pas(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::pa::prefix(ns, db);
let end = crate::key::pa::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Pas(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Pas(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::pa::prefix(ns, db);
let end = crate::key::pa::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Pas(Arc::clone(&val)));
val
})
}
/// Retrieve all table definitions for a specific database.
@ -951,20 +951,20 @@ impl Transaction {
db: &str,
) -> Result<Arc<[DefineTableStatement]>, Error> {
let key = crate::key::tb::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Tbs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::tb::prefix(ns, db);
let end = crate::key::tb::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Tbs(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Tbs(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::tb::prefix(ns, db);
let end = crate::key::tb::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Tbs(Arc::clone(&val)));
val
})
}
/// Retrieve all event definitions for a specific table.
@ -975,20 +975,20 @@ impl Transaction {
tb: &str,
) -> Result<Arc<[DefineEventStatement]>, Error> {
let key = crate::key::ev::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Evs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::ev::prefix(ns, db, tb);
let end = crate::key::ev::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Evs(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Evs(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::ev::prefix(ns, db, tb);
let end = crate::key::ev::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Evs(Arc::clone(&val)));
val
})
}
/// Retrieve all field definitions for a specific table.
@ -999,20 +999,20 @@ impl Transaction {
tb: &str,
) -> Result<Arc<[DefineFieldStatement]>, Error> {
let key = crate::key::fd::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Fds(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::fd::prefix(ns, db, tb);
let end = crate::key::fd::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Fds(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Fds(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::fd::prefix(ns, db, tb);
let end = crate::key::fd::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Fds(Arc::clone(&val)));
val
})
}
/// Retrieve all index definitions for a specific table.
@ -1023,20 +1023,20 @@ impl Transaction {
tb: &str,
) -> Result<Arc<[DefineIndexStatement]>, Error> {
let key = crate::key::ix::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Ixs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::ix::prefix(ns, db, tb);
let end = crate::key::ix::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Ixs(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Ixs(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::ix::prefix(ns, db, tb);
let end = crate::key::ix::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Ixs(Arc::clone(&val)));
val
})
}
/// Retrieve all view definitions for a specific table.
@ -1047,20 +1047,20 @@ impl Transaction {
tb: &str,
) -> Result<Arc<[DefineTableStatement]>, Error> {
let key = crate::key::ft::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Fts(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::ft::prefix(ns, db, tb);
let end = crate::key::ft::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Fts(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Fts(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::ft::prefix(ns, db, tb);
let end = crate::key::ft::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Fts(Arc::clone(&val)));
val
})
}
/// Retrieve all live definitions for a specific table.
@ -1071,20 +1071,20 @@ impl Transaction {
tb: &str,
) -> Result<Arc<[LiveStatement]>, Error> {
let key = crate::key::lv::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Lvs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::lv::prefix(ns, db, tb);
let end = crate::key::lv::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Lvs(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Lvs(v) = e {
v
} else {
unreachable!();
}
}
} else {
let beg = crate::key::lv::prefix(ns, db, tb);
let end = crate::key::lv::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = val.convert().into();
self.cache.set(key, Entry::Lvs(Arc::clone(&val)));
val
})
}
/// Retrieve a specific namespace definition.
@ -1344,20 +1344,20 @@ impl Transaction {
ns: &str,
) -> Result<Arc<DefineNamespaceStatement>, Error> {
let key = crate::key::ns::new(ns).encode()?;
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Ns(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let val = self.get(key.clone()).await?.ok_or(Error::NsNotFound {
value: ns.to_owned(),
})?;
let val: Arc<DefineNamespaceStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Ns(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Ns(v) = e {
v
} else {
unreachable!();
}
}
} else {
let val = self.get(key.clone()).await?.ok_or(Error::NsNotFound {
value: ns.to_owned(),
})?;
let val: Arc<DefineNamespaceStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Ns(Arc::clone(&val)));
val
})
}
/// Retrieve and cache a specific database definition.
@ -1367,20 +1367,20 @@ impl Transaction {
db: &str,
) -> Result<Arc<DefineDatabaseStatement>, Error> {
let key = crate::key::db::new(ns, db).encode()?;
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Db(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let val = self.get(key.clone()).await?.ok_or(Error::DbNotFound {
value: db.to_owned(),
})?;
let val: Arc<DefineDatabaseStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Db(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Db(v) = e {
v
} else {
unreachable!();
}
}
} else {
let val = self.get(key.clone()).await?.ok_or(Error::DbNotFound {
value: db.to_owned(),
})?;
let val: Arc<DefineDatabaseStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Db(Arc::clone(&val)));
val
})
}
/// Retrieve and cache a specific table definition.
@ -1391,20 +1391,20 @@ impl Transaction {
tb: &str,
) -> Result<Arc<DefineTableStatement>, Error> {
let key = crate::key::tb::new(ns, db, tb).encode()?;
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Tb(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let val = self.get(key.clone()).await?.ok_or(Error::TbNotFound {
value: tb.to_owned(),
})?;
let val: Arc<DefineTableStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Tb(Arc::clone(&val)));
Ok(val)
Ok(if let Some(e) = self.cache.get(&key) {
if let Entry::Tb(v) = e {
v
} else {
unreachable!();
}
}
} else {
let val = self.get(key.clone()).await?.ok_or(Error::TbNotFound {
value: tb.to_owned(),
})?;
let val: Arc<DefineTableStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Tb(Arc::clone(&val)));
val
})
}
/// Add a namespace with a default configuration, only if we are in dynamic mode.