TreeCache is currently inefficient on writes (#3954)

This commit is contained in:
Emmanuel Keller 2024-04-30 19:09:54 +01:00 committed by GitHub
parent a791f742b4
commit b9f02d146d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 2394 additions and 1270 deletions

17
Cargo.lock generated
View file

@ -4383,6 +4383,18 @@ dependencies = [
"parking_lot",
]
[[package]]
name = "quick_cache"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "347e1a588d1de074eeb3c00eadff93db4db65aeb62aee852b1efd0949fe65b6c"
dependencies = [
"ahash 0.8.11",
"equivalent",
"hashbrown 0.14.3",
"parking_lot",
]
[[package]]
name = "quote"
version = "1.0.35"
@ -5902,6 +5914,7 @@ dependencies = [
"chrono",
"ciborium",
"criterion",
"dashmap",
"deunicode",
"dmp",
"echodb",
@ -5929,7 +5942,7 @@ dependencies = [
"phf",
"pin-project-lite",
"pprof",
"quick_cache",
"quick_cache 0.5.1",
"radix_trie",
"rand 0.8.5",
"reblessive",
@ -6051,7 +6064,7 @@ dependencies = [
"hashbrown 0.14.3",
"lru",
"parking_lot",
"quick_cache",
"quick_cache 0.4.2",
"sha2",
"tokio",
"vart",

View file

@ -71,6 +71,7 @@ ciborium = "0.2.1"
cedar-policy = "2.4.2"
channel = { version = "1.9.0", package = "async-channel" }
chrono = { version = "0.4.31", features = ["serde"] }
dashmap = "5.5.3"
derive = { version = "0.12.0", package = "surrealdb-derive" }
deunicode = "1.4.1"
dmp = "0.2.0"
@ -111,7 +112,7 @@ once_cell = "1.18.0"
pbkdf2 = { version = "0.12.2", features = ["simple"] }
phf = { version = "0.11.2", features = ["macros", "unicase"] }
pin-project-lite = "0.2.13"
quick_cache = "0.4.0"
quick_cache = "0.5.1"
radix_trie = { version = "0.2.1", features = ["serde"] }
rand = "0.8.5"
reblessive = { version = "0.3.3", features = ["tree"] }

View file

@ -399,11 +399,11 @@ impl<'a> IndexOperation<'a> {
.await?;
// Delete the old index data
if let Some(o) = self.o.take() {
mt.remove_document(stk, &mut tx, self.rid, o).await?;
mt.remove_document(stk, &mut tx, self.rid, &o).await?;
}
// Create the new index data
if let Some(n) = self.n.take() {
mt.index_document(stk, &mut tx, self.rid, n).await?;
mt.index_document(stk, &mut tx, self.rid, &n).await?;
}
mt.finish(&mut tx).await
}

View file

@ -1,5 +1,6 @@
use crate::fnc::util::math::mean::Mean;
use crate::fnc::util::math::variance::variance;
use crate::fnc::util::math::ToFloat;
use crate::sql::number::Number;
pub trait Deviation {
@ -13,6 +14,11 @@ impl Deviation for Vec<Number> {
}
}
pub(super) fn deviation(v: &[Number], mean: f64, sample: bool) -> f64 {
// This function is exposed to optimise the pearson distance calculation.
// As the mean of the vector is already calculated, we pass it as a parameter rather than recalculating it.
pub(crate) fn deviation<T>(v: &[T], mean: f64, sample: bool) -> f64
where
T: ToFloat,
{
variance(v, mean, sample).sqrt()
}

View file

@ -1,10 +1,20 @@
use crate::sql::number::Number;
use crate::fnc::util::math::ToFloat;
use crate::sql::Number;
pub trait Mean {
fn mean(&self) -> f64;
}
impl Mean for Vec<Number> {
fn mean(&self) -> f64 {
self.as_slice().mean()
}
}
impl<T> Mean for &[T]
where
T: ToFloat,
{
fn mean(&self) -> f64 {
let len = self.len() as f64;
let sum = self.iter().map(|n| n.to_float()).sum::<f64>();

View file

@ -1,4 +1,5 @@
use super::mean::Mean;
use crate::fnc::util::math::ToFloat;
use crate::sql::number::Number;
pub trait Variance {
@ -13,7 +14,12 @@ impl Variance for Vec<Number> {
}
}
pub(super) fn variance(v: &[Number], mean: f64, sample: bool) -> f64 {
// This function is exposed to optimise the pearson distance calculation.
// As the mean of the vector is already calculated, we pass it as a parameter rather than recalculating it.
pub(super) fn variance<T>(v: &[T], mean: f64, sample: bool) -> f64
where
T: ToFloat,
{
match v.len() {
0 => f64::NAN,
1 => 0.0,

View file

@ -4,12 +4,7 @@ use crate::fnc::util::math::mean::Mean;
use crate::sql::Number;
use std::collections::HashSet;
pub trait Add {
/// Addition of two vectors
fn add(&self, other: &Self) -> Result<Vec<Number>, Error>;
}
fn check_same_dimension(fnc: &str, a: &[Number], b: &[Number]) -> Result<(), Error> {
pub(crate) fn check_same_dimension<T>(fnc: &str, a: &[T], b: &[T]) -> Result<(), Error> {
if a.len() != b.len() {
Err(Error::InvalidArguments {
name: String::from(fnc),
@ -20,6 +15,11 @@ fn check_same_dimension(fnc: &str, a: &[Number], b: &[Number]) -> Result<(), Err
}
}
pub trait Add {
/// Addition of two vectors
fn add(&self, other: &Self) -> Result<Vec<Number>, Error>;
}
impl Add for Vec<Number> {
fn add(&self, other: &Self) -> Result<Vec<Number>, Error> {
check_same_dimension("vector::add", self, other)?;
@ -42,6 +42,18 @@ impl Angle for Vec<Number> {
}
}
pub trait CosineDistance {
fn cosine_distance(&self, other: &Self) -> Result<Number, Error>;
}
impl CosineDistance for Vec<Number> {
fn cosine_distance(&self, other: &Self) -> Result<Number, Error> {
check_same_dimension("vector::distance::cosine", self, other)?;
let d = dot(self, other);
Ok(Number::from(1) - d / (self.magnitude() * other.magnitude()))
}
}
pub trait CosineSimilarity {
fn cosine_similarity(&self, other: &Self) -> Result<Number, Error>;
}
@ -81,7 +93,7 @@ pub trait HammingDistance {
impl HammingDistance for Vec<Number> {
fn hamming_distance(&self, other: &Self) -> Result<Number, Error> {
check_same_dimension("vector::distance::hamming", self, other)?;
Ok(self.iter().zip(other.iter()).filter(|&(a, b)| a != b).count().into())
Ok(self.iter().zip(other.iter()).filter(|(a, b)| a != b).count().into())
}
}
@ -91,11 +103,9 @@ pub trait JaccardSimilarity {
impl JaccardSimilarity for Vec<Number> {
fn jaccard_similarity(&self, other: &Self) -> Result<Number, Error> {
let set_a: HashSet<_> = HashSet::from_iter(self.iter());
let set_b: HashSet<_> = HashSet::from_iter(other.iter());
let intersection_size = set_a.intersection(&set_b).count() as f64;
let union_size = set_a.union(&set_b).count() as f64;
Ok((intersection_size / union_size).into())
let mut union: HashSet<&Number> = HashSet::from_iter(self.iter());
let intersection_size = other.iter().filter(|n| !union.insert(n)).count() as f64;
Ok((intersection_size / union.len() as f64).into())
}
}

View file

@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
pub type DocId = u64;
pub(crate) struct DocIds {
ixs: IndexStores,
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree<TrieKeys>,
@ -43,6 +44,7 @@ impl DocIds {
)
.await;
Ok(Self {
ixs: ixs.clone(),
state_key,
index_key_base: ikb,
btree: BTree::new(state.btree),
@ -136,7 +138,7 @@ impl DocIds {
}
pub(in crate::idx) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> {
if self.store.finish(tx).await? {
if let Some(new_cache) = self.store.finish(tx).await? {
let btree = self.btree.inc_generation().clone();
let state = State {
btree,
@ -144,6 +146,7 @@ impl DocIds {
next_doc_id: self.next_doc_id,
};
tx.set(self.state_key.clone(), state.try_to_val()?).await?;
self.ixs.advance_cache_btree_trie(new_cache);
}
Ok(())
}

View file

@ -9,6 +9,7 @@ use crate::kvs::{Key, Transaction, TransactionType};
pub(super) type DocLength = u64;
pub(super) struct DocLengths {
ixs: IndexStores,
state_key: Key,
btree: BTree<TrieKeys>,
store: BTreeStore<TrieKeys>,
@ -38,6 +39,7 @@ impl DocLengths {
)
.await;
Ok(Self {
ixs: ixs.clone(),
state_key,
btree: BTree::new(state),
store,
@ -83,9 +85,10 @@ impl DocLengths {
}
pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> {
if self.store.finish(tx).await? {
if let Some(new_cache) = self.store.finish(tx).await? {
let state = self.btree.inc_generation();
tx.set(self.state_key.clone(), state.try_to_val()?).await?;
self.ixs.advance_cache_btree_trie(new_cache);
}
Ok(())
}

View file

@ -10,6 +10,7 @@ use crate::kvs::{Key, Transaction, TransactionType};
pub(super) type TermFrequency = u64;
pub(super) struct Postings {
ixs: IndexStores,
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree<TrieKeys>,
@ -40,6 +41,7 @@ impl Postings {
)
.await;
Ok(Self {
ixs: ixs.clone(),
state_key,
index_key_base,
btree: BTree::new(state),
@ -83,9 +85,10 @@ impl Postings {
}
pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> {
if self.store.finish(tx).await? {
if let Some(new_cache) = self.store.finish(tx).await? {
let state = self.btree.inc_generation();
tx.set(self.state_key.clone(), state.try_to_val()?).await?;
self.ixs.advance_cache_btree_trie(new_cache);
}
Ok(())
}

View file

@ -12,6 +12,7 @@ pub(crate) type TermId = u64;
pub(crate) type TermLen = u32;
pub(in crate::idx) struct Terms {
ixs: IndexStores,
state_key: Key,
index_key_base: IndexKeyBase,
btree: BTree<FstKeys>,
@ -44,6 +45,7 @@ impl Terms {
)
.await;
Ok(Self {
ixs: ixs.clone(),
state_key,
index_key_base,
btree: BTree::new(state.btree),
@ -120,7 +122,7 @@ impl Terms {
}
pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<(), Error> {
if self.store.finish(tx).await? {
if let Some(new_cache) = self.store.finish(tx).await? {
let btree = self.btree.inc_generation().clone();
let state = State {
btree,
@ -128,6 +130,7 @@ impl Terms {
next_term_id: self.next_term_id,
};
tx.set(self.state_key.clone(), state.try_to_val()?).await?;
self.ixs.advance_store_btree_fst(new_cache);
}
Ok(())
}

View file

@ -137,7 +137,7 @@ impl InnerQueryExecutor {
if let IndexOperator::Knn(a, k) = io.op() {
let mut tx = txn.lock().await;
let entry = if let Some(mt) = mt_map.get(&ix_ref) {
MtEntry::new(&mut tx, mt, a.clone(), *k).await?
MtEntry::new(&mut tx, mt, a, *k).await?
} else {
let ikb = IndexKeyBase::new(opt, idx_def);
let mt = MTreeIndex::new(
@ -148,7 +148,7 @@ impl InnerQueryExecutor {
TransactionType::Read,
)
.await?;
let entry = MtEntry::new(&mut tx, &mt, a.clone(), *k).await?;
let entry = MtEntry::new(&mut tx, &mt, a, *k).await?;
mt_map.insert(ix_ref, mt);
entry
};
@ -397,8 +397,11 @@ impl QueryExecutor {
fn new_mtree_index_knn_iterator(&self, it_ref: IteratorRef) -> Option<ThingIterator> {
if let Some(IteratorEntry::Single(exp, ..)) = self.0.it_entries.get(it_ref as usize) {
if let Some(mte) = self.0.mt_entries.get(exp.as_ref()) {
let it = DocIdsIterator::new(mte.doc_ids.clone(), mte.res.clone());
if let Some(mte) = self.0.mt_entries.get(exp) {
let it = DocIdsIterator::new(
mte.doc_ids.clone(),
mte.res.iter().map(|(d, _)| *d).collect(),
);
return Some(ThingIterator::Knn(it));
}
}
@ -640,14 +643,14 @@ impl FtEntry {
#[derive(Clone)]
pub(super) struct MtEntry {
doc_ids: Arc<RwLock<DocIds>>,
res: VecDeque<DocId>,
res: VecDeque<(DocId, f64)>,
}
impl MtEntry {
async fn new(
tx: &mut kvs::Transaction,
mt: &MTreeIndex,
a: Array,
a: &Array,
k: u32,
) -> Result<Self, Error> {
let res = mt.knn_search(tx, a, k as usize).await?;

View file

@ -1,6 +1,6 @@
use crate::err::Error;
use crate::idx::trees::bkeys::BKeys;
use crate::idx::trees::store::{NodeId, StoredNode, TreeNode, TreeStore};
use crate::idx::trees::store::{NodeId, StoreGeneration, StoredNode, TreeNode, TreeStore};
use crate::idx::VersionedSerdeState;
use crate::kvs::{Key, Transaction, Val};
use crate::sql::{Object, Value};
@ -36,7 +36,7 @@ pub struct BState {
root: Option<NodeId>,
next_node_id: NodeId,
#[revision(start = 2)]
generation: u64,
generation: StoreGeneration,
}
impl VersionedSerdeState for BState {
@ -158,6 +158,10 @@ impl<BK> TreeNode for BTreeNode<BK>
where
BK: BKeys + Clone,
{
fn prepare_save(&mut self) {
self.keys_mut().compile();
}
fn try_from_val(val: Val) -> Result<Self, Error> {
let mut c: Cursor<Vec<u8>> = Cursor::new(val);
let node_type: u8 = bincode::deserialize_from(&mut c)?;
@ -172,8 +176,7 @@ where
}
}
fn try_into_val(&mut self) -> Result<Val, Error> {
self.keys_mut().compile();
fn try_into_val(&self) -> Result<Val, Error> {
let mut c: Cursor<Vec<u8>> = Cursor::new(Vec::new());
match self {
BTreeNode::Internal(keys, child) => {
@ -1017,7 +1020,7 @@ mod tests {
#[test]
fn test_node_serde_internal() {
let mut node = BTreeNode::Internal(FstKeys::default(), vec![]);
node.keys_mut().compile();
node.prepare_save();
let val = node.try_into_val().unwrap();
let _: BTreeNode<FstKeys> = BTreeNode::try_from_val(val).unwrap();
}
@ -1025,6 +1028,7 @@ mod tests {
#[test]
fn test_node_serde_leaf() {
let mut node = BTreeNode::Leaf(TrieKeys::default());
node.prepare_save();
let val = node.try_into_val().unwrap();
let _: BTreeNode<TrieKeys> = BTreeNode::try_from_val(val).unwrap();
}
@ -1507,7 +1511,7 @@ mod tests {
where
BK: BKeys + Clone + Debug,
{
if st.finish(&mut tx).await? {
if st.finish(&mut tx).await?.is_some() {
t.state.generation += 1;
}
gen += 1;

738
core/src/idx/trees/knn.rs Normal file
View file

@ -0,0 +1,738 @@
use crate::idx::docids::DocId;
use crate::idx::trees::store::NodeId;
use roaring::RoaringTreemap;
use std::cmp::{Ordering, Reverse};
use std::collections::btree_map::Entry;
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::{BTreeMap, VecDeque};
#[derive(Debug, Clone, Copy, Ord, Eq, PartialEq, PartialOrd)]
pub(super) struct PriorityNode(Reverse<FloatKey>, NodeId);
impl PriorityNode {
pub(super) fn new(d: f64, id: NodeId) -> Self {
Self(Reverse(FloatKey::new(d)), id)
}
pub(super) fn id(&self) -> NodeId {
self.1
}
}
/// Treats f64 as a sortable data type.
/// It provides an implementation so it can be used as a key in a BTreeMap or BTreeSet.
#[derive(Debug, Clone, Copy)]
pub(super) struct FloatKey(f64);
impl FloatKey {
pub(super) fn new(f: f64) -> Self {
FloatKey(f)
}
}
impl From<FloatKey> for f64 {
fn from(v: FloatKey) -> Self {
v.0
}
}
impl Eq for FloatKey {}
impl PartialEq<Self> for FloatKey {
fn eq(&self, other: &Self) -> bool {
self.0.total_cmp(&other.0) == Ordering::Equal
}
}
impl PartialOrd<Self> for FloatKey {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for FloatKey {
fn cmp(&self, other: &Self) -> Ordering {
self.0.total_cmp(&other.0)
}
}
/// Ids64 is a collection able to store u64 identifiers in an optimised way.
/// The enumerations are optimised in a way that, depending on the number of identifiers,
/// the most memory efficient variant is used.
/// When identifiers are added or removed, the method returned the most appropriate
/// variant (if required).
#[derive(Debug, Clone, PartialEq)]
pub(super) enum Ids64 {
#[allow(dead_code)] // Will be used with HNSW
Empty,
One(u64),
Vec2([u64; 2]),
Vec3([u64; 3]),
Vec4([u64; 4]),
Vec5([u64; 5]),
Vec6([u64; 6]),
Vec7([u64; 7]),
Vec8([u64; 8]),
Bits(RoaringTreemap),
}
impl Ids64 {
fn len(&self) -> u64 {
match self {
Self::Empty => 0,
Self::One(_) => 1,
Self::Vec2(_) => 2,
Self::Vec3(_) => 3,
Self::Vec4(_) => 4,
Self::Vec5(_) => 5,
Self::Vec6(_) => 6,
Self::Vec7(_) => 7,
Self::Vec8(_) => 8,
Self::Bits(b) => b.len(),
}
}
fn append_to(&self, to: &mut RoaringTreemap) {
match &self {
Self::Empty => {}
Self::One(d) => {
to.insert(*d);
}
Self::Vec2(a) => {
for d in a {
to.insert(*d);
}
}
Self::Vec3(a) => {
for d in a {
to.insert(*d);
}
}
Self::Vec4(a) => {
for d in a {
to.insert(*d);
}
}
Self::Vec5(a) => {
for d in a {
to.insert(*d);
}
}
Self::Vec6(a) => {
for d in a {
to.insert(*d);
}
}
Self::Vec7(a) => {
for d in a {
to.insert(*d);
}
}
Self::Vec8(a) => {
for d in a {
to.insert(*d);
}
}
Self::Bits(b) => {
for d in b {
to.insert(d);
}
}
}
}
fn remove_to(&self, to: &mut RoaringTreemap) {
match &self {
Self::Empty => {}
Self::One(d) => {
to.remove(*d);
}
Self::Vec2(a) => {
for &d in a {
to.remove(d);
}
}
Self::Vec3(a) => {
for &d in a {
to.remove(d);
}
}
Self::Vec4(a) => {
for &d in a {
to.remove(d);
}
}
Self::Vec5(a) => {
for &d in a {
to.remove(d);
}
}
Self::Vec6(a) => {
for &d in a {
to.remove(d);
}
}
Self::Vec7(a) => {
for &d in a {
to.remove(d);
}
}
Self::Vec8(a) => {
for &d in a {
to.remove(d);
}
}
Self::Bits(b) => {
for d in b {
to.remove(d);
}
}
}
}
fn append_iter_ref<'a, I>(&mut self, docs: I) -> Option<Self>
where
I: Iterator<Item = &'a DocId>,
{
let mut new_doc: Option<Self> = None;
for &doc in docs {
if let Some(ref mut nd) = new_doc {
let nd = nd.insert(doc);
if nd.is_some() {
new_doc = nd;
};
} else {
new_doc = self.insert(doc);
}
}
new_doc
}
fn append_iter<I>(&mut self, docs: I) -> Option<Self>
where
I: Iterator<Item = DocId>,
{
let mut new_doc: Option<Self> = None;
for doc in docs {
if let Some(mut nd) = new_doc {
new_doc = nd.insert(doc);
} else {
new_doc = self.insert(doc);
}
}
new_doc
}
fn append_from(&mut self, from: &Ids64) -> Option<Self> {
match from {
Self::Empty => None,
Self::One(d) => self.insert(*d),
Self::Vec2(a) => self.append_iter_ref(a.iter()),
Self::Vec3(a) => self.append_iter_ref(a.iter()),
Self::Vec4(a) => self.append_iter_ref(a.iter()),
Self::Vec5(a) => self.append_iter_ref(a.iter()),
Self::Vec6(a) => self.append_iter_ref(a.iter()),
Self::Vec7(a) => self.append_iter_ref(a.iter()),
Self::Vec8(a) => self.append_iter_ref(a.iter()),
Self::Bits(a) => self.append_iter(a.iter()),
}
}
fn iter(&self) -> Box<dyn Iterator<Item = DocId> + '_> {
match &self {
Self::Empty => Box::new(EmptyIterator {}),
Self::One(d) => Box::new(OneDocIterator(Some(*d))),
Self::Vec2(a) => Box::new(SliceDocIterator(a.iter())),
Self::Vec3(a) => Box::new(SliceDocIterator(a.iter())),
Self::Vec4(a) => Box::new(SliceDocIterator(a.iter())),
Self::Vec5(a) => Box::new(SliceDocIterator(a.iter())),
Self::Vec6(a) => Box::new(SliceDocIterator(a.iter())),
Self::Vec7(a) => Box::new(SliceDocIterator(a.iter())),
Self::Vec8(a) => Box::new(SliceDocIterator(a.iter())),
Self::Bits(a) => Box::new(a.iter()),
}
}
fn contains(&self, d: DocId) -> bool {
match self {
Self::Empty => false,
Self::One(o) => *o == d,
Self::Vec2(a) => a.contains(&d),
Self::Vec3(a) => a.contains(&d),
Self::Vec4(a) => a.contains(&d),
Self::Vec5(a) => a.contains(&d),
Self::Vec6(a) => a.contains(&d),
Self::Vec7(a) => a.contains(&d),
Self::Vec8(a) => a.contains(&d),
Self::Bits(b) => b.contains(d),
}
}
pub(super) fn insert(&mut self, d: DocId) -> Option<Self> {
if !self.contains(d) {
match self {
Self::Empty => Some(Self::One(d)),
Self::One(o) => Some(Self::Vec2([*o, d])),
Self::Vec2(a) => Some(Self::Vec3([a[0], a[1], d])),
Self::Vec3(a) => Some(Self::Vec4([a[0], a[1], a[2], d])),
Self::Vec4(a) => Some(Self::Vec5([a[0], a[1], a[2], a[3], d])),
Self::Vec5(a) => Some(Self::Vec6([a[0], a[1], a[2], a[3], a[4], d])),
Self::Vec6(a) => Some(Self::Vec7([a[0], a[1], a[2], a[3], a[4], a[5], d])),
Self::Vec7(a) => Some(Self::Vec8([a[0], a[1], a[2], a[3], a[4], a[5], a[6], d])),
Self::Vec8(a) => Some(Self::Bits(RoaringTreemap::from([
a[0], a[1], a[2], a[3], a[4], a[5], a[6], a[7], d,
]))),
Self::Bits(b) => {
b.insert(d);
None
}
}
} else {
None
}
}
#[allow(dead_code)] // Will be used with HNSW
pub(super) fn remove(&mut self, d: DocId) -> Option<Self> {
match self {
Self::Empty => None,
Self::One(i) => {
if d == *i {
Some(Self::Empty)
} else {
None
}
}
Self::Vec2(a) => a.iter().find(|&&i| i != d).map(|&i| Self::One(i)),
Self::Vec3(a) => {
let v: Vec<DocId> = a.iter().filter(|&&i| i != d).cloned().collect();
if v.len() == 2 {
Some(Self::Vec2([v[0], v[1]]))
} else {
None
}
}
Self::Vec4(a) => {
let v: Vec<DocId> = a.iter().filter(|&&i| i != d).cloned().collect();
if v.len() == 3 {
Some(Self::Vec3([v[0], v[1], v[2]]))
} else {
None
}
}
Self::Vec5(a) => {
let v: Vec<DocId> = a.iter().filter(|&&i| i != d).cloned().collect();
if v.len() == 4 {
Some(Self::Vec4([v[0], v[1], v[2], v[3]]))
} else {
None
}
}
Self::Vec6(a) => {
let v: Vec<DocId> = a.iter().filter(|&&i| i != d).cloned().collect();
if v.len() == 5 {
Some(Self::Vec5([v[0], v[1], v[2], v[3], v[4]]))
} else {
None
}
}
Self::Vec7(a) => {
let v: Vec<DocId> = a.iter().filter(|&&i| i != d).cloned().collect();
if v.len() == 6 {
Some(Self::Vec6([v[0], v[1], v[2], v[3], v[4], v[5]]))
} else {
None
}
}
Self::Vec8(a) => {
let v: Vec<DocId> = a.iter().filter(|&&i| i != d).cloned().collect();
if v.len() == 7 {
Some(Self::Vec7([v[0], v[1], v[2], v[3], v[4], v[5], v[6]]))
} else {
None
}
}
Self::Bits(b) => {
if !b.remove(d) || b.len() != 8 {
None
} else {
let v: Vec<DocId> = b.iter().collect();
Some(Self::Vec8([v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]]))
}
}
}
}
}
struct EmptyIterator;
impl Iterator for EmptyIterator {
type Item = DocId;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
struct OneDocIterator(Option<DocId>);
impl Iterator for OneDocIterator {
type Item = DocId;
fn next(&mut self) -> Option<Self::Item> {
self.0.take()
}
}
struct SliceDocIterator<'a, I>(I)
where
I: Iterator<Item = &'a DocId>;
impl<'a, I> Iterator for SliceDocIterator<'a, I>
where
I: Iterator<Item = &'a DocId>,
{
type Item = DocId;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().cloned()
}
}
#[non_exhaustive]
pub(super) struct KnnResultBuilder {
knn: u64,
docs: RoaringTreemap,
priority_list: BTreeMap<FloatKey, Ids64>,
}
impl KnnResultBuilder {
pub(super) fn new(knn: usize) -> Self {
Self {
knn: knn as u64,
docs: RoaringTreemap::default(),
priority_list: BTreeMap::default(),
}
}
pub(super) fn check_add(&self, dist: f64) -> bool {
if self.docs.len() < self.knn {
true
} else if let Some(pr) = self.priority_list.keys().last() {
dist <= pr.0
} else {
true
}
}
pub(super) fn add(&mut self, dist: f64, docs: &Ids64) {
let pr = FloatKey(dist);
docs.append_to(&mut self.docs);
match self.priority_list.entry(pr) {
Entry::Vacant(e) => {
e.insert(docs.clone());
}
Entry::Occupied(mut e) => {
let d = e.get_mut();
if let Some(n) = d.append_from(docs) {
e.insert(n);
}
}
}
// Do possible eviction
let docs_len = self.docs.len();
if docs_len > self.knn {
if let Some((_, d)) = self.priority_list.last_key_value() {
if docs_len - d.len() >= self.knn {
if let Some((_, evicted_docs)) = self.priority_list.pop_last() {
evicted_docs.remove_to(&mut self.docs);
}
}
}
}
}
pub(super) fn build(
self,
#[cfg(debug_assertions)] visited_nodes: HashMap<NodeId, usize>,
) -> KnnResult {
let mut sorted_docs = VecDeque::with_capacity(self.knn as usize);
let mut left = self.knn;
for (pr, docs) in self.priority_list {
let dl = docs.len();
if dl > left {
for doc_id in docs.iter().take(left as usize) {
sorted_docs.push_back((doc_id, pr.0));
}
break;
}
for doc_id in docs.iter() {
sorted_docs.push_back((doc_id, pr.0));
}
left -= dl;
// We don't expect anymore result, we can leave
if left == 0 {
break;
}
}
trace!("sorted_docs: {:?}", sorted_docs);
KnnResult {
docs: sorted_docs,
#[cfg(debug_assertions)]
visited_nodes,
}
}
}
pub struct KnnResult {
pub(in crate::idx::trees) docs: VecDeque<(DocId, f64)>,
#[cfg(debug_assertions)]
#[allow(dead_code)]
pub(in crate::idx::trees) visited_nodes: HashMap<NodeId, usize>,
}
#[cfg(test)]
pub(super) mod tests {
use crate::idx::docids::DocId;
use crate::idx::trees::knn::{FloatKey, Ids64, KnnResultBuilder};
use crate::idx::trees::vector::{SharedVector, Vector};
use crate::sql::index::{Distance, VectorType};
use crate::sql::Number;
use rand::prelude::SmallRng;
use rand::{Rng, SeedableRng};
use roaring::RoaringTreemap;
use rust_decimal::prelude::Zero;
use std::cmp::Reverse;
#[cfg(debug_assertions)]
use std::collections::HashMap;
use std::collections::{BTreeSet, BinaryHeap, VecDeque};
use test_log::test;
pub(crate) fn get_seed_rnd() -> SmallRng {
let seed: u64 = std::env::var("TEST_SEED")
.unwrap_or_else(|_| rand::random::<u64>().to_string())
.parse()
.expect("Failed to parse seed");
info!("Seed: {}", seed);
// Create a seeded RNG
SmallRng::seed_from_u64(seed)
}
#[derive(Debug)]
pub(in crate::idx::trees) enum TestCollection {
Unique(Vec<(DocId, SharedVector)>),
NonUnique(Vec<(DocId, SharedVector)>),
}
impl TestCollection {
pub(in crate::idx::trees) fn to_vec_ref(&self) -> &Vec<(DocId, SharedVector)> {
match self {
TestCollection::Unique(c) | TestCollection::NonUnique(c) => c,
}
}
pub(in crate::idx::trees) fn len(&self) -> usize {
self.to_vec_ref().len()
}
}
pub(in crate::idx::trees) fn new_random_vec(
rng: &mut SmallRng,
t: VectorType,
dim: usize,
gen: &RandomItemGenerator,
) -> SharedVector {
let mut vec = Vector::new(t, dim);
for _ in 0..dim {
vec.add(&gen.generate(rng));
}
if vec.is_null() {
// Some similarities (cosine) is undefined for null vector.
new_random_vec(rng, t, dim, gen)
} else {
vec.into()
}
}
impl Vector {
pub(super) fn is_null(&self) -> bool {
match self {
Self::F64(a) => !a.iter().any(|a| !a.is_zero()),
Self::F32(a) => !a.iter().any(|a| !a.is_zero()),
Self::I64(a) => !a.iter().any(|a| !a.is_zero()),
Self::I32(a) => !a.iter().any(|a| !a.is_zero()),
Self::I16(a) => !a.iter().any(|a| !a.is_zero()),
}
}
}
impl TestCollection {
pub(in crate::idx::trees) fn new(
unique: bool,
collection_size: usize,
vt: VectorType,
dimension: usize,
distance: &Distance,
) -> Self {
let mut rng = get_seed_rnd();
let gen = RandomItemGenerator::new(&distance, dimension);
if unique {
TestCollection::new_unique(collection_size, vt, dimension, &gen, &mut rng)
} else {
TestCollection::new_random(collection_size, vt, dimension, &gen, &mut rng)
}
}
fn add(&mut self, doc: DocId, pt: SharedVector) {
match self {
TestCollection::Unique(vec) => vec,
TestCollection::NonUnique(vec) => vec,
}
.push((doc, pt));
}
fn new_unique(
collection_size: usize,
vector_type: VectorType,
dimension: usize,
gen: &RandomItemGenerator,
rng: &mut SmallRng,
) -> Self {
let mut vector_set = BTreeSet::new();
let mut attempts = collection_size * 2;
while vector_set.len() < collection_size {
vector_set.insert(new_random_vec(rng, vector_type, dimension, gen));
attempts -= 1;
if attempts == 0 {
panic!("Fail generating a unique random collection {vector_type} {dimension}");
}
}
let mut coll = TestCollection::Unique(Vec::with_capacity(vector_set.len()));
for (i, v) in vector_set.into_iter().enumerate() {
coll.add(i as DocId, v.into());
}
coll
}
fn new_random(
collection_size: usize,
vector_type: VectorType,
dimension: usize,
gen: &RandomItemGenerator,
rng: &mut SmallRng,
) -> Self {
let mut coll = TestCollection::NonUnique(Vec::with_capacity(collection_size));
// Prepare data set
for doc_id in 0..collection_size {
coll.add(doc_id as DocId, new_random_vec(rng, vector_type, dimension, gen).into());
}
coll
}
pub(in crate::idx::trees) fn is_unique(&self) -> bool {
matches!(self, TestCollection::Unique(_))
}
}
pub(in crate::idx::trees) enum RandomItemGenerator {
Int(i64, i64),
Float(f64, f64),
}
impl RandomItemGenerator {
pub(in crate::idx::trees) fn new(dist: &Distance, dim: usize) -> Self {
match dist {
Distance::Jaccard => Self::Int(0, (dim / 2) as i64),
Distance::Hamming => Self::Int(0, 2),
_ => Self::Float(-20.0, 20.0),
}
}
fn generate(&self, rng: &mut SmallRng) -> Number {
match self {
RandomItemGenerator::Int(from, to) => Number::Int(rng.gen_range(*from..*to)),
RandomItemGenerator::Float(from, to) => Number::Float(rng.gen_range(*from..=*to)),
}
}
}
#[test]
fn knn_result_builder_test() {
let mut b = KnnResultBuilder::new(7);
b.add(0.0, &Ids64::One(5));
b.add(0.2, &Ids64::Vec3([0, 1, 2]));
b.add(0.2, &Ids64::One(3));
b.add(0.2, &Ids64::Vec2([6, 8]));
let res = b.build(
#[cfg(debug_assertions)]
HashMap::new(),
);
assert_eq!(
res.docs,
VecDeque::from([(5, 0.0), (0, 0.2), (1, 0.2), (2, 0.2), (3, 0.2), (6, 0.2), (8, 0.2)])
);
}
#[test]
fn test_ids() {
let mut ids = Ids64::Empty;
let mut ids = ids.insert(10).expect("Ids64::One");
assert_eq!(ids, Ids64::One(10));
let mut ids = ids.insert(20).expect("Ids64::Vec2");
assert_eq!(ids, Ids64::Vec2([10, 20]));
let mut ids = ids.insert(30).expect("Ids64::Vec3");
assert_eq!(ids, Ids64::Vec3([10, 20, 30]));
let mut ids = ids.insert(40).expect("Ids64::Vec4");
assert_eq!(ids, Ids64::Vec4([10, 20, 30, 40]));
let mut ids = ids.insert(50).expect("Ids64::Vec5");
assert_eq!(ids, Ids64::Vec5([10, 20, 30, 40, 50]));
let mut ids = ids.insert(60).expect("Ids64::Vec6");
assert_eq!(ids, Ids64::Vec6([10, 20, 30, 40, 50, 60]));
let mut ids = ids.insert(70).expect("Ids64::Vec7");
assert_eq!(ids, Ids64::Vec7([10, 20, 30, 40, 50, 60, 70]));
let mut ids = ids.insert(80).expect("Ids64::Vec8");
assert_eq!(ids, Ids64::Vec8([10, 20, 30, 40, 50, 60, 70, 80]));
let mut ids = ids.insert(90).expect("Ids64::Bits");
assert_eq!(ids, Ids64::Bits(RoaringTreemap::from([10, 20, 30, 40, 50, 60, 70, 80, 90])));
assert_eq!(ids.insert(100), None);
assert_eq!(
ids,
Ids64::Bits(RoaringTreemap::from([10, 20, 30, 40, 50, 60, 70, 80, 90, 100]))
);
assert_eq!(ids.remove(10), None);
assert_eq!(ids, Ids64::Bits(RoaringTreemap::from([20, 30, 40, 50, 60, 70, 80, 90, 100])));
let mut ids = ids.remove(20).expect("Ids64::Vec8");
assert_eq!(ids, Ids64::Vec8([30, 40, 50, 60, 70, 80, 90, 100]));
let mut ids = ids.remove(30).expect("Ids64::Vec7");
assert_eq!(ids, Ids64::Vec7([40, 50, 60, 70, 80, 90, 100]));
let mut ids = ids.remove(40).expect("Ids64::Vec6");
assert_eq!(ids, Ids64::Vec6([50, 60, 70, 80, 90, 100]));
let mut ids = ids.remove(50).expect("Ids64::Vec5");
assert_eq!(ids, Ids64::Vec5([60, 70, 80, 90, 100]));
let mut ids = ids.remove(60).expect("Ids64::Vec4");
assert_eq!(ids, Ids64::Vec4([70, 80, 90, 100]));
let mut ids = ids.remove(70).expect("Ids64::Vec3");
assert_eq!(ids, Ids64::Vec3([80, 90, 100]));
let mut ids = ids.remove(80).expect("Ids64::Vec2");
assert_eq!(ids, Ids64::Vec2([90, 100]));
let mut ids = ids.remove(90).expect("Ids64::One");
assert_eq!(ids, Ids64::One(100));
let ids = ids.remove(100).expect("Ids64::Empty");
assert_eq!(ids, Ids64::Empty);
}
#[test]
fn test_priority_node() {
let (n1, n2, n3) =
((FloatKey::new(1.0), 1), (FloatKey::new(2.0), 2), (FloatKey::new(3.0), 3));
let mut q = BinaryHeap::from([n3, n1, n2]);
assert_eq!(q.pop(), Some(n3));
assert_eq!(q.pop(), Some(n2));
assert_eq!(q.pop(), Some(n1));
let (n1, n2, n3) = (Reverse(n1), Reverse(n2), Reverse(n3));
let mut q = BinaryHeap::from([n3, n1, n2]);
assert_eq!(q.pop(), Some(n1));
assert_eq!(q.pop(), Some(n2));
assert_eq!(q.pop(), Some(n3));
}
}

View file

@ -1,5 +1,6 @@
pub mod bkeys;
pub mod btree;
pub mod knn;
pub mod mtree;
pub mod store;
pub mod vector;

File diff suppressed because it is too large Load diff

View file

@ -1,18 +1,15 @@
use crate::err::Error;
use crate::idx::trees::store::{NodeId, StoredNode, TreeNode, TreeNodeProvider};
use crate::idx::trees::store::lru::{CacheKey, ConcurrentLru};
use crate::idx::trees::store::{NodeId, StoreGeneration, StoredNode, TreeNode, TreeNodeProvider};
use crate::kvs::{Key, Transaction};
use quick_cache::sync::Cache;
use quick_cache::GuardResult;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use tokio::sync::RwLock;
pub type CacheGen = u64;
pub(super) struct TreeCaches<N>(Arc<RwLock<HashMap<Key, TreeCache<N>>>>)
pub(super) struct TreeCaches<N>(Arc<DashMap<Key, Arc<TreeCache<N>>>>)
where
N: TreeNode + Debug + Clone + Display;
@ -22,15 +19,15 @@ where
{
pub(super) async fn get_cache(
&self,
generation: CacheGen,
generation: StoreGeneration,
keys: &TreeNodeProvider,
cache_size: usize,
) -> TreeCache<N> {
) -> Arc<TreeCache<N>> {
#[cfg(debug_assertions)]
debug!("get_cache {generation}");
// We take the key from the node 0 as the key identifier for the cache
let key = keys.get_key(0);
match self.0.write().await.entry(key) {
let cache_key = keys.get_key(0);
match self.0.entry(cache_key.clone()) {
Entry::Occupied(mut e) => {
let c = e.get_mut();
// The cache and the store are matching, we can send a clone of the cache.
@ -38,13 +35,18 @@ where
Ordering::Less => {
// The store generation is older than the current cache,
// we return an empty cache, but we don't hold it
TreeCache::new(generation, keys.clone(), cache_size)
Arc::new(TreeCache::new(generation, cache_key, keys.clone(), cache_size))
}
Ordering::Equal => c.clone(),
Ordering::Greater => {
// The store generation is more recent than the cache,
// we create a new one and hold it
let c = TreeCache::new(generation, keys.clone(), cache_size);
let c = Arc::new(TreeCache::new(
generation,
cache_key,
keys.clone(),
cache_size,
));
e.insert(c.clone());
c
}
@ -52,20 +54,35 @@ where
}
Entry::Vacant(e) => {
// There is no cache for index, we create one and hold it
let c = TreeCache::new(generation, keys.clone(), cache_size);
let c = Arc::new(TreeCache::new(generation, cache_key, keys.clone(), cache_size));
e.insert(c.clone());
c
}
}
}
pub(super) async fn remove_cache(&self, keys: &TreeNodeProvider) {
let key = keys.get_key(0);
self.0.write().await.remove(&key);
pub(super) fn new_cache(&self, new_cache: TreeCache<N>) {
match self.0.entry(new_cache.cache_key().clone()) {
Entry::Occupied(mut e) => {
let old_cache = e.get();
// We only store the cache if it is a newer generation
if new_cache.generation() > old_cache.generation() {
e.insert(Arc::new(new_cache));
}
}
Entry::Vacant(e) => {
e.insert(Arc::new(new_cache));
}
}
}
pub(crate) async fn is_empty(&self) -> bool {
self.0.read().await.is_empty()
pub(super) fn remove_caches(&self, keys: &TreeNodeProvider) {
let key = keys.get_key(0);
self.0.remove(&key);
}
pub(crate) fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
@ -74,29 +91,41 @@ where
N: TreeNode + Debug + Clone + Display,
{
fn default() -> Self {
Self(Arc::new(RwLock::new(HashMap::new())))
Self(Arc::new(DashMap::new()))
}
}
#[derive(Clone)]
#[non_exhaustive]
pub enum TreeCache<N>
where
N: TreeNode + Debug + Clone + Display,
{
Lru(CacheGen, TreeLruCache<N>),
Full(CacheGen, TreeFullCache<N>),
Lru(Key, StoreGeneration, TreeLruCache<N>),
Full(Key, StoreGeneration, TreeFullCache<N>),
}
impl<N> TreeCache<N>
where
N: TreeNode + Debug + Clone + Display,
{
pub fn new(generation: CacheGen, keys: TreeNodeProvider, cache_size: usize) -> Self {
pub fn new(
generation: StoreGeneration,
cache_key: Key,
keys: TreeNodeProvider,
cache_size: usize,
) -> Self {
if cache_size == 0 {
TreeCache::Full(generation, TreeFullCache::new(keys))
Self::Full(cache_key, generation, TreeFullCache::new(keys))
} else {
TreeCache::Lru(generation, TreeLruCache::new(keys, cache_size))
Self::Lru(cache_key, generation, TreeLruCache::new(keys, cache_size))
}
}
#[cfg(test)]
pub(in crate::idx) fn len(&self) -> usize {
match self {
Self::Lru(_, _, c) => c.lru.len(),
Self::Full(_, _, c) => c.cache.len(),
}
}
@ -106,14 +135,52 @@ where
node_id: NodeId,
) -> Result<Arc<StoredNode<N>>, Error> {
match self {
TreeCache::Lru(_, c) => c.get_node(tx, node_id).await,
TreeCache::Full(_, c) => c.get_node(tx, node_id).await,
Self::Lru(_, _, c) => c.get_node(tx, node_id).await,
Self::Full(_, _, c) => c.get_node(tx, node_id).await,
}
}
fn generation(&self) -> CacheGen {
pub(super) async fn set_node(&self, node: StoredNode<N>) {
match self {
TreeCache::Lru(gen, _) | TreeCache::Full(gen, _) => *gen,
Self::Lru(_, _, c) => c.set_node(node).await,
Self::Full(_, _, c) => c.set_node(node),
}
}
pub(super) async fn remove_node(&self, node_id: &NodeId) {
match self {
Self::Lru(_, _, c) => c.remove_node(node_id).await,
Self::Full(_, _, c) => c.remove_node(node_id),
}
}
pub(super) fn cache_key(&self) -> &Key {
match self {
Self::Lru(k, _, _) => k,
Self::Full(k, _, _) => k,
}
}
fn generation(&self) -> StoreGeneration {
match self {
Self::Lru(_, gen, _) | TreeCache::Full(_, gen, _) => *gen,
}
}
/// Creates a copy of the cache, with a generation number incremented by one.
/// The new cache does not contain the NodeID contained in `updated` and `removed`.
pub(super) async fn next_generation(
&self,
updated: &HashSet<NodeId>,
removed: &HashMap<NodeId, Key>,
) -> Self {
match self {
Self::Lru(k, g, c) => {
Self::Lru(k.clone(), *g + 1, c.next_generation(updated, removed).await)
}
Self::Full(k, g, c) => {
Self::Full(k.clone(), *g + 1, c.next_generation(updated, removed))
}
}
}
}
@ -124,17 +191,18 @@ where
N: TreeNode + Debug + Clone + Display,
{
keys: TreeNodeProvider,
cache: Arc<Cache<NodeId, Arc<StoredNode<N>>>>,
lru: ConcurrentLru<Arc<StoredNode<N>>>,
}
impl<N> TreeLruCache<N>
where
N: TreeNode + Debug + Clone,
{
fn new(keys: TreeNodeProvider, cache_size: usize) -> Self {
fn new(keys: TreeNodeProvider, size: usize) -> Self {
let lru = ConcurrentLru::new(size);
Self {
keys,
cache: Arc::new(Cache::new(cache_size)),
lru,
}
}
@ -143,26 +211,29 @@ where
tx: &mut Transaction,
node_id: NodeId,
) -> Result<Arc<StoredNode<N>>, Error> {
match self.cache.get_value_or_guard(&node_id, None) {
GuardResult::Value(v) => Ok(v),
GuardResult::Guard(g) => {
let n = Arc::new(self.keys.load::<N>(tx, node_id).await?);
g.insert(n.clone()).ok();
Ok(n)
}
GuardResult::Timeout => Err(Error::Unreachable("TreeCache::get_node")),
if let Some(n) = self.lru.get(node_id).await {
return Ok(n);
}
let n = Arc::new(self.keys.load::<N>(tx, node_id).await?);
self.lru.insert(node_id as CacheKey, n.clone()).await;
Ok(n)
}
}
impl<N> Clone for TreeLruCache<N>
where
N: TreeNode + Debug + Clone,
{
fn clone(&self) -> Self {
async fn set_node(&self, node: StoredNode<N>) {
self.lru.insert(node.id as CacheKey, node.into()).await;
}
async fn remove_node(&self, node_id: &NodeId) {
self.lru.remove(*node_id as CacheKey).await;
}
async fn next_generation(
&self,
updated: &HashSet<NodeId>,
removed: &HashMap<NodeId, Key>,
) -> Self {
Self {
keys: self.keys.clone(),
cache: self.cache.clone(),
lru: self.lru.duplicate(|id| !removed.contains_key(id) || !updated.contains(id)).await,
}
}
}
@ -173,7 +244,7 @@ where
N: TreeNode + Debug + Clone,
{
keys: TreeNodeProvider,
cache: Arc<RwLock<HashMap<NodeId, Arc<StoredNode<N>>>>>,
cache: DashMap<NodeId, Arc<StoredNode<N>>>,
}
impl<N> TreeFullCache<N>
@ -183,7 +254,7 @@ where
pub fn new(keys: TreeNodeProvider) -> Self {
Self {
keys,
cache: Arc::new(RwLock::new(HashMap::new())),
cache: DashMap::new(),
}
}
@ -192,11 +263,7 @@ where
tx: &mut Transaction,
node_id: NodeId,
) -> Result<Arc<StoredNode<N>>, Error> {
// Let's first try with the read lock
if let Some(n) = self.cache.read().await.get(&node_id).cloned() {
return Ok(n);
}
match self.cache.write().await.entry(node_id) {
match self.cache.entry(node_id) {
Entry::Occupied(e) => Ok(e.get().clone()),
Entry::Vacant(e) => {
let n = Arc::new(self.keys.load::<N>(tx, node_id).await?);
@ -205,16 +272,24 @@ where
}
}
}
}
impl<N> Clone for TreeFullCache<N>
where
N: TreeNode + Debug + Clone,
{
fn clone(&self) -> Self {
Self {
keys: self.keys.clone(),
cache: self.cache.clone(),
}
pub(super) fn set_node(&self, node: StoredNode<N>) {
self.cache.insert(node.id, node.into());
}
pub(super) fn remove_node(&self, node_id: &NodeId) {
self.cache.remove(node_id);
}
fn next_generation(&self, updated: &HashSet<NodeId>, removed: &HashMap<NodeId, Key>) -> Self {
let new_cache = Self::new(self.keys.clone());
self.cache
.iter()
.filter(|r| !removed.contains_key(r.key()))
.filter(|r| !updated.contains(r.key()))
.for_each(|r| {
new_cache.cache.insert(r.id, r.value().clone());
});
new_cache
}
}

View file

@ -0,0 +1,312 @@
use futures::future::join_all;
use std::collections::HashMap;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use tokio::sync::Mutex;
pub(super) type CacheKey = u64;
pub(super) struct ConcurrentLru<V>
where
V: Clone,
{
/// Each shard is a LRU cache
shards: Vec<Mutex<LruShard<V>>>,
/// Keep track of sizes for each shard
lengths: Vec<AtomicUsize>,
/// If lengths == capacity, then full is true
full: AtomicBool,
/// The number of shards
shards_count: usize,
/// The maximum capacity
capacity: usize,
}
impl<V> ConcurrentLru<V>
where
V: Clone,
{
pub(super) fn new(capacity: usize) -> Self {
let shards_count = num_cpus::get().min(capacity);
let mut shards = Vec::with_capacity(shards_count);
let mut lengths = Vec::with_capacity(shards_count);
for _ in 0..shards_count {
shards.push(Mutex::new(LruShard::new()));
lengths.push(AtomicUsize::new(0));
}
Self {
shards_count,
shards,
lengths,
full: AtomicBool::new(false),
capacity,
}
}
pub(super) async fn get<K: Into<CacheKey>>(&self, key: K) -> Option<V> {
let key = key.into();
// Locate the shard
let n = key as usize % self.shards_count;
// Get and promote the key
self.shards[n].lock().await.get_and_promote(key)
}
pub(super) async fn insert<K: Into<CacheKey>>(&self, key: K, val: V) {
let key = key.into();
// Locate the shard
let shard = key as usize % self.shards_count;
// Insert the key/object in the shard and get the new length
let new_length = self.shards[shard].lock().await.insert(key, val, self.full.load(Relaxed));
// Update lengths
self.check_length(new_length, shard);
}
pub(super) async fn remove<K: Into<CacheKey>>(&self, key: K) {
let key = key.into();
// Locate the shard
let shard = key as usize % self.shards_count;
// Remove the key
let new_length = self.shards[shard].lock().await.remove(key);
// Update lengths
self.check_length(new_length, shard);
}
fn check_length(&self, new_length: usize, shard: usize) {
// Set the length for this shard
self.lengths[shard].store(new_length, Relaxed);
// Compute the total length
let total_length: usize = self.lengths.iter().map(|l| l.load(Relaxed)).sum();
// Check if the cache is full
self.full.store(total_length == self.capacity, Relaxed);
}
#[cfg(test)]
pub(super) fn len(&self) -> usize {
self.lengths.iter().map(|l| l.load(Relaxed)).sum()
}
pub(super) async fn duplicate<F>(&self, filter: F) -> Self
where
F: Fn(&CacheKey) -> bool + Copy,
{
// We cant the shards to be copied concurrently.
// So we create one future per shard.
let futures: Vec<_> = self
.shards
.iter()
.map(|s| async {
let shard = s.lock().await.duplicate(filter);
(shard.map.len(), Mutex::new(shard))
})
.collect();
let mut shards = Vec::with_capacity(self.shards_count);
let mut lengths = Vec::with_capacity(self.shards_count);
let mut total_lengths = 0;
for (length, shard) in join_all(futures).await {
shards.push(shard);
total_lengths += length;
lengths.push(length.into());
}
Self {
shards,
lengths,
full: AtomicBool::new(total_lengths >= self.capacity),
shards_count: self.shards_count,
capacity: self.capacity,
}
}
}
struct LruShard<T>
where
T: Clone,
{
map: HashMap<CacheKey, usize>,
vec: Vec<Option<(CacheKey, T)>>,
}
impl<T> LruShard<T>
where
T: Clone,
{
fn new() -> Self {
Self {
map: HashMap::new(),
vec: Vec::new(),
}
}
fn get_and_promote(&mut self, key: CacheKey) -> Option<T> {
if let Some(pos) = self.map.get(&key).copied() {
let val = self.vec[pos].clone();
if pos > 0 {
self.promote(key, pos);
}
val.map(|(_, v)| v.clone())
} else {
None
}
}
// The promotion implements a very low-cost strategy.
// Promotion is done by flipping the entry with the entry just before.
// Each time an entry is hit, its weight increase.
fn promote(&mut self, key: CacheKey, pos: usize) {
// Promotion is flipping the current entry with the entry before
let new_pos = pos - 1;
let flip_key = self.vec[new_pos].as_ref().map(|(k, _)| k).cloned();
self.vec.swap(pos, new_pos);
self.map.insert(key, new_pos);
if let Some(flip_key) = flip_key {
self.map.insert(flip_key, pos);
} else if pos == self.vec.len() - 1 {
self.vec.remove(pos);
}
}
fn insert(&mut self, key: CacheKey, val: T, replace: bool) -> usize {
if let Some(pos) = self.map.get(&key).copied() {
// If the entry is already there, just update it
self.vec[pos] = Some((key, val));
} else {
// If we reached the capacity
if replace {
// Find the last entry...
while !self.vec.is_empty() {
if let Some(Some((k, _v))) = self.vec.pop() {
// ... and remove it
self.map.remove(&k);
break;
}
}
}
// Now we can insert the new entry
let pos = self.vec.len();
self.vec.push(Some((key, val)));
// If it is the head
if pos == 0 {
// ...we just insert it
self.map.insert(key, pos);
} else {
// or we promote it
self.promote(key, pos);
}
}
self.map.len()
}
fn remove(&mut self, key: CacheKey) -> usize {
if let Some(pos) = self.map.remove(&key) {
if pos == self.vec.len() - 1 {
// If it is the last element, we can just remove it from the vec
self.vec.pop();
} else {
// Otherwise we set a placeholder
self.vec[pos] = None;
}
}
self.map.len()
}
/// Make a copy of this cache containing every entry for which the specified filter returns true.
fn duplicate<F>(&self, filter: F) -> Self
where
F: Fn(&CacheKey) -> bool,
{
let mut map = HashMap::with_capacity(self.map.len());
let mut vec = Vec::with_capacity(self.vec.len());
self.map.iter().filter(|&(k, _pos)| filter(k)).for_each(|(k, pos)| {
let new_pos = vec.len();
map.insert(*k, new_pos);
vec.push(self.vec[*pos].clone());
});
Self {
map,
vec,
}
}
}
#[cfg(test)]
mod tests {
use super::ConcurrentLru;
use futures::future::join_all;
use test_log::test;
#[test(tokio::test)]
async fn test_minimal_tree_lru() {
let lru = ConcurrentLru::new(1);
assert_eq!(lru.len(), 0);
//
lru.insert(1u64, 'a').await;
assert_eq!(lru.len(), 1);
assert_eq!(lru.get(1u64).await, Some('a'));
//
lru.insert(2u64, 'b').await;
assert_eq!(lru.len(), 1);
assert_eq!(lru.get(1u64).await, None);
assert_eq!(lru.get(2u64).await, Some('b'));
//
lru.insert(2u64, 'c').await;
assert_eq!(lru.len(), 1);
assert_eq!(lru.get(2u64).await, Some('c'));
//
lru.remove(1u64).await;
assert_eq!(lru.len(), 1);
assert_eq!(lru.get(2u64).await, Some('c'));
//
lru.remove(2u64).await;
assert_eq!(lru.len(), 0);
assert_eq!(lru.get(1u64).await, None);
assert_eq!(lru.get(2u64).await, None);
}
#[test(tokio::test)]
async fn test_tree_lru() {
let lru = ConcurrentLru::new(4);
//
lru.insert(1u64, 'a').await;
lru.insert(2u64, 'b').await;
lru.insert(3u64, 'c').await;
lru.insert(4u64, 'd').await;
assert_eq!(lru.len(), 4);
assert_eq!(lru.get(1u64).await, Some('a'));
assert_eq!(lru.get(2u64).await, Some('b'));
assert_eq!(lru.get(3u64).await, Some('c'));
assert_eq!(lru.get(4u64).await, Some('d'));
//
lru.insert(5u64, 'e').await;
assert_eq!(lru.len(), 4);
assert_eq!(lru.get(1u64).await, None);
assert_eq!(lru.get(2u64).await, Some('b'));
assert_eq!(lru.get(3u64).await, Some('c'));
assert_eq!(lru.get(4u64).await, Some('d'));
assert_eq!(lru.get(5u64).await, Some('e'));
//
let lru = lru.duplicate(|k| *k != 3).await;
assert_eq!(lru.len(), 3);
assert_eq!(lru.get(1u64).await, None);
assert_eq!(lru.get(2u64).await, Some('b'));
assert_eq!(lru.get(3u64).await, None);
assert_eq!(lru.get(4u64).await, Some('d'));
assert_eq!(lru.get(5u64).await, Some('e'));
}
#[test(tokio::test(flavor = "multi_thread"))]
async fn concurrent_lru_test() {
let num_threads = 4;
let lru = ConcurrentLru::new(100);
let futures: Vec<_> = (0..num_threads)
.map(|_| async {
lru.insert(10u64, 'a').await;
lru.get(10u64).await;
lru.insert(20u64, 'b').await;
lru.remove(10u64).await;
})
.collect();
join_all(futures).await;
assert!(lru.get(10u64).await.is_none());
assert!(lru.get(20u64).await.is_some());
}
}

View file

@ -1,4 +1,5 @@
pub mod cache;
mod lru;
pub(crate) mod tree;
use crate::dbs::Options;
@ -16,6 +17,7 @@ use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;
pub type NodeId = u64;
pub type StoreGeneration = u64;
#[non_exhaustive]
pub enum TreeStore<N>
@ -32,10 +34,10 @@ impl<N> TreeStore<N>
where
N: TreeNode + Debug + Display + Clone,
{
pub async fn new(keys: TreeNodeProvider, cache: TreeCache<N>, tt: TransactionType) -> Self {
pub async fn new(np: TreeNodeProvider, cache: Arc<TreeCache<N>>, tt: TransactionType) -> Self {
match tt {
TransactionType::Read => Self::Read(TreeRead::new(cache)),
TransactionType::Write => Self::Write(TreeWrite::new(keys, cache)),
TransactionType::Write => Self::Write(TreeWrite::new(np, cache)),
}
}
@ -45,7 +47,7 @@ where
node_id: NodeId,
) -> Result<StoredNode<N>, Error> {
match self {
TreeStore::Write(w) => w.get_node_mut(tx, node_id).await,
Self::Write(w) => w.get_node_mut(tx, node_id).await,
_ => Err(Error::Unreachable("TreeStore::get_node_mut")),
}
}
@ -56,7 +58,7 @@ where
node_id: NodeId,
) -> Result<Arc<StoredNode<N>>, Error> {
match self {
TreeStore::Read(r) => r.get_node(tx, node_id).await,
Self::Read(r) => r.get_node(tx, node_id).await,
_ => Err(Error::Unreachable("TreeStore::get_node")),
}
}
@ -67,14 +69,14 @@ where
updated: bool,
) -> Result<(), Error> {
match self {
TreeStore::Write(w) => w.set_node(node, updated),
Self::Write(w) => w.set_node(node, updated),
_ => Err(Error::Unreachable("TreeStore::set_node")),
}
}
pub(in crate::idx) fn new_node(&mut self, id: NodeId, node: N) -> Result<StoredNode<N>, Error> {
match self {
TreeStore::Write(w) => Ok(w.new_node(id, node)),
Self::Write(w) => Ok(w.new_node(id, node)),
_ => Err(Error::Unreachable("TreeStore::new_node")),
}
}
@ -85,15 +87,15 @@ where
node_key: Key,
) -> Result<(), Error> {
match self {
TreeStore::Write(w) => w.remove_node(node_id, node_key),
Self::Write(w) => w.remove_node(node_id, node_key),
_ => Err(Error::Unreachable("TreeStore::remove_node")),
}
}
pub async fn finish(&mut self, tx: &mut Transaction) -> Result<bool, Error> {
pub async fn finish(&mut self, tx: &mut Transaction) -> Result<Option<TreeCache<N>>, Error> {
match self {
TreeStore::Write(w) => w.finish(tx).await,
_ => Ok(false),
Self::Write(w) => w.finish(tx).await,
_ => Ok(None),
}
}
}
@ -110,7 +112,7 @@ pub enum TreeNodeProvider {
}
impl TreeNodeProvider {
pub(in crate::idx) fn get_key(&self, node_id: NodeId) -> Key {
pub fn get_key(&self, node_id: NodeId) -> Key {
match self {
TreeNodeProvider::DocIds(ikb) => ikb.new_bd_key(Some(node_id)),
TreeNodeProvider::DocLengths(ikb) => ikb.new_bl_key(Some(node_id)),
@ -135,17 +137,19 @@ impl TreeNodeProvider {
}
}
async fn save<N>(&self, tx: &mut Transaction, mut node: StoredNode<N>) -> Result<(), Error>
async fn save<N>(&self, tx: &mut Transaction, node: &mut StoredNode<N>) -> Result<(), Error>
where
N: TreeNode + Clone + Display,
{
let val = node.n.try_into_val()?;
tx.set(node.key, val).await?;
node.size = val.len() as u32;
tx.set(node.key.clone(), val).await?;
Ok(())
}
}
#[non_exhaustive]
#[derive(Debug)]
pub struct StoredNode<N>
where
N: Clone + Display,
@ -180,10 +184,11 @@ where
}
pub trait TreeNode: Debug + Clone + Display {
fn prepare_save(&mut self) {}
fn try_from_val(val: Val) -> Result<Self, Error>
where
Self: Sized;
fn try_into_val(&mut self) -> Result<Val, Error>;
fn try_into_val(&self) -> Result<Val, Error>;
}
#[derive(Clone)]
@ -206,10 +211,10 @@ impl Default for IndexStores {
}
impl IndexStores {
pub(in crate::idx) async fn get_store_btree_fst(
pub async fn get_store_btree_fst(
&self,
keys: TreeNodeProvider,
generation: u64,
generation: StoreGeneration,
tt: TransactionType,
cache_size: usize,
) -> BTreeStore<FstKeys> {
@ -217,10 +222,14 @@ impl IndexStores {
TreeStore::new(keys, cache, tt).await
}
pub(in crate::idx) async fn get_store_btree_trie(
pub fn advance_store_btree_fst(&self, new_cache: TreeCache<BTreeNode<FstKeys>>) {
self.0.btree_fst_caches.new_cache(new_cache);
}
pub async fn get_store_btree_trie(
&self,
keys: TreeNodeProvider,
generation: u64,
generation: StoreGeneration,
tt: TransactionType,
cache_size: usize,
) -> BTreeStore<TrieKeys> {
@ -228,10 +237,14 @@ impl IndexStores {
TreeStore::new(keys, cache, tt).await
}
pub(in crate::idx) async fn get_store_mtree(
pub fn advance_cache_btree_trie(&self, new_cache: TreeCache<BTreeNode<TrieKeys>>) {
self.0.btree_trie_caches.new_cache(new_cache);
}
pub async fn get_store_mtree(
&self,
keys: TreeNodeProvider,
generation: u64,
generation: StoreGeneration,
tt: TransactionType,
cache_size: usize,
) -> MTreeStore {
@ -239,6 +252,10 @@ impl IndexStores {
TreeStore::new(keys, cache, tt).await
}
pub fn advance_store_mtree(&self, new_cache: TreeCache<MTreeNode>) {
self.0.mtree_caches.new_cache(new_cache);
}
pub(crate) async fn index_removed(
&self,
opt: &Options,
@ -250,7 +267,6 @@ impl IndexStores {
opt,
tx.get_and_cache_tb_index(opt.ns(), opt.db(), tb, ix).await?.as_ref(),
)
.await
}
pub(crate) async fn namespace_removed(
@ -271,40 +287,40 @@ impl IndexStores {
tb: &str,
) -> Result<(), Error> {
for ix in tx.all_tb_indexes(opt.ns(), opt.db(), tb).await?.iter() {
self.remove_index(opt, ix).await?;
self.remove_index(opt, ix)?;
}
Ok(())
}
async fn remove_index(&self, opt: &Options, ix: &DefineIndexStatement) -> Result<(), Error> {
fn remove_index(&self, opt: &Options, ix: &DefineIndexStatement) -> Result<(), Error> {
let ikb = IndexKeyBase::new(opt, ix);
match ix.index {
Index::Search(_) => {
self.remove_search_cache(ikb).await;
self.remove_search_caches(ikb);
}
Index::MTree(_) => {
self.remove_mtree_cache(ikb).await;
self.remove_mtree_caches(ikb);
}
_ => {}
}
Ok(())
}
async fn remove_search_cache(&self, ikb: IndexKeyBase) {
self.0.btree_trie_caches.remove_cache(&TreeNodeProvider::DocIds(ikb.clone())).await;
self.0.btree_trie_caches.remove_cache(&TreeNodeProvider::DocLengths(ikb.clone())).await;
self.0.btree_trie_caches.remove_cache(&TreeNodeProvider::Postings(ikb.clone())).await;
self.0.btree_fst_caches.remove_cache(&TreeNodeProvider::Terms(ikb)).await;
fn remove_search_caches(&self, ikb: IndexKeyBase) {
self.0.btree_trie_caches.remove_caches(&TreeNodeProvider::DocIds(ikb.clone()));
self.0.btree_trie_caches.remove_caches(&TreeNodeProvider::DocLengths(ikb.clone()));
self.0.btree_trie_caches.remove_caches(&TreeNodeProvider::Postings(ikb.clone()));
self.0.btree_fst_caches.remove_caches(&TreeNodeProvider::Terms(ikb));
}
async fn remove_mtree_cache(&self, ikb: IndexKeyBase) {
self.0.btree_trie_caches.remove_cache(&TreeNodeProvider::DocIds(ikb.clone())).await;
self.0.mtree_caches.remove_cache(&TreeNodeProvider::Vector(ikb.clone())).await;
fn remove_mtree_caches(&self, ikb: IndexKeyBase) {
self.0.btree_trie_caches.remove_caches(&TreeNodeProvider::DocIds(ikb.clone()));
self.0.mtree_caches.remove_caches(&TreeNodeProvider::Vector(ikb.clone()));
}
pub async fn is_empty(&self) -> bool {
self.0.mtree_caches.is_empty().await
&& self.0.btree_fst_caches.is_empty().await
&& self.0.btree_trie_caches.is_empty().await
pub fn is_empty(&self) -> bool {
self.0.mtree_caches.is_empty()
&& self.0.btree_fst_caches.is_empty()
&& self.0.btree_trie_caches.is_empty()
}
}

View file

@ -4,6 +4,7 @@ use crate::idx::trees::store::{NodeId, StoredNode, TreeNode, TreeNodeProvider};
use crate::kvs::{Key, Transaction};
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::mem;
use std::sync::Arc;
#[non_exhaustive]
@ -12,7 +13,8 @@ where
N: TreeNode + Debug + Clone,
{
np: TreeNodeProvider,
cache: TreeCache<N>,
cache: Arc<TreeCache<N>>,
cached: HashSet<NodeId>,
nodes: HashMap<NodeId, StoredNode<N>>,
updated: HashSet<NodeId>,
removed: HashMap<NodeId, Key>,
@ -24,10 +26,11 @@ impl<N> TreeWrite<N>
where
N: TreeNode + Clone + Debug + Display,
{
pub(super) fn new(keys: TreeNodeProvider, cache: TreeCache<N>) -> Self {
pub(super) fn new(np: TreeNodeProvider, cache: Arc<TreeCache<N>>) -> Self {
Self {
np: keys,
np,
cache,
cached: HashSet::new(),
nodes: HashMap::new(),
updated: HashSet::new(),
removed: HashMap::new(),
@ -43,33 +46,26 @@ where
) -> Result<StoredNode<N>, Error> {
#[cfg(debug_assertions)]
{
debug!("GET: {}", node_id);
self.out.insert(node_id);
if self.removed.contains_key(&node_id) {
return Err(Error::Unreachable("TreeTransactionWrite::get_node_mut"));
}
}
if let Some(n) = self.nodes.remove(&node_id) {
#[cfg(debug_assertions)]
debug!("GET (NODES): {}", n.n);
return Ok(n);
}
let r = self.cache.get_node(tx, node_id).await?;
#[cfg(debug_assertions)]
debug!("GET (CACHE): {}", r.n);
self.cached.insert(node_id);
Ok(StoredNode::new(r.n.clone(), r.id, r.key.clone(), r.size))
}
pub(super) fn set_node(&mut self, node: StoredNode<N>, updated: bool) -> Result<(), Error> {
#[cfg(debug_assertions)]
{
if updated {
debug!("SET {updated}: {node}");
}
self.out.remove(&node.id);
}
self.out.remove(&node.id);
if updated {
self.updated.insert(node.id);
self.cached.remove(&node.id);
}
if self.removed.contains_key(&node.id) {
return Err(Error::Unreachable("TreeTransactionWrite::set_node(2)"));
@ -80,56 +76,59 @@ where
pub(super) fn new_node(&mut self, id: NodeId, node: N) -> StoredNode<N> {
#[cfg(debug_assertions)]
{
debug!("NEW: {}", id);
self.out.insert(id);
}
self.out.insert(id);
StoredNode::new(node, id, self.np.get_key(id), 0)
}
pub(super) fn remove_node(&mut self, node_id: NodeId, node_key: Key) -> Result<(), Error> {
#[cfg(debug_assertions)]
{
debug!("REMOVE: {}", node_id);
if self.nodes.contains_key(&node_id) {
return Err(Error::Unreachable("TreeTransactionWrite::remove_node"));
}
self.out.remove(&node_id);
}
self.updated.remove(&node_id);
self.cached.remove(&node_id);
self.removed.insert(node_id, node_key);
Ok(())
}
pub(super) async fn finish(&mut self, tx: &mut Transaction) -> Result<bool, Error> {
let update = !self.updated.is_empty() || !self.removed.is_empty();
pub(super) async fn finish(
&mut self,
tx: &mut Transaction,
) -> Result<Option<TreeCache<N>>, Error> {
#[cfg(debug_assertions)]
{
debug!("finish");
if !self.out.is_empty() {
debug!("OUT: {:?}", self.out);
return Err(Error::Unreachable("TreeTransactionWrite::finish(1)"));
}
}
for node_id in &self.updated {
if let Some(node) = self.nodes.remove(node_id) {
#[cfg(debug_assertions)]
debug!("finish: tx.save {node_id}");
self.np.save(tx, node).await?;
if self.updated.is_empty() && self.removed.is_empty() {
return Ok(None);
}
// Create a new cache hydrated with non-updated and non-removed previous cache entries.
let new_cache = self.cache.next_generation(&self.updated, &self.removed).await;
let updated = mem::take(&mut self.updated);
for node_id in updated {
if let Some(mut node) = self.nodes.remove(&node_id) {
node.n.prepare_save();
self.np.save(tx, &mut node).await?;
// Update the cache with updated entries.
new_cache.set_node(node).await;
} else {
return Err(Error::Unreachable("TreeTransactionWrite::finish(2)"));
}
}
self.updated.clear();
let node_ids: Vec<NodeId> = self.removed.keys().copied().collect();
for node_id in node_ids {
if let Some(node_key) = self.removed.remove(&node_id) {
#[cfg(debug_assertions)]
debug!("finish: tx.del {node_id}");
tx.del(node_key).await?;
}
let removed = mem::take(&mut self.removed);
for (node_id, node_key) in removed {
tx.del(node_key).await?;
new_cache.remove_node(&node_id).await;
}
Ok(update)
Ok(Some(new_cache))
}
}
@ -153,14 +152,14 @@ pub struct TreeRead<N>
where
N: TreeNode + Debug + Clone,
{
cache: TreeCache<N>,
cache: Arc<TreeCache<N>>,
}
impl<N> TreeRead<N>
where
N: TreeNode + Debug + Clone,
{
pub(super) fn new(cache: TreeCache<N>) -> Self {
pub(super) fn new(cache: Arc<TreeCache<N>>) -> Self {
Self {
cache,
}

View file

@ -1,11 +1,16 @@
use crate::err::Error;
use crate::fnc::util::math::deviation::deviation;
use crate::fnc::util::math::mean::Mean;
use crate::fnc::util::math::ToFloat;
use crate::sql::index::VectorType;
use crate::sql::Number;
use crate::sql::index::{Distance, VectorType};
use crate::sql::{Array, Number, Value};
use revision::revisioned;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::ops::Mul;
use std::collections::HashSet;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::ops::{Mul, Sub};
use std::sync::Arc;
/// In the context of a Symmetric MTree index, the term object refers to a vector, representing the indexed item.
@ -24,7 +29,96 @@ pub enum Vector {
/// So the requirement is multiple ownership but not thread safety.
/// However, because we are running in an async context, and because we are using cache structures that use the Arc as a key,
/// the cached objects has to be Sent, which then requires the use of Arc (rather than just Rc).
pub(crate) type SharedVector = Arc<Vector>;
/// As computing the hash for a large vector is costly, this structures also caches the hashcode to avoid recomputing it.
#[derive(Debug, Clone)]
pub struct SharedVector(Arc<Vector>, u64);
impl From<Vector> for SharedVector {
fn from(v: Vector) -> Self {
let mut h = DefaultHasher::new();
v.hash(&mut h);
Self(v.into(), h.finish())
}
}
impl Borrow<Vector> for &SharedVector {
fn borrow(&self) -> &Vector {
self.0.as_ref()
}
}
impl Hash for SharedVector {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.1);
}
}
impl PartialEq for SharedVector {
fn eq(&self, other: &Self) -> bool {
self.1 == other.1 && self.0 == other.0
}
}
impl Eq for SharedVector {}
impl PartialOrd for SharedVector {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SharedVector {
fn cmp(&self, other: &Self) -> Ordering {
self.0.as_ref().cmp(other.0.as_ref())
}
}
impl Serialize for SharedVector {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// We only serialize the vector part, not the u64
self.0.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for SharedVector {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// We deserialize into a vector and construct the struct
// assuming some default or dummy value for the u64, e.g., 0
let v = Vector::deserialize(deserializer)?;
Ok(v.into())
}
}
impl Hash for Vector {
fn hash<H: Hasher>(&self, state: &mut H) {
match self {
Vector::F64(v) => {
let h = v.iter().fold(0, |acc, &x| acc ^ x.to_bits());
state.write_u64(h);
}
Vector::F32(v) => {
let h = v.iter().fold(0, |acc, &x| acc ^ x.to_bits());
state.write_u32(h);
}
Vector::I64(v) => {
let h = v.iter().fold(0, |acc, &x| acc ^ x);
state.write_i64(h);
}
Vector::I32(v) => {
let h = v.iter().fold(0, |acc, &x| acc ^ x);
state.write_i32(h);
}
Vector::I16(v) => {
let h = v.iter().fold(0, |acc, &x| acc ^ x);
state.write_i16(h);
}
}
}
}
impl PartialEq for Vector {
fn eq(&self, other: &Self) -> bool {
@ -70,41 +164,78 @@ impl Ord for Vector {
}
impl Vector {
pub(super) fn new(t: VectorType, l: usize) -> Self {
pub(super) fn new(t: VectorType, d: usize) -> Self {
match t {
VectorType::F64 => Self::F64(Vec::with_capacity(l)),
VectorType::F32 => Self::F32(Vec::with_capacity(l)),
VectorType::I64 => Self::I64(Vec::with_capacity(l)),
VectorType::I32 => Self::I32(Vec::with_capacity(l)),
VectorType::I16 => Self::I16(Vec::with_capacity(l)),
VectorType::F64 => Self::F64(Vec::with_capacity(d)),
VectorType::F32 => Self::F32(Vec::with_capacity(d)),
VectorType::I64 => Self::I64(Vec::with_capacity(d)),
VectorType::I32 => Self::I32(Vec::with_capacity(d)),
VectorType::I16 => Self::I16(Vec::with_capacity(d)),
}
}
pub(super) fn add(&mut self, n: Number) {
pub(super) fn try_from_value(t: VectorType, d: usize, v: &Value) -> Result<Self, Error> {
let mut vec = Vector::new(t, d);
vec.check_vector_value(v)?;
Ok(vec)
}
fn check_vector_value(&mut self, value: &Value) -> Result<(), Error> {
match value {
Value::Array(a) => {
for v in a.0.iter() {
self.check_vector_value(v)?;
}
Ok(())
}
Value::Number(n) => {
self.add(n);
Ok(())
}
_ => Err(Error::InvalidVectorValue(value.clone().to_raw_string())),
}
}
pub fn try_from_array(t: VectorType, a: &Array) -> Result<Self, Error> {
let mut vec = Vector::new(t, a.len());
for v in &a.0 {
if let Value::Number(n) = v {
vec.add(n);
} else {
return Err(Error::InvalidVectorType {
current: v.clone().to_string(),
expected: "Number",
});
}
}
Ok(vec)
}
pub(super) fn add(&mut self, n: &Number) {
match self {
Vector::F64(v) => v.push(n.to_float()),
Vector::F32(v) => v.push(n.to_float() as f32),
Vector::I64(v) => v.push(n.to_int()),
Vector::I32(v) => v.push(n.to_int() as i32),
Vector::I16(v) => v.push(n.to_int() as i16),
Self::F64(v) => v.push(n.to_float()),
Self::F32(v) => v.push(n.to_float() as f32),
Self::I64(v) => v.push(n.to_int()),
Self::I32(v) => v.push(n.to_int() as i32),
Self::I16(v) => v.push(n.to_int() as i16),
};
}
pub(super) fn len(&self) -> usize {
match self {
Vector::F64(v) => v.len(),
Vector::F32(v) => v.len(),
Vector::I64(v) => v.len(),
Vector::I32(v) => v.len(),
Vector::I16(v) => v.len(),
Self::F64(v) => v.len(),
Self::F32(v) => v.len(),
Self::I64(v) => v.len(),
Self::I32(v) => v.len(),
Self::I16(v) => v.len(),
}
}
fn check_same_dimension(fnc: &str, a: &Vector, b: &Vector) -> Result<(), Error> {
if a.len() != b.len() {
Err(Error::InvalidArguments {
name: String::from(fnc),
message: String::from("The two vectors must be of the same dimension."),
pub(super) fn check_expected_dimension(current: usize, expected: usize) -> Result<(), Error> {
if current != expected {
Err(Error::InvalidVectorDimension {
current,
expected,
})
} else {
Ok(())
@ -165,82 +296,287 @@ impl Vector {
}
}
pub(super) fn euclidean_distance(&self, other: &Self) -> Result<f64, Error> {
Self::check_same_dimension("vector::distance::euclidean", self, other)?;
pub(super) fn check_dimension(&self, expected_dim: usize) -> Result<(), Error> {
Self::check_expected_dimension(self.len(), expected_dim)
}
fn chebyshev<T>(a: &[T], b: &[T]) -> f64
where
T: ToFloat,
{
a.iter()
.zip(b.iter())
.map(|(a, b)| (a.to_float() - b.to_float()).abs())
.fold(f64::MIN, f64::max)
}
pub(crate) fn chebyshev_distance(&self, other: &Self) -> f64 {
match (self, other) {
(Vector::F64(a), Vector::F64(b)) => {
Ok(a.iter().zip(b.iter()).map(|(a, b)| (a - b).powi(2)).sum::<f64>().sqrt())
}
(Vector::F32(a), Vector::F32(b)) => Ok(a
.iter()
.zip(b.iter())
.map(|(a, b)| (*a as f64 - *b as f64).powi(2))
.sum::<f64>()
.sqrt()),
(Vector::I64(a), Vector::I64(b)) => {
Ok((a.iter().zip(b.iter()).map(|(a, b)| (a - b).pow(2)).sum::<i64>() as f64).sqrt())
}
(Vector::I32(a), Vector::I32(b)) => {
Ok((a.iter().zip(b.iter()).map(|(a, b)| (a - b).pow(2)).sum::<i32>() as f64).sqrt())
}
(Vector::I16(a), Vector::I16(b)) => {
Ok((a.iter().zip(b.iter()).map(|(a, b)| (a - b).pow(2)).sum::<i16>() as f64).sqrt())
}
_ => Err(Error::Unreachable("Vector::euclidean_distance")),
(Self::F64(a), Self::F64(b)) => Self::chebyshev(a, b),
(Self::F32(a), Self::F32(b)) => Self::chebyshev(a, b),
(Self::I64(a), Self::I64(b)) => Self::chebyshev(a, b),
(Self::I32(a), Self::I32(b)) => Self::chebyshev(a, b),
(Self::I16(a), Self::I16(b)) => Self::chebyshev(a, b),
_ => f64::NAN,
}
}
pub(super) fn manhattan_distance(&self, other: &Self) -> Result<f64, Error> {
Self::check_same_dimension("vector::distance::manhattan", self, other)?;
fn euclidean<T>(a: &[T], b: &[T]) -> f64
where
T: ToFloat,
{
a.iter()
.zip(b.iter())
.map(|(a, b)| (a.to_float() - b.to_float()).powi(2))
.sum::<f64>()
.sqrt()
}
pub(crate) fn euclidean_distance(&self, other: &Self) -> f64 {
match (self, other) {
(Vector::F64(a), Vector::F64(b)) => {
Ok(a.iter().zip(b.iter()).map(|(a, b)| (a - b).abs()).sum())
}
(Vector::F32(a), Vector::F32(b)) => {
Ok(a.iter().zip(b.iter()).map(|(a, b)| (*a as f64 - *b as f64).abs()).sum::<f64>())
}
(Vector::I64(a), Vector::I64(b)) => {
Ok(a.iter().zip(b.iter()).map(|(a, b)| (a - b).abs()).sum::<i64>() as f64)
}
(Vector::I32(a), Vector::I32(b)) => {
Ok(a.iter().zip(b.iter()).map(|(a, b)| (a - b).abs()).sum::<i32>() as f64)
}
(Vector::I16(a), Vector::I16(b)) => {
Ok(a.iter().zip(b.iter()).map(|(a, b)| (a - b).abs()).sum::<i16>() as f64)
}
_ => Err(Error::Unreachable("Vector::manhattan_distance")),
(Self::F64(a), Self::F64(b)) => Self::euclidean(a, b),
(Self::F32(a), Self::F32(b)) => Self::euclidean(a, b),
(Self::I64(a), Self::I64(b)) => Self::euclidean(a, b),
(Self::I32(a), Self::I32(b)) => Self::euclidean(a, b),
(Self::I16(a), Self::I16(b)) => Self::euclidean(a, b),
_ => f64::INFINITY,
}
}
pub(super) fn minkowski_distance(&self, other: &Self, order: &Number) -> Result<f64, Error> {
Self::check_same_dimension("vector::distance::minkowski", self, other)?;
let dist = match (self, other) {
(Vector::F64(a), Vector::F64(b)) => a
.iter()
.zip(b.iter())
.map(|(a, b)| (a - b).abs().powf(order.to_float()))
.sum::<f64>(),
(Vector::F32(a), Vector::F32(b)) => a
.iter()
.zip(b.iter())
.map(|(a, b)| (a - b).abs().powf(order.to_float() as f32))
.sum::<f32>() as f64,
(Vector::I64(a), Vector::I64(b)) => a
.iter()
.zip(b.iter())
.map(|(a, b)| (a - b).abs().pow(order.to_int() as u32))
.sum::<i64>() as f64,
(Vector::I32(a), Vector::I32(b)) => a
.iter()
.zip(b.iter())
.map(|(a, b)| (a - b).abs().pow(order.to_int() as u32))
.sum::<i32>() as f64,
(Vector::I16(a), Vector::I16(b)) => a
.iter()
.zip(b.iter())
.map(|(a, b)| (a - b).abs().pow(order.to_int() as u32))
.sum::<i16>() as f64,
_ => return Err(Error::Unreachable("Vector::minkowski_distance")),
};
Ok(dist.powf(1.0 / order.to_float()))
fn hamming<T>(a: &[T], b: &[T]) -> f64
where
T: PartialEq,
{
a.iter().zip(b.iter()).filter(|&(a, b)| a != b).count() as f64
}
pub(crate) fn hamming_distance(&self, other: &Self) -> f64 {
match (self, other) {
(Self::F64(a), Self::F64(b)) => Self::hamming(a, b),
(Self::F32(a), Self::F32(b)) => Self::hamming(a, b),
(Self::I64(a), Self::I64(b)) => Self::hamming(a, b),
(Self::I32(a), Self::I32(b)) => Self::hamming(a, b),
(Self::I16(a), Self::I16(b)) => Self::hamming(a, b),
_ => f64::NAN,
}
}
fn jaccard_f64(a: &[f64], b: &[f64]) -> f64 {
let mut union: HashSet<u64> = HashSet::from_iter(a.iter().map(|f| f.to_bits()));
let intersection_size = b.iter().filter(|n| !union.insert(n.to_bits())).count() as f64;
intersection_size / union.len() as f64
}
fn jaccard_f32(a: &[f32], b: &[f32]) -> f64 {
let mut union: HashSet<u32> = HashSet::from_iter(a.iter().map(|f| f.to_bits()));
let intersection_size = b.iter().filter(|n| !union.insert(n.to_bits())).count() as f64;
intersection_size / union.len() as f64
}
fn jaccard_integers<T>(a: &[T], b: &[T]) -> f64
where
T: Eq + Hash,
{
let mut union: HashSet<&T> = HashSet::from_iter(a.iter());
let intersection_size = b.iter().filter(|n| !union.insert(n)).count() as f64;
intersection_size / union.len() as f64
}
pub(crate) fn jaccard_similarity(&self, other: &Self) -> f64 {
match (self, other) {
(Self::F64(a), Self::F64(b)) => Self::jaccard_f64(a, b),
(Self::F32(a), Self::F32(b)) => Self::jaccard_f32(a, b),
(Self::I64(a), Self::I64(b)) => Self::jaccard_integers(a, b),
(Self::I32(a), Self::I32(b)) => Self::jaccard_integers(a, b),
(Self::I16(a), Self::I16(b)) => Self::jaccard_integers(a, b),
_ => f64::NAN,
}
}
fn manhattan<T>(a: &[T], b: &[T]) -> f64
where
T: Sub<Output = T> + ToFloat + Copy,
{
a.iter().zip(b.iter()).map(|(&a, &b)| ((a - b).to_float()).abs()).sum()
}
pub(crate) fn manhattan_distance(&self, other: &Self) -> f64 {
match (self, other) {
(Self::F64(a), Self::F64(b)) => Self::manhattan(a, b),
(Self::F32(a), Self::F32(b)) => Self::manhattan(a, b),
(Self::I64(a), Self::I64(b)) => Self::manhattan(a, b),
(Self::I32(a), Self::I32(b)) => Self::manhattan(a, b),
(Self::I16(a), Self::I16(b)) => Self::manhattan(a, b),
_ => f64::NAN,
}
}
fn minkowski<T>(a: &[T], b: &[T], order: f64) -> f64
where
T: ToFloat,
{
let dist: f64 = a
.iter()
.zip(b.iter())
.map(|(a, b)| (a.to_float() - b.to_float()).abs().powf(order))
.sum();
dist.powf(1.0 / order)
}
pub(crate) fn minkowski_distance(&self, other: &Self, order: f64) -> f64 {
match (self, other) {
(Self::F64(a), Self::F64(b)) => Self::minkowski(a, b, order),
(Self::F32(a), Self::F32(b)) => Self::minkowski(a, b, order),
(Self::I64(a), Self::I64(b)) => Self::minkowski(a, b, order),
(Self::I32(a), Self::I32(b)) => Self::minkowski(a, b, order),
(Self::I16(a), Self::I16(b)) => Self::minkowski(a, b, order),
_ => f64::NAN,
}
}
fn pearson<T>(a: &[T], b: &[T]) -> f64
where
T: ToFloat,
{
let m1 = a.mean();
let m2 = b.mean();
let covar: f64 =
a.iter().zip(b.iter()).map(|(x, y)| (x.to_float() - m1) * (y.to_float() - m2)).sum();
let covar = covar / a.len() as f64;
let std_dev1 = deviation(a, m1, false);
let std_dev2 = deviation(b, m2, false);
covar / (std_dev1 * std_dev2)
}
fn pearson_similarity(&self, other: &Self) -> f64 {
match (self, other) {
(Self::F64(a), Self::F64(b)) => Self::pearson(a, b),
(Self::F32(a), Self::F32(b)) => Self::pearson(a, b),
(Self::I64(a), Self::I64(b)) => Self::pearson(a, b),
(Self::I32(a), Self::I32(b)) => Self::pearson(a, b),
(Self::I16(a), Self::I16(b)) => Self::pearson(a, b),
_ => f64::NAN,
}
}
}
impl Distance {
pub(super) fn calculate<V>(&self, a: V, b: V) -> f64
where
V: Borrow<Vector>,
{
match self {
Distance::Chebyshev => a.borrow().chebyshev_distance(b.borrow()),
Distance::Cosine => a.borrow().cosine_distance(b.borrow()),
Distance::Euclidean => a.borrow().euclidean_distance(b.borrow()),
Distance::Hamming => a.borrow().hamming_distance(b.borrow()),
Distance::Jaccard => a.borrow().jaccard_similarity(b.borrow()),
Distance::Manhattan => a.borrow().manhattan_distance(b.borrow()),
Distance::Minkowski(order) => {
a.borrow().minkowski_distance(b.borrow(), order.to_float())
}
Distance::Pearson => a.borrow().pearson_similarity(b.borrow()),
}
}
}
#[cfg(test)]
mod tests {
use crate::idx::trees::knn::tests::{get_seed_rnd, new_random_vec, RandomItemGenerator};
use crate::idx::trees::vector::Vector;
use crate::sql::index::{Distance, VectorType};
use crate::sql::Array;
fn test_distance(dist: Distance, a1: &[f64], a2: &[f64], res: f64) {
// Convert the arrays to Vec<Number>
let mut v1 = vec![];
a1.iter().for_each(|&n| v1.push(n.into()));
let mut v2 = vec![];
a2.iter().for_each(|&n| v2.push(n.into()));
// Check the generic distance implementation
assert_eq!(dist.compute(&v1, &v2).unwrap(), res.into());
// Check the "Vector" optimised implementations
for t in [VectorType::F64] {
let v1 = Vector::try_from_array(t, &Array::from(v1.clone())).unwrap();
let v2 = Vector::try_from_array(t, &Array::from(v2.clone())).unwrap();
assert_eq!(dist.calculate(&v1, &v2), res);
}
}
fn test_distance_collection(dist: Distance, size: usize, dim: usize) {
let mut rng = get_seed_rnd();
for vt in
[VectorType::F64, VectorType::F32, VectorType::I64, VectorType::I32, VectorType::I16]
{
let gen = RandomItemGenerator::new(&dist, dim);
let mut num_zero = 0;
for i in 0..size {
let v1 = new_random_vec(&mut rng, vt, dim, &gen);
let v2 = new_random_vec(&mut rng, vt, dim, &gen);
let d = dist.calculate(&v1, &v2);
assert!(
d.is_finite() && !d.is_nan(),
"i: {i} - vt: {vt} - v1: {v1:?} - v2: {v2:?}"
);
assert_ne!(d, f64::NAN, "i: {i} - vt: {vt} - v1: {v1:?} - v2: {v2:?}");
assert_ne!(d, f64::INFINITY, "i: {i} - vt: {vt} - v1: {v1:?} - v2: {v2:?}");
if d == 0.0 {
num_zero += 1;
}
}
let zero_rate = num_zero as f64 / size as f64;
assert!(zero_rate < 0.1, "vt: {vt} - zero_rate: {zero_rate}");
}
}
#[test]
fn test_distance_chebyshev() {
test_distance_collection(Distance::Chebyshev, 2000, 1536);
test_distance(Distance::Chebyshev, &[1.0, 2.0, 3.0], &[2.0, 3.0, 4.0], 1.0);
}
#[test]
fn test_distance_cosine() {
test_distance_collection(Distance::Cosine, 2000, 1536);
test_distance(Distance::Cosine, &[1.0, 2.0, 3.0], &[2.0, 3.0, 4.0], 0.007416666029069652);
}
#[test]
fn test_distance_euclidean() {
test_distance_collection(Distance::Euclidean, 2000, 1536);
test_distance(Distance::Euclidean, &[1.0, 2.0, 3.0], &[2.0, 3.0, 4.0], 1.7320508075688772);
}
#[test]
fn test_distance_hamming() {
test_distance_collection(Distance::Hamming, 2000, 1536);
test_distance(Distance::Hamming, &[1.0, 2.0, 3.0], &[2.0, 3.0, 4.0], 3.0);
}
#[test]
fn test_distance_jaccard() {
test_distance_collection(Distance::Jaccard, 1000, 768);
test_distance(Distance::Jaccard, &[1.0, 2.0, 3.0], &[2.0, 3.0, 4.0], 0.5);
}
#[test]
fn test_distance_manhattan() {
test_distance_collection(Distance::Manhattan, 2000, 1536);
test_distance(Distance::Manhattan, &[1.0, 2.0, 3.0], &[2.0, 3.0, 4.0], 3.0);
}
#[test]
fn test_distance_minkowski() {
test_distance_collection(Distance::Minkowski(3.into()), 2000, 1536);
test_distance(
Distance::Minkowski(3.into()),
&[1.0, 2.0, 3.0],
&[2.0, 3.0, 4.0],
1.4422495703074083,
);
}
#[test]
fn test_distance_pearson() {
test_distance_collection(Distance::Pearson, 2000, 1536);
test_distance(Distance::Pearson, &[1.0, 2.0, 3.0], &[2.0, 3.0, 4.0], 1.0);
}
}

View file

@ -1,6 +1,6 @@
use crate::err::Error;
use crate::fnc::util::math::vector::{
ChebyshevDistance, CosineSimilarity, EuclideanDistance, HammingDistance, JaccardSimilarity,
ChebyshevDistance, CosineDistance, EuclideanDistance, HammingDistance, JaccardSimilarity,
ManhattanDistance, MinkowskiDistance, PearsonSimilarity,
};
use crate::sql::ident::Ident;
@ -118,7 +118,7 @@ pub enum Distance {
impl Distance {
pub(crate) fn compute(&self, v1: &Vec<Number>, v2: &Vec<Number>) -> Result<Number, Error> {
match self {
Self::Cosine => v1.cosine_similarity(v2),
Self::Cosine => v1.cosine_distance(v2),
Self::Chebyshev => v1.chebyshev_distance(v2),
Self::Euclidean => v1.euclidean_distance(v2),
Self::Hamming => v1.hamming_distance(v2),

View file

@ -1,5 +1,6 @@
use super::value::{TryAdd, TryDiv, TryMul, TryNeg, TryPow, TryRem, TrySub};
use crate::err::Error;
use crate::fnc::util::math::ToFloat;
use crate::sql::strand::Strand;
use revision::revisioned;
use rust_decimal::prelude::*;
@ -735,3 +736,9 @@ impl Sort for Vec<Number> {
Sorted(self)
}
}
impl ToFloat for Number {
fn to_float(&self) -> f64 {
self.to_float()
}
}

View file

@ -1,6 +1,5 @@
use once_cell::sync::Lazy;
use quick_cache::sync::Cache;
use quick_cache::GuardResult;
use quick_cache::sync::{Cache, GuardResult};
use revision::revisioned;
use serde::{
de::{self, Visitor},

View file

@ -1,3 +1,4 @@
use crate::syn::token::VectorTypeKind;
use crate::{
sql::change_feed_include::ChangeFeedInclude,
sql::{language::Language, Algorithm},
@ -342,6 +343,13 @@ pub(crate) static KEYWORDS: phf::Map<UniCase<&'static str>, TokenKind> = phf_map
UniCase::ascii("MINKOWSKI") => TokenKind::Distance(DistanceKind::Minkowski),
UniCase::ascii("PEARSON") => TokenKind::Distance(DistanceKind::Pearson),
// VectorTypes
UniCase::ascii("F64") => TokenKind::VectorType(VectorTypeKind::F64),
UniCase::ascii("F32") => TokenKind::VectorType(VectorTypeKind::F32),
UniCase::ascii("I64") => TokenKind::VectorType(VectorTypeKind::I64),
UniCase::ascii("I32") => TokenKind::VectorType(VectorTypeKind::I32),
UniCase::ascii("I16") => TokenKind::VectorType(VectorTypeKind::I16),
// Change Feed keywords
UniCase::ascii("ORIGINAL") => TokenKind::ChangeFeedInclude(ChangeFeedInclude::Original),
};

View file

@ -21,7 +21,8 @@ impl TokenValue for Ident {
TokenKind::Keyword(_)
| TokenKind::Language(_)
| TokenKind::Algorithm(_)
| TokenKind::Distance(_) => {
| TokenKind::Distance(_)
| TokenKind::VectorType(_) => {
let str = parser.lexer.reader.span(token.span);
// Lexer should ensure that the token is valid utf-8
let str = std::str::from_utf8(str).unwrap().to_owned();

View file

@ -653,7 +653,8 @@ impl Parser<'_> {
TokenKind::Keyword(_)
| TokenKind::Language(_)
| TokenKind::Algorithm(_)
| TokenKind::Distance(_) => {
| TokenKind::Distance(_)
| TokenKind::VectorType(_) => {
self.pop_peek();
let str = self.lexer.reader.span(token.span);
// Lexer should ensure that the token is valid utf-8

View file

@ -639,31 +639,41 @@ impl Parser<'_> {
self.pop_peek();
expected!(self, t!("DIMENSION"));
let dimension = self.next_token_value()?;
let distance = self.try_parse_distance()?.unwrap_or(Distance::Euclidean);
let capacity = self
.eat(t!("CAPACITY"))
.then(|| self.next_token_value())
.transpose()?
.unwrap_or(40);
let doc_ids_order = self
.eat(t!("DOC_IDS_ORDER"))
.then(|| self.next_token_value())
.transpose()?
.unwrap_or(100);
let doc_ids_cache = self
.eat(t!("DOC_IDS_CACHE"))
.then(|| self.next_token_value())
.transpose()?
.unwrap_or(100);
let mtree_cache = self
.eat(t!("MTREE_CACHE"))
.then(|| self.next_token_value())
.transpose()?
.unwrap_or(100);
let mut distance = Distance::Euclidean;
let mut vector_type = VectorType::F64;
let mut capacity = 40;
let mut doc_ids_cache = 100;
let mut doc_ids_order = 100;
let mut mtree_cache = 100;
loop {
match self.peek_kind() {
t!("DISTANCE") => {
self.pop_peek();
distance = self.parse_distance()?
}
t!("TYPE") => {
self.pop_peek();
vector_type = self.parse_vector_type()?
}
t!("CAPACITY") => {
self.pop_peek();
capacity = self.next_token_value()?
}
t!("DOC_IDS_CACHE") => {
self.pop_peek();
doc_ids_cache = self.next_token_value()?
}
t!("DOC_IDS_ORDER") => {
self.pop_peek();
doc_ids_order = self.next_token_value()?
}
t!("MTREE_CACHE") => {
self.pop_peek();
mtree_cache = self.next_token_value()?
}
_ => break,
}
}
res.index = Index::MTree(crate::sql::index::MTreeParams {
dimension,
_distance: Default::default(),
@ -672,7 +682,7 @@ impl Parser<'_> {
doc_ids_order,
doc_ids_cache,
mtree_cache,
vector_type: VectorType::F64,
vector_type,
})
}
t!("COMMENT") => {

View file

@ -2,6 +2,8 @@
use reblessive::Stk;
use crate::sql::index::VectorType;
use crate::syn::token::VectorTypeKind;
use crate::{
sql::{
change_feed_include::ChangeFeedInclude, changefeed::ChangeFeed, index::Distance, Base,
@ -409,6 +411,19 @@ impl Parser<'_> {
self.parse_distance().map(Some)
}
pub fn parse_vector_type(&mut self) -> ParseResult<VectorType> {
match self.next().kind {
TokenKind::VectorType(x) => Ok(match x {
VectorTypeKind::F64 => VectorType::F64,
VectorTypeKind::F32 => VectorType::F32,
VectorTypeKind::I64 => VectorType::I64,
VectorTypeKind::I32 => VectorType::I32,
VectorTypeKind::I16 => VectorType::I16,
}),
x => unexpected!(self, x, "a vector type"),
}
}
pub fn parse_custom_function_name(&mut self) -> ParseResult<Ident> {
expected!(self, t!("fn"));
expected!(self, t!("::"));

View file

@ -221,6 +221,28 @@ impl DistanceKind {
}
}
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
#[non_exhaustive]
pub enum VectorTypeKind {
F64,
F32,
I64,
I32,
I16,
}
impl VectorTypeKind {
pub fn as_str(&self) -> &'static str {
match self {
Self::F64 => "F64",
Self::F32 => "F32",
Self::I64 => "I64",
Self::I32 => "I32",
Self::I16 => "I16",
}
}
}
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
#[non_exhaustive]
pub enum NumberKind {
@ -252,6 +274,7 @@ pub enum TokenKind {
ChangeFeedInclude(ChangeFeedInclude),
Language(Language),
Distance(DistanceKind),
VectorType(VectorTypeKind),
Operator(Operator),
OpenDelim(Delim),
CloseDelim(Delim),
@ -368,6 +391,7 @@ impl TokenKind {
TokenKind::Algorithm(x) => Self::algorithm_as_str(x),
TokenKind::Language(x) => x.as_str(),
TokenKind::Distance(x) => x.as_str(),
TokenKind::VectorType(x) => x.as_str(),
TokenKind::OpenDelim(Delim::Paren) => "(",
TokenKind::OpenDelim(Delim::Brace) => "{",
TokenKind::OpenDelim(Delim::Bracket) => "[",

View file

@ -19,7 +19,7 @@ fn bench_index_btree(c: &mut Criterion) {
let (samples_len, samples) = setup();
let mut group = c.benchmark_group("index_btree");
group.throughput(Throughput::Elements(1));
group.throughput(Throughput::Elements(samples_len as u64));
group.sample_size(10);
group.measurement_time(Duration::from_secs(30));
@ -66,8 +66,9 @@ where
let ds = Datastore::new("memory").await.unwrap();
let mut tx = ds.transaction(Write, Optimistic).await.unwrap();
let mut t = BTree::<BK>::new(BState::new(100));
let c = TreeCache::new(0, TreeNodeProvider::Debug, cache_size);
let mut s = TreeStore::new(TreeNodeProvider::Debug, c, Write).await;
let np = TreeNodeProvider::Debug;
let c = TreeCache::new(0, np.get_key(0), np.clone(), cache_size);
let mut s = TreeStore::new(np, c.into(), Write).await;
for i in 0..samples_size {
let (key, payload) = sample_provider(i);
// Insert the sample

View file

@ -3,11 +3,11 @@ use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion, Thro
use futures::executor::block_on;
use rand::prelude::ThreadRng;
use rand::{thread_rng, Rng};
use reblessive::TreeStack;
use std::time::Duration;
use surrealdb::idx::docids::DocId;
use surrealdb::idx::trees::mtree::{MState, MTree};
use surrealdb::idx::trees::store::cache::TreeCache;
use surrealdb::idx::trees::store::{TreeNodeProvider, TreeStore};
use surrealdb::idx::trees::store::TreeNodeProvider;
use surrealdb::idx::trees::vector::Vector;
use surrealdb::kvs::Datastore;
use surrealdb::kvs::LockType::Optimistic;
@ -126,14 +126,24 @@ async fn insert_objects(
let mut rng = thread_rng();
let mut t = mtree();
let mut tx = ds.transaction(Write, Optimistic).await.unwrap();
let c = TreeCache::new(0, TreeNodeProvider::Debug, cache_size);
let mut s = TreeStore::new(TreeNodeProvider::Debug, c.clone(), Write).await;
for i in 0..samples_size {
let object = random_object(&mut rng, vector_size).into();
// Insert the sample
t.insert(&mut tx, &mut s, object, i as DocId).await.unwrap();
let mut s =
ds.index_store().get_store_mtree(TreeNodeProvider::Debug, 0, Write, cache_size).await;
let mut stack = TreeStack::new();
stack
.enter(|stk| async {
for i in 0..samples_size {
let object = random_object(&mut rng, vector_size).into();
// Insert the sample
t.insert(stk, &mut tx, &mut s, object, i as DocId).await.unwrap();
}
})
.finish()
.await;
if let Some(new_cache) = s.finish(&mut tx).await.unwrap() {
ds.index_store().advance_store_mtree(new_cache);
}
s.finish(&mut tx).await.unwrap();
tx.commit().await.unwrap();
}
@ -147,8 +157,7 @@ async fn knn_lookup_objects(
let mut rng = thread_rng();
let t = mtree();
let mut tx = ds.transaction(Read, Optimistic).await.unwrap();
let c = TreeCache::new(0, TreeNodeProvider::Debug, cache_size);
let s = TreeStore::new(TreeNodeProvider::Debug, c, Read).await;
let s = ds.index_store().get_store_mtree(TreeNodeProvider::Debug, 0, Read, cache_size).await;
for _ in 0..samples_size {
let object = random_object(&mut rng, vector_size).into();
// Insert the sample

View file

@ -125,7 +125,7 @@ async fn remove_statement_index() -> Result<(), Error> {
}
// Every index store cache has been removed
assert!(dbs.index_store().is_empty().await);
assert!(dbs.index_store().is_empty());
Ok(())
}

View file

@ -12,7 +12,7 @@ async fn select_where_mtree_knn() -> Result<(), Error> {
CREATE pts:1 SET point = [1,2,3,4];
CREATE pts:2 SET point = [4,5,6,7];
CREATE pts:3 SET point = [8,9,10,11];
DEFINE INDEX mt_pts ON pts FIELDS point MTREE DIMENSION 4;
DEFINE INDEX mt_pts ON pts FIELDS point MTREE DIMENSION 4 TYPE F32;
LET $pt = [2,3,4,5];
SELECT id, vector::distance::euclidean(point, $pt) AS dist FROM pts WHERE point <|2|> $pt;
SELECT id FROM pts WHERE point <|2|> $pt EXPLAIN;
@ -71,7 +71,7 @@ async fn delete_update_mtree_index() -> Result<(), Error> {
CREATE pts:1 SET point = [1,2,3,4];
CREATE pts:2 SET point = [4,5,6,7];
CREATE pts:3 SET point = [2,3,4,5];
DEFINE INDEX mt_pts ON pts FIELDS point MTREE DIMENSION 4;
DEFINE INDEX mt_pts ON pts FIELDS point MTREE DIMENSION 4 TYPE I32;
CREATE pts:4 SET point = [8,9,10,11];
DELETE pts:2;
UPDATE pts:3 SET point = [12,13,14,15];

View file

@ -1,6 +1,11 @@
# cargo-vet audits file
[[audits.quick_cache]]
who = "Emmanuel Keller <emmanuel.keller@surrealdb.com>"
criteria = "safe-to-deploy"
version = "0.5.1"
[[audits.rustls]]
who = "Tobie Morgan Hitchcock <tobie@surrealdb.com>"
criteria = "safe-to-deploy"

View file

@ -243,10 +243,6 @@ criteria = "safe-to-deploy"
version = "0.3.71"
criteria = "safe-to-deploy"
[[exemptions.base64]]
version = "0.21.7"
criteria = "safe-to-deploy"
[[exemptions.base64]]
version = "0.22.0"
criteria = "safe-to-deploy"
@ -423,10 +419,6 @@ criteria = "safe-to-deploy"
version = "0.18.1"
criteria = "safe-to-deploy"
[[exemptions.core-foundation]]
version = "0.9.4"
criteria = "safe-to-deploy"
[[exemptions.cpp_demangle]]
version = "0.4.3"
criteria = "safe-to-deploy"
@ -595,10 +587,6 @@ criteria = "safe-to-deploy"
version = "0.4.0"
criteria = "safe-to-deploy"
[[exemptions.fastrand]]
version = "2.0.2"
criteria = "safe-to-deploy"
[[exemptions.figment]]
version = "0.10.15"
criteria = "safe-to-deploy"
@ -979,10 +967,6 @@ criteria = "safe-to-deploy"
version = "0.4.11"
criteria = "safe-to-deploy"
[[exemptions.log]]
version = "0.4.21"
criteria = "safe-to-deploy"
[[exemptions.loom]]
version = "0.5.6"
criteria = "safe-to-deploy"
@ -1039,10 +1023,6 @@ criteria = "safe-to-deploy"
version = "0.2.1"
criteria = "safe-to-deploy"
[[exemptions.miniz_oxide]]
version = "0.7.2"
criteria = "safe-to-deploy"
[[exemptions.mio]]
version = "0.8.11"
criteria = "safe-to-deploy"
@ -1091,10 +1071,6 @@ criteria = "safe-to-deploy"
version = "0.4.5"
criteria = "safe-to-deploy"
[[exemptions.num-conv]]
version = "0.1.0"
criteria = "safe-to-deploy"
[[exemptions.num-format]]
version = "0.4.4"
criteria = "safe-to-deploy"
@ -1235,10 +1211,6 @@ criteria = "safe-to-deploy"
version = "1.1.5"
criteria = "safe-to-deploy"
[[exemptions.pin-project-lite]]
version = "0.2.14"
criteria = "safe-to-deploy"
[[exemptions.pkcs1]]
version = "0.7.5"
criteria = "safe-to-deploy"
@ -1623,10 +1595,6 @@ criteria = "safe-to-deploy"
version = "2.10.0"
criteria = "safe-to-deploy"
[[exemptions.semver]]
version = "1.0.22"
criteria = "safe-to-deploy"
[[exemptions.send_wrapper]]
version = "0.6.0"
criteria = "safe-to-deploy"
@ -1843,18 +1811,6 @@ criteria = "safe-to-run"
version = "0.2.15"
criteria = "safe-to-run"
[[exemptions.thiserror]]
version = "1.0.58"
criteria = "safe-to-deploy"
[[exemptions.thiserror-impl]]
version = "1.0.58"
criteria = "safe-to-deploy"
[[exemptions.thread_local]]
version = "1.1.8"
criteria = "safe-to-deploy"
[[exemptions.time]]
version = "0.3.34"
criteria = "safe-to-deploy"
@ -2067,14 +2023,6 @@ criteria = "safe-to-deploy"
version = "0.2.92"
criteria = "safe-to-deploy"
[[exemptions.wasm-bindgen-macro-support]]
version = "0.2.92"
criteria = "safe-to-deploy"
[[exemptions.wasm-bindgen-shared]]
version = "0.2.92"
criteria = "safe-to-deploy"
[[exemptions.wasm-streams]]
version = "0.4.0"
criteria = "safe-to-deploy"

View file

@ -3,11 +3,11 @@
[[unpublished.surrealdb]]
version = "1.5.0"
audited_as = "1.3.1"
audited_as = "1.4.2"
[[unpublished.surrealdb-core]]
version = "2.0.0-1.5.0"
audited_as = "1.3.1"
audited_as = "2.0.0-1.4.2"
[[publisher.addr]]
version = "0.15.6"
@ -37,6 +37,13 @@ user-id = 3788
user-login = "emilio"
user-name = "Emilio Cobos Álvarez"
[[publisher.core-foundation]]
version = "0.9.3"
when = "2022-02-07"
user-id = 5946
user-login = "jrmuizel"
user-name = "Jeff Muizelaar"
[[publisher.core-foundation-sys]]
version = "0.8.4"
when = "2023-04-03"
@ -58,13 +65,6 @@ user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.echodb]]
version = "0.4.0"
when = "2023-03-26"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.echodb]]
version = "0.6.0"
when = "2024-04-05"
@ -100,13 +100,6 @@ user-id = 3987
user-login = "rushmorem"
user-name = "Rushmore Mushambi"
[[publisher.revision]]
version = "0.5.0"
when = "2023-08-29"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.revision]]
version = "0.7.0"
when = "2024-04-17"
@ -114,13 +107,6 @@ user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.revision-derive]]
version = "0.5.0"
when = "2023-08-29"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.revision-derive]]
version = "0.7.0"
when = "2024-04-17"
@ -136,15 +122,15 @@ user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.surrealdb]]
version = "1.3.1"
when = "2024-03-15"
version = "1.4.2"
when = "2024-04-19"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.surrealdb-core]]
version = "1.3.1"
when = "2024-03-15"
version = "2.0.0-1.4.2"
when = "2024-04-19"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
@ -164,15 +150,8 @@ user-login = "rushmorem"
user-name = "Rushmore Mushambi"
[[publisher.surrealkv]]
version = "0.1.3"
when = "2024-02-22"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.surrealkv]]
version = "0.1.4"
when = "2024-04-10"
version = "0.1.5"
when = "2024-04-24"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
@ -213,15 +192,8 @@ user-login = "Manishearth"
user-name = "Manish Goregaokar"
[[publisher.vart]]
version = "0.1.1"
when = "2024-02-10"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
[[publisher.vart]]
version = "0.2.0"
when = "2024-04-04"
version = "0.2.1"
when = "2024-04-24"
user-id = 145457
user-login = "tobiemh"
user-name = "Tobie Morgan Hitchcock"
@ -261,6 +233,12 @@ criteria = "safe-to-deploy"
version = "0.1.6"
notes = "Contains no unsafe code, no IO, no build.rs."
[[audits.bytecode-alliance.audits.base64]]
who = "Pat Hickey <phickey@fastly.com>"
criteria = "safe-to-deploy"
version = "0.21.0"
notes = "This crate has no dependencies, no build.rs, and contains no unsafe code."
[[audits.bytecode-alliance.audits.block-buffer]]
who = "Benjamin Bouvier <public@benj.me>"
criteria = "safe-to-deploy"
@ -298,6 +276,15 @@ criteria = "safe-to-deploy"
delta = "0.3.0 -> 0.3.1"
notes = "Just a dependency version bump and a bug fix for redox"
[[audits.bytecode-alliance.audits.fastrand]]
who = "Alex Crichton <alex@alexcrichton.com>"
criteria = "safe-to-deploy"
delta = "2.0.0 -> 2.0.1"
notes = """
This update had a few doc updates but no otherwise-substantial source code
updates.
"""
[[audits.bytecode-alliance.audits.fd-lock]]
who = "Pat Hickey <phickey@fastly.com>"
criteria = "safe-to-deploy"
@ -377,6 +364,20 @@ who = "Pat Hickey <phickey@fastly.com>"
criteria = "safe-to-deploy"
version = "0.1.0"
[[audits.bytecode-alliance.audits.miniz_oxide]]
who = "Alex Crichton <alex@alexcrichton.com>"
criteria = "safe-to-deploy"
version = "0.7.1"
notes = """
This crate is a Rust implementation of zlib compression/decompression and has
been used by default by the Rust standard library for quite some time. It's also
a default dependency of the popular `backtrace` crate for decompressing debug
information. This crate forbids unsafe code and does not otherwise access system
resources. It's originally a port of the `miniz.c` library as well, and given
its own longevity should be relatively hardened against some of the more common
compression-related issues.
"""
[[audits.bytecode-alliance.audits.native-tls]]
who = "Pat Hickey <phickey@fastly.com>"
criteria = "safe-to-deploy"
@ -438,6 +439,12 @@ criteria = "safe-to-deploy"
version = "0.1.21"
notes = "I am the author of this crate."
[[audits.bytecode-alliance.audits.semver]]
who = "Pat Hickey <phickey@fastly.com>"
criteria = "safe-to-deploy"
version = "1.0.17"
notes = "plenty of unsafe pointer and vec tricks, but in well-structured and commented code that appears to be correct"
[[audits.bytecode-alliance.audits.sharded-slab]]
who = "Pat Hickey <phickey@fastly.com>"
criteria = "safe-to-deploy"
@ -449,6 +456,12 @@ who = "Pat Hickey <phickey@fastly.com>"
criteria = "safe-to-deploy"
version = "1.4.1"
[[audits.bytecode-alliance.audits.thread_local]]
who = "Pat Hickey <phickey@fastly.com>"
criteria = "safe-to-deploy"
version = "1.1.4"
notes = "uses unsafe to implement thread local storage of objects"
[[audits.bytecode-alliance.audits.tinyvec]]
who = "Alex Crichton <alex@alexcrichton.com>"
criteria = "safe-to-deploy"
@ -565,6 +578,18 @@ criteria = "safe-to-deploy"
version = "1.0.1"
notes = "No unsafe usage or ambient capabilities"
[[audits.embark-studios.audits.thiserror]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
version = "1.0.40"
notes = "Wrapper over implementation crate, found no unsafe or ambient capabilities used"
[[audits.embark-studios.audits.thiserror-impl]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
version = "1.0.40"
notes = "Found no unsafe or ambient capabilities used"
[[audits.embark-studios.audits.utf8parse]]
who = "Johan Andersson <opensource@embark-studios.com>"
criteria = "safe-to-deploy"
@ -772,6 +797,20 @@ criteria = "safe-to-deploy"
delta = "0.1.0 -> 0.1.1"
aggregated-from = "https://chromium.googlesource.com/chromiumos/third_party/rust_crates/+/refs/heads/main/cargo-vet/audits.toml?format=TEXT"
[[audits.google.audits.pin-project-lite]]
who = "David Koloski <dkoloski@google.com>"
criteria = "safe-to-deploy"
version = "0.2.9"
notes = "Reviewed on https://fxrev.dev/824504"
aggregated-from = "https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/third_party/rust_crates/supply-chain/audits.toml?format=TEXT"
[[audits.google.audits.pin-project-lite]]
who = "David Koloski <dkoloski@google.com>"
criteria = "safe-to-deploy"
delta = "0.2.9 -> 0.2.13"
notes = "Audited at https://fxrev.dev/946396"
aggregated-from = "https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/third_party/rust_crates/supply-chain/audits.toml?format=TEXT"
[[audits.google.audits.predicates-core]]
who = "Max Lee <endlesspring@google.com>"
criteria = "safe-to-run"
@ -913,6 +952,21 @@ criteria = "safe-to-deploy"
version = "0.9.4"
aggregated-from = "https://chromium.googlesource.com/chromiumos/third_party/rust_crates/+/refs/heads/main/cargo-vet/audits.toml?format=TEXT"
[[audits.isrg.audits.base64]]
who = "Tim Geoghegan <timg@letsencrypt.org>"
criteria = "safe-to-deploy"
delta = "0.21.0 -> 0.21.1"
[[audits.isrg.audits.base64]]
who = "Brandon Pitman <bran@bran.land>"
criteria = "safe-to-deploy"
delta = "0.21.1 -> 0.21.2"
[[audits.isrg.audits.base64]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
delta = "0.21.2 -> 0.21.3"
[[audits.isrg.audits.block-buffer]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
@ -998,11 +1052,26 @@ who = "Ameer Ghani <inahga@divviup.org>"
criteria = "safe-to-deploy"
version = "1.12.1"
[[audits.isrg.audits.thiserror]]
who = "Brandon Pitman <bran@bran.land>"
criteria = "safe-to-deploy"
delta = "1.0.40 -> 1.0.43"
[[audits.isrg.audits.thiserror-impl]]
who = "Brandon Pitman <bran@bran.land>"
criteria = "safe-to-deploy"
delta = "1.0.40 -> 1.0.43"
[[audits.isrg.audits.untrusted]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
version = "0.7.1"
[[audits.isrg.audits.wasm-bindgen-shared]]
who = "David Cook <dcook@divviup.org>"
criteria = "safe-to-deploy"
version = "0.2.83"
[[audits.mozilla.wildcard-audits.cexpr]]
who = "Emilio Cobos Álvarez <emilio@crisal.io>"
criteria = "safe-to-deploy"
@ -1012,6 +1081,16 @@ end = "2024-04-21"
notes = "No unsafe code, rather straight-forward parser."
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.wildcard-audits.core-foundation]]
who = "Bobby Holley <bobbyholley@gmail.com>"
criteria = "safe-to-deploy"
user-id = 5946 # Jeff Muizelaar (jrmuizel)
start = "2019-03-29"
end = "2023-05-04"
renew = false
notes = "I've reviewed every source contribution that was neither authored nor reviewed by Mozilla."
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.wildcard-audits.core-foundation-sys]]
who = "Bobby Holley <bobbyholley@gmail.com>"
criteria = "safe-to-deploy"
@ -1128,6 +1207,13 @@ criteria = "safe-to-deploy"
delta = "0.10.2 -> 0.10.3"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.core-foundation]]
who = "Teodor Tanasoaia <ttanasoaia@mozilla.com>"
criteria = "safe-to-deploy"
delta = "0.9.3 -> 0.9.4"
notes = "I've reviewed every source contribution that was neither authored nor reviewed by Mozilla."
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.crossbeam-channel]]
who = "Jan-Erik Rediger <jrediger@mozilla.com>"
criteria = "safe-to-deploy"
@ -1164,6 +1250,12 @@ criteria = "safe-to-deploy"
delta = "0.3.1 -> 0.3.3"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.fastrand]]
who = "Mike Hommey <mh+mozilla@glandium.org>"
criteria = "safe-to-deploy"
delta = "1.9.0 -> 2.0.0"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.fd-lock]]
who = "Jan-Erik Rediger <jrediger@mozilla.com>"
criteria = "safe-to-deploy"
@ -1241,6 +1333,26 @@ version = "1.4.0"
notes = "I have read over the macros, and audited the unsafe code."
aggregated-from = "https://raw.githubusercontent.com/mozilla/cargo-vet/main/supply-chain/audits.toml"
[[audits.mozilla.audits.log]]
who = "Mike Hommey <mh+mozilla@glandium.org>"
criteria = "safe-to-deploy"
version = "0.4.17"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.mozilla.audits.log]]
who = "Jan-Erik Rediger <jrediger@mozilla.com>"
criteria = "safe-to-deploy"
delta = "0.4.17 -> 0.4.18"
notes = "One dependency removed, others updated (which we don't rely on), some APIs (which we don't use) changed."
aggregated-from = "https://raw.githubusercontent.com/mozilla/glean/main/supply-chain/audits.toml"
[[audits.mozilla.audits.log]]
who = "Kagami Sascha Rosylight <krosylight@mozilla.com>"
criteria = "safe-to-deploy"
delta = "0.4.18 -> 0.4.20"
notes = "Only cfg attribute and internal macro changes and module refactorings"
aggregated-from = "https://raw.githubusercontent.com/mozilla/glean/main/supply-chain/audits.toml"
[[audits.mozilla.audits.num-bigint]]
who = "Josh Stone <jistone@redhat.com>"
criteria = "safe-to-deploy"
@ -1451,6 +1563,24 @@ criteria = "safe-to-deploy"
delta = "2.4.1 -> 2.5.0"
aggregated-from = "https://hg.mozilla.org/mozilla-central/raw-file/tip/supply-chain/audits.toml"
[[audits.zcash.audits.base64]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "0.21.3 -> 0.21.4"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.base64]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "0.21.4 -> 0.21.5"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.base64]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "0.21.5 -> 0.21.7"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.block-buffer]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
@ -1464,6 +1594,12 @@ criteria = "safe-to-deploy"
delta = "0.3.3 -> 0.3.8"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.fastrand]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "2.0.1 -> 2.0.2"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.futures-channel]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
@ -1497,6 +1633,30 @@ version = "0.1.3"
notes = "Reviewed in full."
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.log]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "0.4.20 -> 0.4.21"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.miniz_oxide]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "0.7.1 -> 0.7.2"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.num-conv]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
version = "0.1.0"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.pin-project-lite]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "0.2.13 -> 0.2.14"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.quote]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
@ -1536,6 +1696,30 @@ be set correctly by `cargo`.
"""
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.semver]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.17 -> 1.0.18"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.semver]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.18 -> 1.0.19"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.semver]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.19 -> 1.0.20"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.semver]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "1.0.20 -> 1.0.22"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.sharded-slab]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
@ -1559,6 +1743,93 @@ criteria = "safe-to-deploy"
delta = "2.1.0 -> 2.2.0"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.43 -> 1.0.48"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.48 -> 1.0.51"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.51 -> 1.0.52"
notes = "Reruns the build script if the `RUSTC_BOOTSTRAP` env variable changes."
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.52 -> 1.0.56"
notes = """
Build script changes are to refactor the existing probe into a separate file
(which removes a filesystem write), and adjust how it gets rerun in response to
changes in the build environment.
"""
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "1.0.56 -> 1.0.58"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror-impl]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.43 -> 1.0.48"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror-impl]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.48 -> 1.0.51"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror-impl]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.51 -> 1.0.52"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror-impl]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "1.0.52 -> 1.0.56"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thiserror-impl]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "1.0.56 -> 1.0.58"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thread_local]]
who = "Jack Grigg <jack@z.cash>"
criteria = "safe-to-deploy"
delta = "1.1.4 -> 1.1.7"
notes = """
New `unsafe` usage:
- An extra `deallocate_bucket`, to replace a `Mutex::lock` with a `compare_exchange`.
- Setting and getting a `#[thread_local] static mut Option<Thread>` on nightly.
"""
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.thread_local]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "1.1.7 -> 1.1.8"
notes = """
Adds `unsafe` code that makes an assumption that `ptr::null_mut::<Entry<T>>()` is a valid representation
of an `AtomicPtr<Entry<T>>`, but this is likely a correct assumption.
"""
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.time-core]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
@ -1600,3 +1871,34 @@ Migrates to `try-lock 0.2.4` to replace some unsafe APIs that were not marked
`unsafe` (but that were being used safely).
"""
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.wasm-bindgen-macro-support]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
version = "0.2.92"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.wasm-bindgen-shared]]
who = "Jack Grigg <jack@z.cash>"
criteria = "safe-to-deploy"
delta = "0.2.83 -> 0.2.84"
notes = "Bumps the schema version to add `linked_modules`."
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.wasm-bindgen-shared]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "0.2.84 -> 0.2.87"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.wasm-bindgen-shared]]
who = "Jack Grigg <jack@electriccoin.co>"
criteria = "safe-to-deploy"
delta = "0.2.87 -> 0.2.89"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"
[[audits.zcash.audits.wasm-bindgen-shared]]
who = "Daira-Emma Hopwood <daira@jacaranda.org>"
criteria = "safe-to-deploy"
delta = "0.2.89 -> 0.2.92"
aggregated-from = "https://raw.githubusercontent.com/zcash/zcash/master/qa/supply-chain/audits.toml"