Info for db and table with versioned tables and fields (#4692)

This commit is contained in:
Sergii Glushchenko 2024-09-10 14:59:31 +02:00 committed by GitHub
parent 29af8f573d
commit 516bb977ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 219 additions and 93 deletions

View file

@ -34,7 +34,7 @@ pub async fn gc_ns(tx: &Transaction, ts: u64, ns: &str) -> Result<(), Error> {
#[cfg(debug_assertions)]
trace!("Performing garbage collection on {ns}:{db} for timestamp {ts}");
// Fetch all tables
let tbs = tx.all_tb(ns, &db.name).await?;
let tbs = tx.all_tb(ns, &db.name, None).await?;
// Get the database changefeed expiration
let db_cf_expiry = db.changefeed.map(|v| v.expiry.as_secs()).unwrap_or_default();
// Get the maximum table changefeed expiration

View file

@ -254,7 +254,7 @@ impl Document {
// Get the record id
let id = self.id.as_ref().unwrap();
// Get the field definitions
ctx.tx().all_tb_fields(opt.ns()?, opt.db()?, &id.tb).await
ctx.tx().all_tb_fields(opt.ns()?, opt.db()?, &id.tb, None).await
}
/// Get the indexes for this document
pub async fn ix(

View file

@ -80,7 +80,7 @@ pub async fn generate_schema(
let tx = kvs.transaction(TransactionType::Read, LockType::Optimistic).await?;
let ns = session.ns.as_ref().ok_or(GqlError::UnpecifiedNamespace)?;
let db = session.db.as_ref().ok_or(GqlError::UnpecifiedDatabase)?;
let tbs = tx.all_tb(ns, db).await?;
let tbs = tx.all_tb(ns, db, None).await?;
let mut query = Object::new("Query");
let mut types: Vec<Type> = Vec::new();
@ -114,7 +114,7 @@ pub async fn generate_schema(
types.push(Type::InputObject(filter_id()));
let sess1 = session.to_owned();
let fds = tx.all_tb_fields(ns, db, &tb.name.0).await?;
let fds = tx.all_tb_fields(ns, db, &tb.name.0, None).await?;
let fds1 = fds.clone();
let kvs1 = datastore.clone();

View file

@ -594,7 +594,7 @@ struct SchemaCache {
impl SchemaCache {
async fn new(opt: &Options, table: &Table, tx: &Transaction) -> Result<Self, Error> {
let indexes = tx.all_tb_indexes(opt.ns()?, opt.db()?, table).await?;
let fields = tx.all_tb_fields(opt.ns()?, opt.db()?, table).await?;
let fields = tx.all_tb_fields(opt.ns()?, opt.db()?, table, None).await?;
Ok(Self {
indexes,
fields,

View file

@ -311,7 +311,7 @@ impl IndexStores {
ns: &str,
db: &str,
) -> Result<(), Error> {
for tb in tx.all_tb(ns, db).await?.iter() {
for tb in tx.all_tb(ns, db, None).await?.iter() {
self.table_removed(tx, ns, db, &tb.name).await?;
}
Ok(())

View file

@ -146,14 +146,18 @@ pub trait Transaction {
// Continue with function logic
let beg: Key = key.into();
let end: Key = beg.clone().add(0xff);
self.getr(beg..end).await
self.getr(beg..end, None).await
}
/// Retrieve a range of keys from the datastore.
///
/// This function fetches all matching key-value pairs from the underlying datastore in grouped batches.
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn getr<K>(&mut self, rng: Range<K>) -> Result<Vec<(Key, Val)>, Error>
async fn getr<K>(
&mut self,
rng: Range<K>,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
@ -167,7 +171,7 @@ pub trait Transaction {
let end: Key = rng.end.into();
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch(rng, *NORMAL_FETCH_SIZE, true).await?;
let res = self.batch(rng, *NORMAL_FETCH_SIZE, true, version).await?;
next = res.next;
for v in res.values.into_iter() {
out.push(v);
@ -219,7 +223,7 @@ pub trait Transaction {
let end: Key = rng.end.into();
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch(rng, *NORMAL_FETCH_SIZE, false).await?;
let res = self.batch(rng, *NORMAL_FETCH_SIZE, false, None).await?;
next = res.next;
for (k, _) in res.values.into_iter() {
self.del(k).await?;
@ -232,7 +236,13 @@ pub trait Transaction {
///
/// This function fetches keys or key-value pairs, in batches, with multiple requests to the underlying datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn batch<K>(&mut self, rng: Range<K>, batch: u32, values: bool) -> Result<Batch, Error>
async fn batch<K>(
&mut self,
rng: Range<K>,
batch: u32,
values: bool,
version: Option<u64>,
) -> Result<Batch, Error>
where
K: Into<Key> + Sprintable + Debug,
{
@ -245,7 +255,7 @@ pub trait Transaction {
let end: Key = rng.end.into();
// Scan for the next batch
let res = if values {
self.scan(beg..end.clone(), batch, None).await?
self.scan(beg..end.clone(), batch, version).await?
} else {
self.keys(beg..end.clone(), batch)
.await?

View file

@ -91,7 +91,7 @@ impl Transaction {
}
// Output TABLES
{
let tbs = self.all_tb(ns, db).await?;
let tbs = self.all_tb(ns, db, None).await?;
if !tbs.is_empty() {
for tb in tbs.iter() {
// Output TABLE
@ -102,7 +102,7 @@ impl Transaction {
chn.send(bytes!(format!("{tb};"))).await?;
chn.send(bytes!("")).await?;
// Output FIELDS
let fds = self.all_tb_fields(ns, db, &tb.name).await?;
let fds = self.all_tb_fields(ns, db, &tb.name, None).await?;
if !fds.is_empty() {
for fd in fds.iter() {
chn.send(bytes!(format!("{fd};"))).await?;
@ -144,7 +144,7 @@ impl Transaction {
let mut next = Some(beg..end);
while let Some(rng) = next {
// Get the next batch of records
let batch = self.batch(rng, *EXPORT_BATCH_SIZE, true).await?;
let batch = self.batch(rng, *EXPORT_BATCH_SIZE, true, None).await?;
// Set the next scan range
next = batch.next;
// Check there are records

View file

@ -308,7 +308,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// FDB does not support verisoned queries.
// FDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -527,7 +527,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// FDB does not support verisoned queries.
// FDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}

View file

@ -303,7 +303,7 @@ impl Building {
while let Some(rng) = next {
// Get the next batch of records
let tx = self.new_read_tx().await?;
let batch = catch!(tx, tx.batch(rng, *INDEXING_BATCH_SIZE, true).await);
let batch = catch!(tx, tx.batch(rng, *INDEXING_BATCH_SIZE, true, None).await);
// We can release the read transaction
drop(tx);
// Set the next scan range

View file

@ -161,7 +161,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// IndexDB does not support verisoned queries.
// IndexDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -183,7 +183,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// IndexDB does not support verisoned queries.
// IndexDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -209,7 +209,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// IndexDB does not support verisoned queries.
// IndexDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -322,7 +322,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// IndexDB does not support verisoned queries.
// IndexDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}

View file

@ -158,7 +158,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// MemDB does not support verisoned queries.
// MemDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -218,7 +218,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// MemDB does not support verisoned queries.
// MemDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}

View file

@ -137,7 +137,7 @@ impl Datastore {
let end = crate::key::node::lq::suffix(*id);
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true).await);
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true, None).await);
next = res.next;
for (k, v) in res.values.iter() {
// Decode the data for this live query
@ -220,7 +220,7 @@ impl Datastore {
// Fetch all tables
let tbs = {
let txn = self.transaction(Read, Optimistic).await?;
catch!(txn, txn.all_tb(&ns.name, &db.name).await)
catch!(txn, txn.all_tb(&ns.name, &db.name, None).await)
};
// Loop over all tables
for tb in tbs.iter() {
@ -232,7 +232,7 @@ impl Datastore {
let end = crate::key::table::lq::suffix(&ns.name, &db.name, &tb.name);
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true).await);
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true, None).await);
next = res.next;
for (k, v) in res.values.iter() {
// Decode the LIVE query statement

View file

@ -238,7 +238,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// RocksDB does not support verisoned queries.
// RocksDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -286,7 +286,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// RocksDB does not support verisoned queries.
// RocksDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}
@ -454,7 +454,7 @@ impl super::api::Transaction for Transaction {
where
K: Into<Key> + Sprintable + Debug,
{
// RocksDB does not support verisoned queries.
// RocksDB does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}

View file

@ -297,7 +297,7 @@ async fn batch() {
tx.commit().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let res = tx.batch("test1".as_bytes().."test9".as_bytes(), u32::MAX, true).await.unwrap();
let res = tx.batch("test1".as_bytes().."test9".as_bytes(), u32::MAX, true, None).await.unwrap();
let val = res.values;
assert_eq!(val.len(), 5);
assert_eq!(val[0].0, b"test1");
@ -313,7 +313,7 @@ async fn batch() {
tx.cancel().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let res = tx.batch("test2".as_bytes().."test4".as_bytes(), u32::MAX, true).await.unwrap();
let res = tx.batch("test2".as_bytes().."test4".as_bytes(), u32::MAX, true, None).await.unwrap();
let val = res.values;
assert_eq!(val.len(), 2);
assert_eq!(val[0].0, b"test2");
@ -323,7 +323,7 @@ async fn batch() {
tx.cancel().await.unwrap();
// Create a readonly transaction
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
let res = tx.batch("test2".as_bytes().."test4".as_bytes(), u32::MAX, true).await.unwrap();
let res = tx.batch("test2".as_bytes().."test4".as_bytes(), u32::MAX, true, None).await.unwrap();
let val = res.values;
assert_eq!(val.len(), 2);
assert_eq!(val[0].0, b"test2");

View file

@ -256,7 +256,7 @@ impl super::api::Transaction for Transaction {
K: Into<Key> + Sprintable + Debug,
V: Into<Val> + Debug,
{
// TiKV does not support verisoned queries.
// TiKV does not support versioned queries.
if version.is_some() {
return Err(Error::UnsupportedVersionedQueries);
}

View file

@ -237,13 +237,17 @@ impl Transactor {
///
/// This function fetches all matching key-value pairs from the underlying datastore in grouped batches.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
pub async fn getr<K>(&mut self, rng: Range<K>) -> Result<Vec<(Key, Val)>, Error>
pub async fn getr<K>(
&mut self,
rng: Range<K>,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Debug,
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
expand_inner!(&mut self.inner, v => { v.getr(beg..end).await })
expand_inner!(&mut self.inner, v => { v.getr(beg..end, version).await })
}
/// Retrieve a specific prefixed range of keys from the datastore.
@ -380,13 +384,14 @@ impl Transactor {
rng: Range<K>,
batch: u32,
values: bool,
version: Option<u64>,
) -> Result<Batch, Error>
where
K: Into<Key> + Debug,
{
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
expand_inner!(&mut self.inner, v => { v.batch(beg..end, batch, values).await })
expand_inner!(&mut self.inner, v => { v.batch(beg..end, batch, values, version).await })
}
/// Obtain a new change timestamp for a key
@ -596,7 +601,7 @@ impl Transactor {
let ts_key = crate::key::database::ts::new(ns, db, ts);
let begin = ts_key.encode()?;
let end = crate::key::database::ts::suffix(ns, db);
let ts_pairs: Vec<(Vec<u8>, Vec<u8>)> = self.getr(begin..end).await?;
let ts_pairs: Vec<(Vec<u8>, Vec<u8>)> = self.getr(begin..end, None).await?;
let latest_ts_pair = ts_pairs.last();
if let Some((k, _)) = latest_ts_pair {
#[cfg(debug_assertions)]
@ -629,7 +634,7 @@ impl Transactor {
let start = crate::key::database::ts::prefix(ns, db);
let ts_key = crate::key::database::ts::new(ns, db, ts + 1);
let end = ts_key.encode()?;
let ts_pairs = self.getr(start..end).await?;
let ts_pairs = self.getr(start..end, None).await?;
let latest_ts_pair = ts_pairs.last();
if let Some((_, v)) = latest_ts_pair {
if v.len() == 10 {

View file

@ -141,11 +141,15 @@ impl Transaction {
///
/// This function fetches key-value pairs from the underlying datastore in grouped batches.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
pub async fn getr<K>(&self, rng: Range<K>) -> Result<Vec<(Key, Val)>, Error>
pub async fn getr<K>(
&self,
rng: Range<K>,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: Into<Key> + Debug,
{
self.lock().await.getr(rng).await
self.lock().await.getr(rng, version).await
}
/// Delete a key from the datastore.
@ -250,11 +254,17 @@ impl Transaction {
///
/// This function fetches the key-value pairs in batches, with multiple requests to the underlying datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
pub async fn batch<K>(&self, rng: Range<K>, batch: u32, values: bool) -> Result<Batch, Error>
pub async fn batch<K>(
&self,
rng: Range<K>,
batch: u32,
values: bool,
version: Option<u64>,
) -> Result<Batch, Error>
where
K: Into<Key> + Debug,
{
self.lock().await.batch(rng, batch, values).await
self.lock().await.batch(rng, batch, values, version).await
}
/// Retrieve a stream over a specific range of keys in the datastore.
@ -315,7 +325,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::root::nd::suffix();
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Nds(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -334,7 +344,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::root::us::suffix();
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Rus(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -353,7 +363,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::root::ac::suffix();
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Ras(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -372,7 +382,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::root::access::gr::suffix(ra);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Rag(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -391,7 +401,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::root::ns::suffix();
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Nss(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -410,7 +420,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::namespace::us::suffix(ns);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Nus(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -429,7 +439,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::namespace::ac::suffix(ns);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Nas(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -452,7 +462,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::namespace::access::gr::suffix(ns, na);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Nag(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -471,7 +481,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::namespace::db::suffix(ns);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Dbs(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -494,7 +504,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::database::us::suffix(ns, db);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Dus(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -517,7 +527,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::database::ac::suffix(ns, db);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Das(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -541,7 +551,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::database::access::gr::suffix(ns, db, da);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Dag(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -564,7 +574,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::database::az::suffix(ns, db);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Azs(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -587,7 +597,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::database::fc::suffix(ns, db);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Fcs(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -610,7 +620,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::database::pa::suffix(ns, db);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Pas(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -633,7 +643,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::database::ml::suffix(ns, db);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Mls(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -645,14 +655,19 @@ impl Transaction {
/// Retrieve all table definitions for a specific database.
#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
pub async fn all_tb(&self, ns: &str, db: &str) -> Result<Arc<[DefineTableStatement]>, Error> {
pub async fn all_tb(
&self,
ns: &str,
db: &str,
version: Option<u64>,
) -> Result<Arc<[DefineTableStatement]>, Error> {
let key = crate::key::database::tb::prefix(ns, db);
let res = self.cache.get_value_or_guard_async(&key).await;
match res {
Ok(val) => val,
Err(cache) => {
let end = crate::key::database::tb::suffix(ns, db);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, version).await?;
let val = val.convert().into();
let val = Entry::Tbs(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -676,7 +691,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::table::ev::suffix(ns, db, tb);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Evs(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -693,6 +708,7 @@ impl Transaction {
ns: &str,
db: &str,
tb: &str,
version: Option<u64>,
) -> Result<Arc<[DefineFieldStatement]>, Error> {
let key = crate::key::table::fd::prefix(ns, db, tb);
let res = self.cache.get_value_or_guard_async(&key).await;
@ -700,7 +716,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::table::fd::suffix(ns, db, tb);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, version).await?;
let val = val.convert().into();
let val = Entry::Fds(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -724,7 +740,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::table::ix::suffix(ns, db, tb);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Ixs(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -748,7 +764,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::table::ft::suffix(ns, db, tb);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Fts(Arc::clone(&val));
let _ = cache.insert(val.clone());
@ -772,7 +788,7 @@ impl Transaction {
Ok(val) => val,
Err(cache) => {
let end = crate::key::table::lq::suffix(ns, db, tb);
let val = self.getr(key..end).await?;
let val = self.getr(key..end, None).await?;
let val = val.convert().into();
let val = Entry::Lvs(Arc::clone(&val));
let _ = cache.insert(val.clone());

View file

@ -9,7 +9,7 @@ pub async fn v1_to_2_id_uuid(tx: Arc<Transaction>) -> Result<(), Error> {
let ns = ns.name.as_str();
for db in tx.all_db(ns).await?.iter() {
let db = db.name.as_str();
for tb in tx.all_tb(ns, db).await?.iter() {
for tb in tx.all_tb(ns, db, None).await?.iter() {
let tb = tb.name.as_str();
migrate_tb_records(tx.clone(), ns, db, tb).await?;
migrate_tb_edges(tx.clone(), ns, db, tb).await?;

View file

@ -81,7 +81,7 @@ impl DefineFieldStatement {
.await?;
// find existing field definitions.
let fields = txn.all_tb_fields(ns, db, &self.what).await.ok();
let fields = txn.all_tb_fields(ns, db, &self.what, None).await.ok();
// Process possible recursive_definitions.
if let Some(mut cur_kind) = self.kind.as_ref().and_then(|x| x.inner_kind()) {

View file

@ -4,14 +4,14 @@ use crate::doc::CursorDoc;
use crate::err::Error;
use crate::iam::Action;
use crate::iam::ResourceKind;
use crate::sql::{Base, Ident, Object, Value};
use crate::sql::{Base, Ident, Object, Value, Version};
use derive::Store;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
#[revisioned(revision = 4)]
#[revisioned(revision = 5)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive]
@ -25,10 +25,10 @@ pub enum InfoStatement {
Ns(#[revision(start = 2)] bool),
#[revision(override(revision = 2, discriminant = 5), override(revision = 3, discriminant = 5))]
Db(#[revision(start = 2)] bool),
Db(#[revision(start = 2)] bool, #[revision(start = 5)] Option<Version>),
#[revision(override(revision = 2, discriminant = 7), override(revision = 3, discriminant = 7))]
Tb(Ident, #[revision(start = 2)] bool),
Tb(Ident, #[revision(start = 2)] bool, #[revision(start = 5)] Option<Version>),
#[revision(override(revision = 2, discriminant = 9), override(revision = 3, discriminant = 9))]
User(Ident, Option<Base>, #[revision(start = 2)] bool),
@ -131,12 +131,14 @@ impl InfoStatement {
}),
})
}
InfoStatement::Db(structured) => {
InfoStatement::Db(structured, version) => {
// Allowed to run?
opt.is_allowed(Action::View, ResourceKind::Any, &Base::Db)?;
// Get the NS and DB
let ns = opt.ns()?;
let db = opt.db()?;
// Convert the version to u64 if present
let version = version.as_ref().map(|v| v.to_u64());
// Get the transaction
let txn = ctx.tx();
// Create the result set
@ -147,7 +149,7 @@ impl InfoStatement {
"functions".to_string() => process(txn.all_db_functions(ns, db).await?),
"models".to_string() => process(txn.all_db_models(ns, db).await?),
"params".to_string() => process(txn.all_db_params(ns, db).await?),
"tables".to_string() => process(txn.all_tb(ns, db).await?),
"tables".to_string() => process(txn.all_tb(ns, db, version).await?),
"users".to_string() => process(txn.all_db_users(ns, db).await?),
}),
false => Value::from(map! {
@ -188,7 +190,7 @@ impl InfoStatement {
},
"tables".to_string() => {
let mut out = Object::default();
for v in txn.all_tb(ns, db).await?.iter() {
for v in txn.all_tb(ns, db, version).await?.iter() {
out.insert(v.name.to_raw(), v.to_string().into());
}
out.into()
@ -203,19 +205,21 @@ impl InfoStatement {
}),
})
}
InfoStatement::Tb(tb, structured) => {
InfoStatement::Tb(tb, structured, version) => {
// Allowed to run?
opt.is_allowed(Action::View, ResourceKind::Any, &Base::Db)?;
// Get the NS and DB
let ns = opt.ns()?;
let db = opt.db()?;
// Convert the version to u64 if present
let version = version.as_ref().map(|v| v.to_u64());
// Get the transaction
let txn = ctx.tx();
// Create the result set
Ok(match structured {
true => Value::from(map! {
"events".to_string() => process(txn.all_tb_events(ns, db, tb).await?),
"fields".to_string() => process(txn.all_tb_fields(ns, db, tb).await?),
"fields".to_string() => process(txn.all_tb_fields(ns, db, tb, version).await?),
"indexes".to_string() => process(txn.all_tb_indexes(ns, db, tb).await?),
"lives".to_string() => process(txn.all_tb_lives(ns, db, tb).await?),
"tables".to_string() => process(txn.all_tb_views(ns, db, tb).await?),
@ -230,7 +234,7 @@ impl InfoStatement {
},
"fields".to_string() => {
let mut out = Object::default();
for v in txn.all_tb_fields(ns, db, tb).await?.iter() {
for v in txn.all_tb_fields(ns, db, tb, version).await?.iter() {
out.insert(v.name.to_string(), v.to_string().into());
}
out.into()
@ -308,10 +312,23 @@ impl fmt::Display for InfoStatement {
Self::Root(true) => f.write_str("INFO FOR ROOT STRUCTURE"),
Self::Ns(false) => f.write_str("INFO FOR NAMESPACE"),
Self::Ns(true) => f.write_str("INFO FOR NAMESPACE STRUCTURE"),
Self::Db(false) => f.write_str("INFO FOR DATABASE"),
Self::Db(true) => f.write_str("INFO FOR DATABASE STRUCTURE"),
Self::Tb(ref t, false) => write!(f, "INFO FOR TABLE {t}"),
Self::Tb(ref t, true) => write!(f, "INFO FOR TABLE {t} STRUCTURE"),
Self::Db(false, ref v) => match v {
Some(ref v) => write!(f, "INFO FOR DATABASE VERSION {v}"),
None => f.write_str("INFO FOR DATABASE"),
},
Self::Db(true, ref v) => match v {
Some(ref v) => write!(f, "INFO FOR DATABASE VERSION {v} STRUCTURE"),
None => f.write_str("INFO FOR DATABASE STRUCTURE"),
},
Self::Tb(ref t, false, ref v) => match v {
Some(ref v) => write!(f, "INFO FOR TABLE {t} VERSION {v}"),
None => write!(f, "INFO FOR TABLE {t}"),
},
Self::Tb(ref t, true, ref v) => match v {
Some(ref v) => write!(f, "INFO FOR TABLE {t} VERSION {v} STRUCTURE"),
None => write!(f, "INFO FOR TABLE {t} STRUCTURE"),
},
Self::User(ref u, ref b, false) => match b {
Some(ref b) => write!(f, "INFO FOR USER {u} ON {b}"),
None => write!(f, "INFO FOR USER {u}"),
@ -335,12 +352,20 @@ impl InfoStatement {
match self {
InfoStatement::Root(_) => InfoStatement::Root(true),
InfoStatement::Ns(_) => InfoStatement::Ns(true),
InfoStatement::Db(_) => InfoStatement::Db(true),
InfoStatement::Tb(t, _) => InfoStatement::Tb(t, true),
InfoStatement::Db(_, v) => InfoStatement::Db(true, v),
InfoStatement::Tb(t, _, v) => InfoStatement::Tb(t, true, v),
InfoStatement::User(u, b, _) => InfoStatement::User(u, b, true),
InfoStatement::Index(i, t, _) => InfoStatement::Index(i, t, true),
}
}
pub(crate) fn versionize(self, v: Version) -> Self {
match self {
InfoStatement::Db(s, _) => InfoStatement::Db(s, Some(v)),
InfoStatement::Tb(t, s, _) => InfoStatement::Tb(t, s, Some(v)),
_ => self,
}
}
}
fn process<T>(a: Arc<[T]>) -> Value

View file

@ -180,7 +180,7 @@ mod tests {
// Create a new transaction and verify that there are no tables defined.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
assert!(table_occurrences.is_empty());
tx.cancel().await.unwrap();
@ -196,7 +196,7 @@ mod tests {
// Verify that the table definition has been created.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
assert_eq!(table_occurrences.len(), 1);
assert_eq!(table_occurrences[0].name.0, tb);
tx.cancel().await.unwrap();
@ -218,7 +218,7 @@ mod tests {
// Create a new transaction to verify that the same table was used.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
assert_eq!(table_occurrences.len(), 1);
assert_eq!(table_occurrences[0].name.0, tb);
tx.cancel().await.unwrap();
@ -250,7 +250,7 @@ mod tests {
// Create a new transaction and verify that there are no tables defined.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
assert!(table_occurrences.is_empty());
tx.cancel().await.unwrap();
@ -260,7 +260,7 @@ mod tests {
// Create a new transaction and confirm that a new table is created.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
assert_eq!(table_occurrences.len(), 1);
assert_eq!(table_occurrences[0].name.0, tb);
tx.cancel().await.unwrap();
@ -271,7 +271,7 @@ mod tests {
// Verify that the old table definition was used.
let tx = dbs.transaction(Write, Optimistic).await.unwrap();
let table_occurrences = &*(tx.all_tb(ns, db).await.unwrap());
let table_occurrences = &*(tx.all_tb(ns, db, None).await.unwrap());
assert_eq!(table_occurrences.len(), 1);
assert_eq!(table_occurrences[0].name.0, tb);
tx.cancel().await.unwrap();

View file

@ -526,10 +526,10 @@ impl Parser<'_> {
let mut stmt = match next.kind {
t!("ROOT") => InfoStatement::Root(false),
t!("NAMESPACE") | t!("ns") => InfoStatement::Ns(false),
t!("DATABASE") => InfoStatement::Db(false),
t!("DATABASE") => InfoStatement::Db(false, None),
t!("TABLE") => {
let ident = self.next_token_value()?;
InfoStatement::Tb(ident, false)
InfoStatement::Tb(ident, false, None)
}
t!("USER") => {
let ident = self.next_token_value()?;
@ -546,6 +546,10 @@ impl Parser<'_> {
_ => unexpected!(self, next, "an info target"),
};
if let Some(version) = self.try_parse_version()? {
stmt = stmt.versionize(version);
}
if self.peek_kind() == t!("STRUCTURE") {
self.pop_peek();
stmt = stmt.structurize();

View file

@ -1766,7 +1766,7 @@ fn parse_info() {
assert_eq!(res, Statement::Info(InfoStatement::Ns(false)));
let res = test_parse!(parse_stmt, "INFO FOR TABLE table").unwrap();
assert_eq!(res, Statement::Info(InfoStatement::Tb(Ident("table".to_owned()), false)));
assert_eq!(res, Statement::Info(InfoStatement::Tb(Ident("table".to_owned()), false, None)));
let res = test_parse!(parse_stmt, "INFO FOR USER user").unwrap();
assert_eq!(res, Statement::Info(InfoStatement::User(Ident("user".to_owned()), None, false)));

View file

@ -581,6 +581,72 @@ mod api_integration {
assert!(response.is_none());
}
#[test_log::test(tokio::test)]
async fn info_for_db_with_versioned_tables() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
// Record the timestamp before creating a testing table.
let ts_before_create = chrono::Utc::now().to_rfc3339();
// Create the testing table.
let _ = db.query("DEFINE TABLE person").await.unwrap().check().unwrap();
// Record the timestamp after creating the testing table.
let ts_after_create = chrono::Utc::now().to_rfc3339();
// Check that historical query shows no table before it was created.
let q = format!("INFO FOR DB VERSION d'{}'", ts_before_create);
let mut response = db.query(q).await.unwrap().check().unwrap();
let info = response.take::<Value>(0).unwrap().to_string();
assert!(info.contains("tables: { }"));
// Now check that the table shows up later.
let q = format!("INFO FOR DB VERSION d'{}'", ts_after_create);
let mut response = db.query(q).await.unwrap().check().unwrap();
let info = response.take::<Value>(0).unwrap().to_string();
assert!(info.contains(
"tables: { person: 'DEFINE TABLE person TYPE ANY SCHEMALESS PERMISSIONS NONE' }"
));
}
#[test_log::test(tokio::test)]
async fn info_for_table_with_versioned_fields() {
let (permit, db) = new_db().await;
db.use_ns(NS).use_db(Ulid::new().to_string()).await.unwrap();
drop(permit);
// Create the testing table.
let _ = db.query("DEFINE TABLE person").await.unwrap().check().unwrap();
// Record the timestamp before creating a field.
let ts_before_field = chrono::Utc::now().to_rfc3339();
let _ = db
.query("DEFINE FIELD firstName ON TABLE person TYPE string")
.await
.unwrap()
.check()
.unwrap();
// Record the timestamp after creating the field.
let ts_after_field = chrono::Utc::now().to_rfc3339();
// Check that historical query shows no field before it was created.
let q = format!("INFO FOR TABLE person VERSION d'{}'", ts_before_field);
let mut response = db.query(q).await.unwrap().check().unwrap();
let info = response.take::<Value>(0).unwrap().to_string();
assert!(info.contains("fields: { }"));
// Now check that the field shows up later.
let q = format!("INFO FOR TABLE person VERSION d'{}'", ts_after_field);
let mut response = db.query(q).await.unwrap().check().unwrap();
let info = response.take::<Value>(0).unwrap().to_string();
assert!(info.contains(
"fields: { firstName: 'DEFINE FIELD firstName ON person TYPE string PERMISSIONS FULL' }"
));
}
include!("api/mod.rs");
include!("api/live.rs");
include!("api/backup.rs");