Implement config definition caching within a transaction

Closes #21
This commit is contained in:
Tobie Morgan Hitchcock 2022-08-08 20:44:35 +01:00
parent 9ed50a9514
commit bac8aa31a3
6 changed files with 508 additions and 100 deletions

View file

@ -10,6 +10,7 @@ use crate::sql::statements::define::DefineTableStatement;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use std::borrow::Cow;
use std::sync::Arc;
pub struct Document<'a> {
pub(super) id: Option<Thing>,
@ -49,7 +50,7 @@ impl<'a> Document<'a> {
&self,
opt: &Options,
txn: &Transaction,
) -> Result<DefineTableStatement, Error> {
) -> Result<Arc<DefineTableStatement>, Error> {
// Clone transaction
let run = txn.clone();
// Claim transaction
@ -57,16 +58,16 @@ impl<'a> Document<'a> {
// Get the record id
let rid = self.id.as_ref().unwrap();
// Get the table definition
let tb = run.get_tb(opt.ns(), opt.db(), &rid.tb).await;
let tb = run.get_and_cache_tb(opt.ns(), opt.db(), &rid.tb).await;
// Return the table or attempt to define it
match tb {
// The table doesn't exist
Err(Error::TbNotFound) => match opt.auth.check(Level::Db) {
// We can create the table automatically
true => {
run.add_ns(opt.ns(), opt.strict).await?;
run.add_db(opt.ns(), opt.db(), opt.strict).await?;
run.add_tb(opt.ns(), opt.db(), &rid.tb, opt.strict).await
run.add_and_cache_ns(opt.ns(), opt.strict).await?;
run.add_and_cache_db(opt.ns(), opt.db(), opt.strict).await?;
run.add_and_cache_tb(opt.ns(), opt.db(), &rid.tb, opt.strict).await
}
// We can't create the table so error
false => Err(Error::TbNotFound),
@ -82,7 +83,7 @@ impl<'a> Document<'a> {
&self,
opt: &Options,
txn: &Transaction,
) -> Result<Vec<DefineTableStatement>, Error> {
) -> Result<Arc<Vec<DefineTableStatement>>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
// Get the table definitions
@ -93,7 +94,7 @@ impl<'a> Document<'a> {
&self,
opt: &Options,
txn: &Transaction,
) -> Result<Vec<DefineEventStatement>, Error> {
) -> Result<Arc<Vec<DefineEventStatement>>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
// Get the event definitions
@ -104,7 +105,7 @@ impl<'a> Document<'a> {
&self,
opt: &Options,
txn: &Transaction,
) -> Result<Vec<DefineFieldStatement>, Error> {
) -> Result<Arc<Vec<DefineFieldStatement>>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
// Get the field definitions
@ -115,7 +116,7 @@ impl<'a> Document<'a> {
&self,
opt: &Options,
txn: &Transaction,
) -> Result<Vec<DefineIndexStatement>, Error> {
) -> Result<Arc<Vec<DefineIndexStatement>>, Error> {
// Get the record id
let id = self.id.as_ref().unwrap();
// Get the index definitions

52
lib/src/kvs/cache.rs Normal file
View file

@ -0,0 +1,52 @@
use crate::kvs::kv::Key;
use crate::sql::statements::DefineDatabaseStatement;
use crate::sql::statements::DefineEventStatement;
use crate::sql::statements::DefineFieldStatement;
use crate::sql::statements::DefineIndexStatement;
use crate::sql::statements::DefineLoginStatement;
use crate::sql::statements::DefineNamespaceStatement;
use crate::sql::statements::DefineScopeStatement;
use crate::sql::statements::DefineTableStatement;
use crate::sql::statements::DefineTokenStatement;
use crate::sql::statements::LiveStatement;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Clone)]
pub enum Entry {
Ns(Arc<DefineNamespaceStatement>),
Db(Arc<DefineDatabaseStatement>),
Tb(Arc<DefineTableStatement>),
Nss(Arc<Vec<DefineNamespaceStatement>>),
Nls(Arc<Vec<DefineLoginStatement>>),
Nts(Arc<Vec<DefineTokenStatement>>),
Dbs(Arc<Vec<DefineDatabaseStatement>>),
Dls(Arc<Vec<DefineLoginStatement>>),
Dts(Arc<Vec<DefineTokenStatement>>),
Scs(Arc<Vec<DefineScopeStatement>>),
Sts(Arc<Vec<DefineTokenStatement>>),
Tbs(Arc<Vec<DefineTableStatement>>),
Evs(Arc<Vec<DefineEventStatement>>),
Fds(Arc<Vec<DefineFieldStatement>>),
Ixs(Arc<Vec<DefineIndexStatement>>),
Fts(Arc<Vec<DefineTableStatement>>),
Lvs(Arc<Vec<LiveStatement>>),
}
#[derive(Default)]
pub struct Cache(pub HashMap<Key, Entry>);
impl Cache {
// Check if key exists
pub fn exi(&mut self, key: &Key) -> bool {
self.0.contains_key(key)
}
// Set a key in the cache
pub fn set(&mut self, key: Key, val: Entry) {
self.0.insert(key, val);
}
// get a key from the cache
pub fn get(&mut self, key: &Key) -> Option<Entry> {
self.0.get(key).cloned()
}
}

View file

@ -127,6 +127,7 @@ impl Datastore {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::Mem(tx),
cache: super::cache::Cache::default(),
})
}
#[cfg(feature = "kv-indxdb")]
@ -134,6 +135,7 @@ impl Datastore {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::IxDB(tx),
cache: super::cache::Cache::default(),
})
}
#[cfg(feature = "kv-yokudb")]
@ -141,6 +143,7 @@ impl Datastore {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::File(tx),
cache: super::cache::Cache::default(),
})
}
#[cfg(feature = "kv-tikv")]
@ -148,6 +151,7 @@ impl Datastore {
let tx = v.transaction(write, lock).await?;
Ok(Transaction {
inner: super::tx::Inner::TiKV(tx),
cache: super::cache::Cache::default(),
})
}
}

