Add ns/db/tb IDs for aliasing (#2441)

This commit is contained in:
Yusuke Kuoka 2023-08-18 17:00:06 +09:00 committed by GitHub
parent a242d21d94
commit 41c9fd701e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 567 additions and 7 deletions

1
lib/src/idg/mod.rs Normal file
View file

@ -0,0 +1 @@
pub(crate) mod u32;

181
lib/src/idg/u32.rs Normal file
View file

@ -0,0 +1,181 @@
use crate::err::Error;
use crate::kvs::{Key, Val};
use roaring::RoaringBitmap;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
// Id is a unique id generated by the generator.
pub(crate) type Id = u32;
// U64 is a generator that generates unique unsigned 64-bit integer ids.
// It can reuse freed ids by keeping track of them in a roaring bitmap.
// This doesn't do any variable-length encoding, so it's not as space efficient as it could be.
// It is used to generate ids for any SurrealDB objects that need aliases (e.g. namespaces, databases, tables, indexes, etc.)
#[derive(Clone)]
pub struct U32 {
state_key: Key,
available_ids: Option<RoaringBitmap>,
next_id: Id,
updated: bool,
}
impl U32 {
pub(crate) async fn new(state_key: Key, v: Option<Val>) -> Result<Self, Error> {
let state: State = if let Some(val) = v {
State::try_from_val(val)?
} else {
State::new()
};
Ok(Self {
state_key,
available_ids: state.available_ids,
updated: false,
next_id: state.next_id,
})
}
pub(crate) fn get_next_id(&mut self) -> Id {
self.updated = true;
// We check first if there is any available id
if let Some(available_ids) = &mut self.available_ids {
if let Some(available_id) = available_ids.iter().next() {
available_ids.remove(available_id);
if available_ids.is_empty() {
self.available_ids = None;
}
return available_id;
}
}
// If not, we use the sequence
let doc_id = self.next_id;
self.next_id += 1;
doc_id
}
pub(crate) fn remove_id(&mut self, id: Id) {
if let Some(available_ids) = &mut self.available_ids {
available_ids.insert(id);
} else {
let mut available_ids = RoaringBitmap::new();
available_ids.insert(id);
self.available_ids = Some(available_ids);
}
self.updated = true;
}
pub(crate) fn finish(&mut self) -> Option<(Key, Val)> {
if self.updated {
let state = State {
available_ids: self.available_ids.take(),
next_id: self.next_id,
};
return Some((self.state_key.clone(), state.try_to_val().unwrap()));
}
None
}
}
#[derive(Serialize, Deserialize)]
struct State {
available_ids: Option<RoaringBitmap>,
next_id: Id,
}
impl State {
fn new() -> Self {
Self {
available_ids: None,
next_id: 0,
}
}
}
pub(crate) trait SerdeState
where
Self: Sized + Serialize + DeserializeOwned,
{
fn try_to_val(&self) -> Result<Val, Error> {
Ok(bincode::serialize(self)?)
}
fn try_from_val(val: Val) -> Result<Self, Error> {
Ok(bincode::deserialize(&val)?)
}
}
impl SerdeState for RoaringBitmap {}
impl SerdeState for State {}
#[cfg(test)]
mod tests {
use crate::err::Error;
use crate::idg::u32::U32;
use crate::kvs::{Datastore, Transaction};
async fn get_ids(ds: &Datastore) -> (Transaction, U32) {
let mut tx = ds.transaction(true, false).await.unwrap();
let key = "foo";
let v = tx.get(key).await.unwrap();
let d = U32::new(key.into(), v).await.unwrap();
(tx, d)
}
async fn finish(mut tx: Transaction, mut d: U32) -> Result<(), Error> {
match d.finish() {
Some((key, val)) => {
tx.set(key, val).await?;
}
None => {}
}
tx.commit().await
}
#[tokio::test]
async fn test_get_remove_ids() {
let ds = Datastore::new("memory").await.unwrap();
// Get the first id
{
let (tx, mut d) = get_ids(&ds).await;
let id = d.get_next_id();
finish(tx, d).await.unwrap();
assert_eq!(id, 0);
}
// Get the second and the third ids
{
let (tx, mut d) = get_ids(&ds).await;
let id1 = d.get_next_id();
let id2 = d.get_next_id();
finish(tx, d).await.unwrap();
assert_eq!(id1, 1);
assert_eq!(id2, 2);
}
// It reuses the removed id within a transaction
{
let (tx, mut d) = get_ids(&ds).await;
d.remove_id(1);
let id1 = d.get_next_id();
let id2 = d.get_next_id();
finish(tx, d).await.unwrap();
assert_eq!(id1, 1);
assert_eq!(id2, 3);
}
// It reuses the removed id across transactions
{
let (tx, mut d1) = get_ids(&ds).await;
d1.remove_id(2);
finish(tx, d1).await.unwrap();
let (tx, mut d2) = get_ids(&ds).await;
let id1 = d2.get_next_id();
let id2 = d2.get_next_id();
finish(tx, d2).await.unwrap();
assert_eq!(id1, 2);
assert_eq!(id2, 4);
}
}
}

View file

@ -5,6 +5,7 @@ pub mod lg;
pub mod pa;
pub mod sc;
pub mod tb;
pub mod ti;
pub mod tk;
pub mod ts;
pub mod us;

View file

@ -0,0 +1,51 @@
//! Stores the next and available freed IDs for documents
use derive::Key;
use serde::{Deserialize, Serialize};
// Table ID generator
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Ti {
__: u8,
_a: u8,
pub ns: u32,
_b: u8,
pub db: u32,
_c: u8,
_d: u8,
_e: u8,
}
pub fn new(ns: u32, db: u32) -> Ti {
Ti::new(ns, db)
}
impl Ti {
pub fn new(ns: u32, db: u32) -> Self {
Ti {
__: b'/',
_a: b'+',
ns,
_b: b'*',
db,
_c: b'!',
_d: b't',
_e: b'i',
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Ti::new(
123u32,
234u32,
);
let enc = Ti::encode(&val).unwrap();
let dec = Ti::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

View file

@ -3,6 +3,7 @@
/// crate::key::root::all /
/// crate::key::root::hb /!hb{ts}/{nd}
/// crate::key::root::nd /!nd{nd}
/// crate::key::root::ni /!ni
/// crate::key::root::ns /!ns{ns}
///
/// crate::key::node::all /${nd}
@ -10,6 +11,7 @@
///
/// crate::key::namespace::all /*{ns}
/// crate::key::namespace::db /*{ns}!db{db}
/// crate::key::namespace::di /+{ns id}!di
/// crate::key::namespace::lg /*{ns}!lg{lg}
/// crate::key::namespace::tk /*{ns}!tk{tk}
///
@ -20,6 +22,7 @@
/// crate::key::database::pa /*{ns}*{db}!pa{pa}
/// crate::key::database::sc /*{ns}*{db}!sc{sc}
/// crate::key::database::tb /*{ns}*{db}!tb{tb}
/// crate::key::database::ti /+{ns id}*{db id}!ti
/// crate::key::database::tk /*{ns}*{db}!tk{tk}
/// crate::key::database::ts /*{ns}*{db}!ts{ts}
/// crate::key::database::vs /*{ns}*{db}!vs

View file

@ -0,0 +1,47 @@
/// Stores a database ID generator state
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Di {
__: u8,
_a: u8,
pub ns: u32,
_b: u8,
_c: u8,
_d: u8,
}
pub fn new(ns: u32) -> Di {
Di::new(ns)
}
impl Di {
pub fn new(ns: u32) -> Self {
Self {
__: b'/',
_a: b'+',
ns,
_b: b'!',
_c: b'd',
_d: b'i',
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
#[rustfmt::skip]
let val = Di::new(
123,
);
let enc = Di::encode(&val).unwrap();
assert_eq!(enc, vec![0x2f, 0x2b, 0, 0, 0, 0x7b, 0x21, 0x64, 0x69]);
let dec = Di::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

View file

@ -1,5 +1,6 @@
pub mod all;
pub mod db;
pub mod di;
pub mod lg;
pub mod tk;
pub mod us;

View file

@ -1,5 +1,6 @@
pub mod all;
pub mod hb;
pub mod nd;
pub mod ni;
pub mod ns;
pub mod us;

40
lib/src/key/root/ni.rs Normal file
View file

@ -0,0 +1,40 @@
//! Stores namespace ID generator state
use derive::Key;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Key)]
pub struct Ni {
__: u8,
_a: u8,
_b: u8,
_c: u8,
}
impl Default for Ni {
fn default() -> Self {
Self::new()
}
}
impl Ni {
pub fn new() -> Self {
Self {
__: b'/',
_a: b'!',
_b: b'n',
_c: b'i',
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn key() {
use super::*;
let val = Ni::new();
let enc = Ni::encode(&val).unwrap();
let dec = Ni::decode(&enc).unwrap();
assert_eq!(val, dec);
}
}

View file

@ -1,3 +1,4 @@
use crate::idg::u32::U32;
use crate::kvs::kv::Key;
use crate::sql::statements::DefineAnalyzerStatement;
use crate::sql::statements::DefineDatabaseStatement;
@ -37,6 +38,7 @@ pub enum Entry {
Nts(Arc<[DefineTokenStatement]>),
Pas(Arc<[DefineParamStatement]>),
Scs(Arc<[DefineScopeStatement]>),
Seq(U32),
Sts(Arc<[DefineTokenStatement]>),
Tbs(Arc<[DefineTableStatement]>),
}

View file

@ -24,6 +24,7 @@ use channel::Receiver;
use channel::Sender;
use futures::lock::Mutex;
use futures::Future;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
@ -643,6 +644,7 @@ impl Datastore {
inner,
cache: super::cache::Cache::default(),
cf: cf::Writer::new(),
write_buffer: HashMap::new(),
vso: self.vso.clone(),
})
}

View file

@ -18,6 +18,7 @@ async fn table_definitions_can_be_scanned() {
name: Default::default(),
drop: false,
full: false,
id: None,
view: None,
permissions: Default::default(),
changefeed: None,
@ -52,6 +53,7 @@ async fn table_definitions_can_be_deleted() {
name: Default::default(),
drop: false,
full: false,
id: None,
view: None,
permissions: Default::default(),
changefeed: None,

View file

@ -6,6 +6,7 @@ use crate::cf;
use crate::dbs::node::ClusterMembership;
use crate::dbs::node::Timestamp;
use crate::err::Error;
use crate::idg::u32::U32;
use crate::kvs::cache::Cache;
use crate::kvs::cache::Entry;
use crate::kvs::Check;
@ -37,6 +38,7 @@ use sql::statements::DefineTableStatement;
use sql::statements::DefineTokenStatement;
use sql::statements::LiveStatement;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::ops::Range;
@ -50,6 +52,7 @@ pub struct Transaction {
pub(super) inner: Inner,
pub(super) cache: Cache,
pub(super) cf: cf::Writer,
pub(super) write_buffer: HashMap<Key, ()>,
pub(super) vso: Arc<Mutex<Oracle>>,
}
@ -1865,6 +1868,7 @@ impl Transaction {
let key = crate::key::root::ns::new(ns);
let val = DefineNamespaceStatement {
name: ns.to_owned().into(),
id: None,
};
self.put(key, &val).await?;
Ok(val)
@ -1894,6 +1898,7 @@ impl Transaction {
let val = DefineDatabaseStatement {
name: db.to_owned().into(),
changefeed: None,
id: None,
};
self.put(key, &val).await?;
Ok(val)
@ -2052,6 +2057,7 @@ impl Transaction {
let key = crate::key::root::ns::new(ns);
let val = DefineNamespaceStatement {
name: ns.to_owned().into(),
id: None,
};
self.put(key, &val).await?;
Ok(Arc::new(val))
@ -2081,6 +2087,7 @@ impl Transaction {
let val = DefineDatabaseStatement {
name: db.to_owned().into(),
changefeed: None,
id: None,
};
self.put(key, &val).await?;
Ok(Arc::new(val))
@ -2394,6 +2401,138 @@ impl Transaction {
}
}
pub(crate) async fn get_idg(&mut self, key: Key) -> Result<U32, Error> {
let seq = if let Some(e) = self.cache.get(&key) {
if let Entry::Seq(v) = e {
v
} else {
unreachable!();
}
} else {
let val = self.get(key.clone()).await?;
if let Some(val) = val {
U32::new(key.clone(), Some(val)).await?
} else {
U32::new(key.clone(), None).await?
}
};
Ok(seq)
}
// get_next_db_id will get the next db id for the given namespace.
pub(crate) async fn get_next_db_id(&mut self, ns: u32) -> Result<u32, Error> {
let key = crate::key::namespace::di::new(ns).encode().unwrap();
let mut seq = if let Some(e) = self.cache.get(&key) {
if let Entry::Seq(v) = e {
v
} else {
unreachable!();
}
} else {
let val = self.get(key.clone()).await?;
if let Some(val) = val {
U32::new(key.clone(), Some(val)).await?
} else {
U32::new(key.clone(), None).await?
}
};
let id = seq.get_next_id();
self.cache.set(key.clone(), Entry::Seq(seq));
self.write_buffer.insert(key.clone(), ());
Ok(id)
}
// remove_db_id removes the given db id from the sequence.
#[allow(unused)]
pub(crate) async fn remove_db_id(&mut self, ns: u32, db: u32) -> Result<(), Error> {
let key = crate::key::namespace::di::new(ns).encode().unwrap();
let mut seq = self.get_idg(key.clone()).await?;
seq.remove_id(db);
self.cache.set(key.clone(), Entry::Seq(seq));
self.write_buffer.insert(key.clone(), ());
Ok(())
}
// get_next_db_id will get the next tb id for the given namespace and database.
pub(crate) async fn get_next_tb_id(&mut self, ns: u32, db: u32) -> Result<u32, Error> {
let key = crate::key::database::ti::new(ns, db).encode().unwrap();
let mut seq = self.get_idg(key.clone()).await?;
let id = seq.get_next_id();
self.cache.set(key.clone(), Entry::Seq(seq));
self.write_buffer.insert(key.clone(), ());
Ok(id)
}
// remove_tb_id removes the given tb id from the sequence.
#[allow(unused)]
pub(crate) async fn remove_tb_id(&mut self, ns: u32, db: u32, tb: u32) -> Result<(), Error> {
let key = crate::key::database::ti::new(ns, db).encode().unwrap();
let mut seq = self.get_idg(key.clone()).await?;
seq.remove_id(tb);
self.cache.set(key.clone(), Entry::Seq(seq));
self.write_buffer.insert(key.clone(), ());
Ok(())
}
// get_next_ns_id will get the next ns id.
pub(crate) async fn get_next_ns_id(&mut self) -> Result<u32, Error> {
let key = crate::key::root::ni::Ni::default().encode().unwrap();
let mut seq = if let Some(e) = self.cache.get(&key) {
if let Entry::Seq(v) = e {
v
} else {
unreachable!();
}
} else {
let val = self.get(key.clone()).await?;
if let Some(val) = val {
U32::new(key.clone(), Some(val)).await?
} else {
U32::new(key.clone(), None).await?
}
};
let id = seq.get_next_id();
self.cache.set(key.clone(), Entry::Seq(seq));
self.write_buffer.insert(key.clone(), ());
Ok(id)
}
// remove_ns_id removes the given ns id from the sequence.
#[allow(unused)]
pub(crate) async fn remove_ns_id(&mut self, ns: u32) -> Result<(), Error> {
let key = crate::key::root::ni::Ni::default().encode().unwrap();
let mut seq = self.get_idg(key.clone()).await?;
seq.remove_id(ns);
self.cache.set(key.clone(), Entry::Seq(seq));
self.write_buffer.insert(key.clone(), ());
Ok(())
}
// complete_changes will complete the changefeed recording for the given namespace and database.
//
// Under the hood, this function calls the transaction's `set_versionstamped_key` for each change.
@ -2411,6 +2550,20 @@ impl Transaction {
// Lastly, you should set lock=true if you want the changefeed to be correctly ordered for
// non-FDB backends.
pub(crate) async fn complete_changes(&mut self, _lock: bool) -> Result<(), Error> {
let mut buf = self.write_buffer.clone();
let writes = buf.drain();
for (k, _) in writes {
let v = self.cache.get(&k).unwrap();
let mut seq = if let Entry::Seq(v) = v {
v
} else {
unreachable!();
};
if let Some((k, v)) = seq.finish() {
self.set(k, v).await?
}
}
let changes = self.cf.get();
for (tskey, prefix, suffix, v) in changes {
self.set_versionstamped_key(tskey, prefix, suffix, v).await?
@ -2680,4 +2833,48 @@ mod tests {
assert_eq!(res[0], data);
txn.commit().await.unwrap();
}
#[tokio::test]
async fn test_seqs() {
let ds = Datastore::new("memory").await.unwrap();
let mut txn = ds.transaction(true, false).await.unwrap();
let nsid = txn.get_next_ns_id().await.unwrap();
txn.complete_changes(false).await.unwrap();
txn.commit().await.unwrap();
assert_eq!(nsid, 0);
let mut txn = ds.transaction(true, false).await.unwrap();
let dbid = txn.get_next_db_id(nsid).await.unwrap();
txn.complete_changes(false).await.unwrap();
txn.commit().await.unwrap();
assert_eq!(dbid, 0);
let mut txn = ds.transaction(true, false).await.unwrap();
let tbid1 = txn.get_next_tb_id(nsid, dbid).await.unwrap();
txn.complete_changes(false).await.unwrap();
txn.commit().await.unwrap();
assert_eq!(tbid1, 0);
let mut txn = ds.transaction(true, false).await.unwrap();
let tbid2 = txn.get_next_tb_id(nsid, dbid).await.unwrap();
txn.complete_changes(false).await.unwrap();
txn.commit().await.unwrap();
assert_eq!(tbid2, 1);
let mut txn = ds.transaction(true, false).await.unwrap();
txn.remove_tb_id(nsid, dbid, tbid1).await.unwrap();
txn.complete_changes(false).await.unwrap();
txn.commit().await.unwrap();
let mut txn = ds.transaction(true, false).await.unwrap();
txn.remove_db_id(nsid, dbid).await.unwrap();
txn.complete_changes(false).await.unwrap();
txn.commit().await.unwrap();
let mut txn = ds.transaction(true, false).await.unwrap();
txn.remove_ns_id(nsid).await.unwrap();
txn.complete_changes(false).await.unwrap();
txn.commit().await.unwrap();
}
}

View file

@ -127,6 +127,8 @@ pub mod err;
#[doc(hidden)]
pub mod iam;
#[doc(hidden)]
pub mod idg;
#[doc(hidden)]
pub mod idx;
#[doc(hidden)]
pub mod key;

View file

@ -144,6 +144,7 @@ pub fn define(i: &str) -> IResult<&str, DefineStatement> {
#[revisioned(revision = 1)]
pub struct DefineNamespaceStatement {
pub name: Ident,
pub id: Option<u32>,
}
impl DefineNamespaceStatement {
@ -161,7 +162,14 @@ impl DefineNamespaceStatement {
let key = crate::key::root::ns::new(&self.name);
// Claim transaction
let mut run = txn.lock().await;
run.set(key, self).await?;
// Set the id
if self.id.is_none() {
let mut ns = self.clone();
ns.id = Some(run.get_next_ns_id().await?);
run.set(key, ns).await?;
} else {
run.set(key, self).await?;
}
// Ok all good
Ok(Value::None)
}
@ -183,6 +191,7 @@ fn namespace(i: &str) -> IResult<&str, DefineNamespaceStatement> {
i,
DefineNamespaceStatement {
name,
id: None,
},
))
}
@ -196,6 +205,7 @@ fn namespace(i: &str) -> IResult<&str, DefineNamespaceStatement> {
pub struct DefineDatabaseStatement {
pub name: Ident,
pub changefeed: Option<ChangeFeed>,
pub id: Option<u32>,
}
impl DefineDatabaseStatement {
@ -213,8 +223,17 @@ impl DefineDatabaseStatement {
let mut run = txn.lock().await;
// Process the statement
let key = crate::key::namespace::db::new(opt.ns(), &self.name);
run.add_ns(opt.ns(), opt.strict).await?;
run.set(key, self).await?;
let ns = run.add_ns(opt.ns(), opt.strict).await?;
// Set the id
if self.id.is_none() && ns.id.is_some() {
let mut db = self.clone();
db.id = Some(run.get_next_db_id(ns.id.unwrap()).await?);
// Store the db
run.set(key, db).await?;
} else {
// Store the db
run.set(key, self).await?;
}
// Ok all good
Ok(Value::None)
}
@ -247,6 +266,7 @@ fn database(i: &str) -> IResult<&str, DefineDatabaseStatement> {
DefineDatabaseOption::ChangeFeed(ref v) => v.to_owned(),
})
.next(),
id: None,
},
))
}
@ -975,6 +995,7 @@ pub struct DefineTableStatement {
pub name: Ident,
pub drop: bool,
pub full: bool,
pub id: Option<u32>,
pub view: Option<View>,
pub permissions: Permissions,
pub changefeed: Option<ChangeFeed>,
@ -994,9 +1015,15 @@ impl DefineTableStatement {
let mut run = txn.lock().await;
// Process the statement
let key = crate::key::database::tb::new(opt.ns(), opt.db(), &self.name);
run.add_ns(opt.ns(), opt.strict).await?;
run.add_db(opt.ns(), opt.db(), opt.strict).await?;
run.set(key, self).await?;
let ns = run.add_ns(opt.ns(), opt.strict).await?;
let db = run.add_db(opt.ns(), opt.db(), opt.strict).await?;
if self.id.is_none() && ns.id.is_some() && db.id.is_some() {
let mut dt = self.clone();
dt.id = Some(run.get_next_tb_id(ns.id.unwrap(), db.id.unwrap()).await?);
run.set(key, dt).await?;
} else {
run.set(key, self).await?;
}
// Check if table is a view
if let Some(view) = &self.view {
// Remove the table data
@ -1092,6 +1119,7 @@ fn table(i: &str) -> IResult<&str, DefineTableStatement> {
_ => None,
})
.unwrap_or_default(),
id: None,
view: opts.iter().find_map(|x| match x {
DefineTableOption::View(ref v) => Some(v.to_owned()),
_ => None,
@ -1531,9 +1559,10 @@ mod tests {
fn check_define_serialize() {
let stm = DefineStatement::Namespace(DefineNamespaceStatement {
name: Ident::from("test"),
id: None,
});
let enc: Vec<u8> = stm.try_into().unwrap();
assert_eq!(9, enc.len());
assert_eq!(10, enc.len());
}
#[test]