From 5446666e7d4d30585d153cb8bb2fc0daa7379adc Mon Sep 17 00:00:00 2001 From: Yusuke Kuoka Date: Tue, 12 Sep 2023 23:33:24 +0900 Subject: [PATCH] Fix panic on commit when defining db and tb in strict mode (#2684) Co-authored-by: Yusuke Kuoka Co-authored-by: Mees Delzenne --- lib/src/kvs/ds.rs | 2 -- lib/src/kvs/tx.rs | 52 ++++++++++++++++----------------------------- lib/tests/strict.rs | 16 ++++++++++++++ 3 files changed, 34 insertions(+), 36 deletions(-) diff --git a/lib/src/kvs/ds.rs b/lib/src/kvs/ds.rs index ef9aca4b..5f1b9870 100644 --- a/lib/src/kvs/ds.rs +++ b/lib/src/kvs/ds.rs @@ -25,7 +25,6 @@ use channel::Receiver; use channel::Sender; use futures::lock::Mutex; use futures::Future; -use std::collections::HashMap; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -728,7 +727,6 @@ impl Datastore { inner, cache: super::cache::Cache::default(), cf: cf::Writer::new(), - write_buffer: HashMap::new(), vso: self.versionstamp_oracle.clone(), }) } diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 87927c84..f37c1334 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -38,7 +38,6 @@ use sql::statements::DefineTokenStatement; use sql::statements::DefineUserStatement; use sql::statements::LiveStatement; use std::borrow::Cow; -use std::collections::HashMap; use std::fmt; use std::fmt::Debug; use std::ops::Range; @@ -55,7 +54,6 @@ pub struct Transaction { pub(super) inner: Inner, pub(super) cache: Cache, pub(super) cf: cf::Writer, - pub(super) write_buffer: HashMap, pub(super) vso: Arc>, } @@ -2485,9 +2483,9 @@ impl Transaction { let id = seq.get_next_id(); - self.cache.set(key.clone(), Entry::Seq(seq)); - - self.write_buffer.insert(key.clone(), ()); + self.cache.set(key.clone(), Entry::Seq(seq.clone())); + let (k, v) = seq.finish().unwrap(); + self.set(k, v).await?; Ok(id) } @@ -2500,9 +2498,9 @@ impl Transaction { seq.remove_id(db); - self.cache.set(key.clone(), Entry::Seq(seq)); - - self.write_buffer.insert(key.clone(), ()); + self.cache.set(key.clone(), Entry::Seq(seq.clone())); + let (k, v) = seq.finish().unwrap(); + self.set(k, v).await?; Ok(()) } @@ -2514,9 +2512,9 @@ impl Transaction { let id = seq.get_next_id(); - self.cache.set(key.clone(), Entry::Seq(seq)); - - self.write_buffer.insert(key.clone(), ()); + self.cache.set(key.clone(), Entry::Seq(seq.clone())); + let (k, v) = seq.finish().unwrap(); + self.set(k, v).await?; Ok(id) } @@ -2529,9 +2527,9 @@ impl Transaction { seq.remove_id(tb); - self.cache.set(key.clone(), Entry::Seq(seq)); - - self.write_buffer.insert(key.clone(), ()); + self.cache.set(key.clone(), Entry::Seq(seq.clone())); + let (k, v) = seq.finish().unwrap(); + self.set(k, v).await?; Ok(()) } @@ -2556,9 +2554,9 @@ impl Transaction { let id = seq.get_next_id(); - self.cache.set(key.clone(), Entry::Seq(seq)); - - self.write_buffer.insert(key.clone(), ()); + self.cache.set(key.clone(), Entry::Seq(seq.clone())); + let (k, v) = seq.finish().unwrap(); + self.set(k, v).await?; Ok(id) } @@ -2571,9 +2569,9 @@ impl Transaction { seq.remove_id(ns); - self.cache.set(key.clone(), Entry::Seq(seq)); - - self.write_buffer.insert(key.clone(), ()); + self.cache.set(key.clone(), Entry::Seq(seq.clone())); + let (k, v) = seq.finish().unwrap(); + self.set(k, v).await?; Ok(()) } @@ -2595,20 +2593,6 @@ impl Transaction { // Lastly, you should set lock=true if you want the changefeed to be correctly ordered for // non-FDB backends. pub(crate) async fn complete_changes(&mut self, _lock: bool) -> Result<(), Error> { - let mut buf = self.write_buffer.clone(); - let writes = buf.drain(); - for (k, _) in writes { - let v = self.cache.get(&k).unwrap(); - let mut seq = if let Entry::Seq(v) = v { - v - } else { - unreachable!(); - }; - if let Some((k, v)) = seq.finish() { - self.set(k, v).await? - } - } - let changes = self.cf.get(); for (tskey, prefix, suffix, v) in changes { self.set_versionstamped_key(tskey, prefix, suffix, v).await? diff --git a/lib/tests/strict.rs b/lib/tests/strict.rs index 0035943e..9e74edb3 100644 --- a/lib/tests/strict.rs +++ b/lib/tests/strict.rs @@ -277,3 +277,19 @@ async fn loose_mode_all_ok() -> Result<(), Error> { // Ok(()) } + +#[tokio::test] +async fn strict_define_in_transaction() -> Result<(), Error> { + let sql = r" + DEFINE NS test; DEFINE DB test; + USE NS test DB test; + BEGIN; + DEFINE TABLE test; + DEFINE FIELD test ON test; -- Panic used to be caused when you add this query within the transaction + COMMIT; + "; + let dbs = new_ds().await?.with_strict_mode(true); + let ses = Session::owner().with_ns("test").with_db("test"); + dbs.execute(sql, &ses, None).await?; + Ok(()) +}