View file

@ -1,3 +1,4 @@
mod cache;
mod ds;
mod file;
mod ixdb;

View file

@ -4,6 +4,8 @@ use super::Key;
use super::Val;
use crate::err::Error;
use crate::key::thing;
use crate::kvs::cache::Cache;
use crate::kvs::cache::Entry;
use crate::sql;
use crate::sql::thing::Thing;
use channel::Sender;
@ -19,10 +21,12 @@ use sql::statements::DefineTableStatement;
use sql::statements::DefineTokenStatement;
use sql::statements::LiveStatement;
use std::ops::Range;
use std::sync::Arc;
/// A set of undoable updates and requests against a dataset.
pub struct Transaction {
pub(super) inner: Inner,
pub(super) cache: Cache,
}
#[allow(clippy::large_enum_variant)]
@ -49,18 +53,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.closed(),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.closed(),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.closed(),
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.closed(),
}
}
@ -72,18 +80,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.cancel(),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.cancel(),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.cancel().await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.cancel().await,
}
}
@ -95,18 +107,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.commit(),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.commit(),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.commit().await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.commit().await,
}
}
@ -119,18 +135,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.del(key),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.del(key),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.del(key).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.del(key).await,
}
}
@ -143,18 +163,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.exi(key),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.exi(key),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.exi(key).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.exi(key).await,
}
}
@ -167,18 +191,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.get(key),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.get(key),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.get(key).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.get(key).await,
}
}
@ -192,18 +220,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.set(key, val),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.set(key, val),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.set(key, val).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.set(key, val).await,
}
}
@ -217,18 +249,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.put(key, val),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.put(key, val),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.put(key, val).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.put(key, val).await,
}
}
@ -243,18 +279,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.scan(rng, limit),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.scan(rng, limit),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.scan(rng, limit).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.scan(rng, limit).await,
}
}
@ -268,18 +308,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.putc(key, val, chk),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.putc(key, val, chk),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.putc(key, val, chk).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.putc(key, val, chk).await,
}
}
@ -293,18 +337,22 @@ impl Transaction {
#[cfg(feature = "kv-echodb")]
Transaction {
inner: Inner::Mem(v),
..
} => v.delc(key, chk),
#[cfg(feature = "kv-yokudb")]
Transaction {
inner: Inner::File(v),
..
} => v.delc(key, chk),
#[cfg(feature = "kv-indxdb")]
Transaction {
inner: Inner::IxDB(v),
..
} => v.delc(key, chk).await,
#[cfg(feature = "kv-tikv")]
Transaction {
inner: Inner::TiKV(v),
..
} => v.delc(key, chk).await,
}
}
@ -507,53 +555,142 @@ impl Transaction {
Ok(())
}
/// Retrieve all namespace definitions in a datastore.
pub async fn all_ns(&mut self) -> Result<Vec<DefineNamespaceStatement>, Error> {
let beg = crate::key::ns::prefix();
let end = crate::key::ns::suffix();
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
pub async fn all_ns(&mut self) -> Result<Arc<Vec<DefineNamespaceStatement>>, Error> {
let key = crate::key::ns::prefix();
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Nss(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::ns::prefix();
let end = crate::key::ns::suffix();
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Nss(val.clone()));
Ok(val)
}
}
}
/// Retrieve all namespace login definitions for a specific namespace.
pub async fn all_nl(&mut self, ns: &str) -> Result<Vec<DefineLoginStatement>, Error> {
let beg = crate::key::nl::prefix(ns);
let end = crate::key::nl::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
pub async fn all_nl(&mut self, ns: &str) -> Result<Arc<Vec<DefineLoginStatement>>, Error> {
let key = crate::key::nl::prefix(ns);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Nls(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::nl::prefix(ns);
let end = crate::key::nl::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Nls(val.clone()));
Ok(val)
}
}
}
/// Retrieve all namespace token definitions for a specific namespace.
pub async fn all_nt(&mut self, ns: &str) -> Result<Vec<DefineTokenStatement>, Error> {
let beg = crate::key::nt::prefix(ns);
let end = crate::key::nt::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
pub async fn all_nt(&mut self, ns: &str) -> Result<Arc<Vec<DefineTokenStatement>>, Error> {
let key = crate::key::nt::prefix(ns);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Nts(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::nt::prefix(ns);
let end = crate::key::nt::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Nts(val.clone()));
Ok(val)
}
}
}
/// Retrieve all database definitions for a specific namespace.
pub async fn all_db(&mut self, ns: &str) -> Result<Vec<DefineDatabaseStatement>, Error> {
let beg = crate::key::db::prefix(ns);
let end = crate::key::db::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
pub async fn all_db(&mut self, ns: &str) -> Result<Arc<Vec<DefineDatabaseStatement>>, Error> {
let key = crate::key::db::prefix(ns);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Dbs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::db::prefix(ns);
let end = crate::key::db::suffix(ns);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Dbs(val.clone()));
Ok(val)
}
}
}
/// Retrieve all database login definitions for a specific database.
pub async fn all_dl(&mut self, ns: &str, db: &str) -> Result<Vec<DefineLoginStatement>, Error> {
let beg = crate::key::dl::prefix(ns, db);
let end = crate::key::dl::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
pub async fn all_dl(
&mut self,
ns: &str,
db: &str,
) -> Result<Arc<Vec<DefineLoginStatement>>, Error> {
let key = crate::key::dl::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Dls(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::dl::prefix(ns, db);
let end = crate::key::dl::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Dls(val.clone()));
Ok(val)
}
}
}
/// Retrieve all database token definitions for a specific database.
pub async fn all_dt(&mut self, ns: &str, db: &str) -> Result<Vec<DefineTokenStatement>, Error> {
let beg = crate::key::dt::prefix(ns, db);
let end = crate::key::dt::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
pub async fn all_dt(
&mut self,
ns: &str,
db: &str,
) -> Result<Arc<Vec<DefineTokenStatement>>, Error> {
let key = crate::key::dt::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Dts(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::dt::prefix(ns, db);
let end = crate::key::dt::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Dts(val.clone()));
Ok(val)
}
}
}
/// Retrieve all scope definitions for a specific database.
pub async fn all_sc(&mut self, ns: &str, db: &str) -> Result<Vec<DefineScopeStatement>, Error> {
let beg = crate::key::sc::prefix(ns, db);
let end = crate::key::sc::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
pub async fn all_sc(
&mut self,
ns: &str,
db: &str,
) -> Result<Arc<Vec<DefineScopeStatement>>, Error> {
let key = crate::key::sc::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Scs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::sc::prefix(ns, db);
let end = crate::key::sc::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Scs(val.clone()));
Ok(val)
}
}
}
/// Retrieve all scope token definitions for a scope.
pub async fn all_st(
@ -561,18 +698,44 @@ impl Transaction {
ns: &str,
db: &str,
sc: &str,
) -> Result<Vec<DefineTokenStatement>, Error> {
let beg = crate::key::st::prefix(ns, db, sc);
let end = crate::key::st::suffix(ns, db, sc);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
) -> Result<Arc<Vec<DefineTokenStatement>>, Error> {
let key = crate::key::st::prefix(ns, db, sc);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Sts(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::st::prefix(ns, db, sc);
let end = crate::key::st::suffix(ns, db, sc);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Sts(val.clone()));
Ok(val)
}
}
}
/// Retrieve all table definitions for a specific database.
pub async fn all_tb(&mut self, ns: &str, db: &str) -> Result<Vec<DefineTableStatement>, Error> {
let beg = crate::key::tb::prefix(ns, db);
let end = crate::key::tb::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
pub async fn all_tb(
&mut self,
ns: &str,
db: &str,
) -> Result<Arc<Vec<DefineTableStatement>>, Error> {
let key = crate::key::tb::prefix(ns, db);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Tbs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::tb::prefix(ns, db);
let end = crate::key::tb::suffix(ns, db);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Tbs(val.clone()));
Ok(val)
}
}
}
/// Retrieve all event definitions for a specific table.
pub async fn all_ev(
@ -580,11 +743,22 @@ impl Transaction {
ns: &str,
db: &str,
tb: &str,
) -> Result<Vec<DefineEventStatement>, Error> {
let beg = crate::key::ev::prefix(ns, db, tb);
let end = crate::key::ev::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
) -> Result<Arc<Vec<DefineEventStatement>>, Error> {
let key = crate::key::ev::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Evs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::ev::prefix(ns, db, tb);
let end = crate::key::ev::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Evs(val.clone()));
Ok(val)
}
}
}
/// Retrieve all field definitions for a specific table.
pub async fn all_fd(
@ -592,11 +766,22 @@ impl Transaction {
ns: &str,
db: &str,
tb: &str,
) -> Result<Vec<DefineFieldStatement>, Error> {
let beg = crate::key::fd::prefix(ns, db, tb);
let end = crate::key::fd::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
) -> Result<Arc<Vec<DefineFieldStatement>>, Error> {
let key = crate::key::fd::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Fds(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::fd::prefix(ns, db, tb);
let end = crate::key::fd::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Fds(val.clone()));
Ok(val)
}
}
}
/// Retrieve all index definitions for a specific table.
pub async fn all_ix(
@ -604,11 +789,22 @@ impl Transaction {
ns: &str,
db: &str,
tb: &str,
) -> Result<Vec<DefineIndexStatement>, Error> {
let beg = crate::key::ix::prefix(ns, db, tb);
let end = crate::key::ix::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
) -> Result<Arc<Vec<DefineIndexStatement>>, Error> {
let key = crate::key::ix::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Ixs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::ix::prefix(ns, db, tb);
let end = crate::key::ix::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Ixs(val.clone()));
Ok(val)
}
}
}
/// Retrieve all view definitions for a specific table.
pub async fn all_ft(
@ -616,11 +812,22 @@ impl Transaction {
ns: &str,
db: &str,
tb: &str,
) -> Result<Vec<DefineTableStatement>, Error> {
let beg = crate::key::ft::prefix(ns, db, tb);
let end = crate::key::ft::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
) -> Result<Arc<Vec<DefineTableStatement>>, Error> {
let key = crate::key::ft::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Fts(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::ft::prefix(ns, db, tb);
let end = crate::key::ft::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Fts(val.clone()));
Ok(val)
}
}
}
/// Retrieve all live definitions for a specific table.
pub async fn all_lv(
@ -628,11 +835,22 @@ impl Transaction {
ns: &str,
db: &str,
tb: &str,
) -> Result<Vec<LiveStatement>, Error> {
let beg = crate::key::lv::prefix(ns, db, tb);
let end = crate::key::lv::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
Ok(val.convert())
) -> Result<Arc<Vec<LiveStatement>>, Error> {
let key = crate::key::lv::prefix(ns, db, tb);
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Lvs(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let beg = crate::key::lv::prefix(ns, db, tb);
let end = crate::key::lv::suffix(ns, db, tb);
let val = self.getr(beg..end, u32::MAX).await?;
let val = Arc::new(val.convert());
self.cache.set(key, Entry::Lvs(val.clone()));
Ok(val)
}
}
}
/// Retrieve a specific namespace definition.
pub async fn get_ns(&mut self, ns: &str) -> Result<DefineNamespaceStatement, Error> {
@ -785,6 +1003,137 @@ impl Transaction {
Ok(v) => Ok(v),
}
}
/// Retrieve and cache a specific namespace definition.
pub async fn get_and_cache_ns(
&mut self,
ns: &str,
) -> Result<Arc<DefineNamespaceStatement>, Error> {
let key = crate::key::ns::new(ns).encode()?;
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Ns(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let val = self.get(key.clone()).await?.ok_or(Error::NsNotFound)?;
let val: Arc<DefineNamespaceStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Ns(val.clone()));
Ok(val)
}
}
}
/// Retrieve and cache a specific database definition.
pub async fn get_and_cache_db(
&mut self,
ns: &str,
db: &str,
) -> Result<Arc<DefineDatabaseStatement>, Error> {
let key = crate::key::db::new(ns, db).encode()?;
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Db(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let val = self.get(key.clone()).await?.ok_or(Error::DbNotFound)?;
let val: Arc<DefineDatabaseStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Db(val.clone()));
Ok(val)
}
}
}
/// Retrieve and cache a specific table definition.
pub async fn get_and_cache_tb(
&mut self,
ns: &str,
db: &str,
tb: &str,
) -> Result<Arc<DefineTableStatement>, Error> {
let key = crate::key::tb::new(ns, db, tb).encode()?;
match self.cache.exi(&key) {
true => match self.cache.get(&key) {
Some(Entry::Tb(v)) => Ok(v),
_ => unreachable!(),
},
_ => {
let val = self.get(key.clone()).await?.ok_or(Error::TbNotFound)?;
let val: Arc<DefineTableStatement> = Arc::new(val.into());
self.cache.set(key, Entry::Tb(val.clone()));
Ok(val)
}
}
}
/// Add a namespace with a default configuration, only if we are in dynamic mode.
pub async fn add_and_cache_ns(
&mut self,
ns: &str,
strict: bool,
) -> Result<Arc<DefineNamespaceStatement>, Error> {
match self.get_and_cache_ns(ns).await {
Err(Error::NsNotFound) => match strict {
false => {
let key = crate::key::ns::new(ns);
let val = DefineNamespaceStatement {
name: ns.to_owned().into(),
};
self.put(key, &val).await?;
Ok(Arc::new(val))
}
true => Err(Error::NsNotFound),
},
Err(e) => Err(e),
Ok(v) => Ok(v),
}
}
/// Add a database with a default configuration, only if we are in dynamic mode.
pub async fn add_and_cache_db(
&mut self,
ns: &str,
db: &str,
strict: bool,
) -> Result<Arc<DefineDatabaseStatement>, Error> {
match self.get_and_cache_db(ns, db).await {
Err(Error::DbNotFound) => match strict {
false => {
let key = crate::key::db::new(ns, db);
let val = DefineDatabaseStatement {
name: db.to_owned().into(),
};
self.put(key, &val).await?;
Ok(Arc::new(val))
}
true => Err(Error::DbNotFound),
},
Err(e) => Err(e),
Ok(v) => Ok(v),
}
}
/// Add a table with a default configuration, only if we are in dynamic mode.
pub async fn add_and_cache_tb(
&mut self,
ns: &str,
db: &str,
tb: &str,
strict: bool,
) -> Result<Arc<DefineTableStatement>, Error> {
match self.get_and_cache_tb(ns, db, tb).await {
Err(Error::TbNotFound) => match strict {
false => {
let key = crate::key::tb::new(ns, db, tb);
let val = DefineTableStatement {
name: tb.to_owned().into(),
permissions: Permissions::none(),
..DefineTableStatement::default()
};
self.put(key, &val).await?;
Ok(Arc::new(val))
}
true => Err(Error::TbNotFound),
},
Err(e) => Err(e),
Ok(v) => Ok(v),
}
}
/// Writes the full database contents as binary SQL.
pub async fn export(&mut self, ns: &str, db: &str, chn: Sender<Vec<u8>>) -> Result<(), Error> {
// Output OPTIONS
@ -804,7 +1153,7 @@ impl Transaction {
chn.send(bytes!("-- LOGINS")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
for dl in dls {
for dl in dls.iter() {
chn.send(bytes!(format!("{};", dl))).await?;
}
chn.send(bytes!("")).await?;
@ -818,7 +1167,7 @@ impl Transaction {
chn.send(bytes!("-- TOKENS")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
for dt in dts {
for dt in dts.iter() {
chn.send(bytes!(format!("{};", dt))).await?;
}
chn.send(bytes!("")).await?;
@ -832,7 +1181,7 @@ impl Transaction {
chn.send(bytes!("-- SCOPES")).await?;
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!("")).await?;
for sc in scs {
for sc in scs.iter() {
chn.send(bytes!(format!("{};", sc))).await?;
}
chn.send(bytes!("")).await?;
@ -842,7 +1191,7 @@ impl Transaction {
{
let tbs = self.all_tb(ns, db).await?;
if !tbs.is_empty() {
for tb in &tbs {
for tb in tbs.iter() {
// Output TABLE
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!(format!("-- TABLE: {}", tb.name))).await?;
@ -854,7 +1203,7 @@ impl Transaction {
{
let fds = self.all_fd(ns, db, &tb.name).await?;
if !fds.is_empty() {
for fd in &fds {
for fd in fds.iter() {
chn.send(bytes!(format!("{};", fd))).await?;
}
chn.send(bytes!("")).await?;
@ -863,7 +1212,7 @@ impl Transaction {
// Output INDEXS
let ixs = self.all_ix(ns, db, &tb.name).await?;
if !ixs.is_empty() {
for ix in &ixs {
for ix in ixs.iter() {
chn.send(bytes!(format!("{};", ix))).await?;
}
chn.send(bytes!("")).await?;
@ -871,7 +1220,7 @@ impl Transaction {
// Output EVENTS
let evs = self.all_ev(ns, db, &tb.name).await?;
if !evs.is_empty() {
for ev in &evs {
for ev in evs.iter() {
chn.send(bytes!(format!("{};", ev))).await?;
}
chn.send(bytes!("")).await?;
@ -885,7 +1234,8 @@ impl Transaction {
chn.send(bytes!("BEGIN TRANSACTION;")).await?;
chn.send(bytes!("")).await?;
// Output TABLE data
for tb in &tbs {
for tb in tbs.iter() {
// Start records
chn.send(bytes!("-- ------------------------------")).await?;
chn.send(bytes!(format!("-- TABLE DATA: {}", tb.name))).await?;
chn.send(bytes!("-- ------------------------------")).await?;

View file

@ -46,7 +46,7 @@ impl InfoStatement {
let mut res = Object::default();
// Process the statement
let mut tmp = Object::default();
for v in run.all_ns().await? {
for v in run.all_ns().await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("ns".to_owned(), tmp.into());
@ -66,19 +66,19 @@ impl InfoStatement {
let mut res = Object::default();
// Process the databases
let mut tmp = Object::default();
for v in run.all_db(opt.ns()).await? {
for v in run.all_db(opt.ns()).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("db".to_owned(), tmp.into());
// Process the tokens
let mut tmp = Object::default();
for v in run.all_nt(opt.ns()).await? {
for v in run.all_nt(opt.ns()).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("nt".to_owned(), tmp.into());
// Process the logins
let mut tmp = Object::default();
for v in run.all_nl(opt.ns()).await? {
for v in run.all_nl(opt.ns()).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("nl".to_owned(), tmp.into());
@ -98,25 +98,25 @@ impl InfoStatement {
let mut res = Object::default();
// Process the tables
let mut tmp = Object::default();
for v in run.all_tb(opt.ns(), opt.db()).await? {
for v in run.all_tb(opt.ns(), opt.db()).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("tb".to_owned(), tmp.into());
// Process the scopes
let mut tmp = Object::default();
for v in run.all_sc(opt.ns(), opt.db()).await? {
for v in run.all_sc(opt.ns(), opt.db()).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("sc".to_owned(), tmp.into());
// Process the tokens
let mut tmp = Object::default();
for v in run.all_dt(opt.ns(), opt.db()).await? {
for v in run.all_dt(opt.ns(), opt.db()).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("dt".to_owned(), tmp.into());
// Process the logins
let mut tmp = Object::default();
for v in run.all_dl(opt.ns(), opt.db()).await? {
for v in run.all_dl(opt.ns(), opt.db()).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("dl".to_owned(), tmp.into());
@ -136,7 +136,7 @@ impl InfoStatement {
let mut res = Object::default();
// Process the tokens
let mut tmp = Object::default();
for v in run.all_st(opt.ns(), opt.db(), sc).await? {
for v in run.all_st(opt.ns(), opt.db(), sc).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("st".to_owned(), tmp.into());
@ -156,25 +156,25 @@ impl InfoStatement {
let mut res = Object::default();
// Process the events
let mut tmp = Object::default();
for v in run.all_ev(opt.ns(), opt.db(), tb).await? {
for v in run.all_ev(opt.ns(), opt.db(), tb).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("ev".to_owned(), tmp.into());
// Process the fields
let mut tmp = Object::default();
for v in run.all_fd(opt.ns(), opt.db(), tb).await? {
for v in run.all_fd(opt.ns(), opt.db(), tb).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("fd".to_owned(), tmp.into());
// Process the indexs
let mut tmp = Object::default();
for v in run.all_ix(opt.ns(), opt.db(), tb).await? {
for v in run.all_ix(opt.ns(), opt.db(), tb).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("ix".to_owned(), tmp.into());
// Process the tables
let mut tmp = Object::default();
for v in run.all_ft(opt.ns(), opt.db(), tb).await? {
for v in run.all_ft(opt.ns(), opt.db(), tb).await?.iter() {
tmp.insert(v.name.to_string(), v.to_string().into());
}
res.insert("ft".to_owned(), tmp.into());