From 781b1f944e775f455a0ff9d7ba9d792f582faade Mon Sep 17 00:00:00 2001 From: Yusuke Kuoka Date: Wed, 28 Jun 2023 16:19:40 +0900 Subject: [PATCH] feat: Change Feed option for DEFINE [TABLE|DATABASE] (#2189) --- lib/src/kvs/tx.rs | 2 + lib/src/sql/changefeed.rs | 67 ++++++++++++++++++++++++++ lib/src/sql/mod.rs | 1 + lib/src/sql/statements/define.rs | 80 +++++++++++++++++++++++++++++++- 4 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 lib/src/sql/changefeed.rs diff --git a/lib/src/kvs/tx.rs b/lib/src/kvs/tx.rs index 396e8c32..15268cdb 100644 --- a/lib/src/kvs/tx.rs +++ b/lib/src/kvs/tx.rs @@ -1429,6 +1429,7 @@ impl Transaction { let key = crate::key::db::new(ns, db); let val = DefineDatabaseStatement { name: db.to_owned().into(), + changefeed: None, }; self.put(key, &val).await?; Ok(val) @@ -1614,6 +1615,7 @@ impl Transaction { let key = crate::key::db::new(ns, db); let val = DefineDatabaseStatement { name: db.to_owned().into(), + changefeed: None, }; self.put(key, &val).await?; Ok(Arc::new(val)) diff --git a/lib/src/sql/changefeed.rs b/lib/src/sql/changefeed.rs new file mode 100644 index 00000000..bd476ef3 --- /dev/null +++ b/lib/src/sql/changefeed.rs @@ -0,0 +1,67 @@ +use crate::sql::comment::shouldbespace; +use crate::sql::duration::{duration, Duration}; +use crate::sql::error::IResult; +use nom::bytes::complete::tag_no_case; +use serde::{Deserialize, Serialize}; +use std::fmt::{self, Display, Formatter}; +use std::str; +use std::time; + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub struct ChangeFeed { + pub expiry: time::Duration, +} + +impl Display for ChangeFeed { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "CHANGEFEED {}", Duration(self.expiry))?; + Ok(()) + } +} + +pub fn changefeed(i: &str) -> IResult<&str, ChangeFeed> { + let (i, _) = tag_no_case("CHANGEFEED")(i)?; + let (i, _) = shouldbespace(i)?; + let (i, v) = duration(i)?; + Ok(( + i, + ChangeFeed { + expiry: v.0, + }, + )) +} + +impl Default for ChangeFeed { + fn default() -> Self { + Self { + expiry: time::Duration::from_secs(0), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn changefeed_missing() { + let sql: &str = ""; + let res = changefeed(sql); + assert!(res.is_err()); + } + + #[test] + fn changefeed_enabled() { + let sql = "CHANGEFEED 1h"; + let res = changefeed(sql); + assert!(res.is_ok()); + let out = res.unwrap().1; + assert_eq!("CHANGEFEED 1h", format!("{}", out)); + assert_eq!( + out, + ChangeFeed { + expiry: time::Duration::from_secs(3600) + } + ); + } +} diff --git a/lib/src/sql/mod.rs b/lib/src/sql/mod.rs index 20b56485..2b8e3a71 100644 --- a/lib/src/sql/mod.rs +++ b/lib/src/sql/mod.rs @@ -6,6 +6,7 @@ pub(crate) mod base; pub(crate) mod block; pub(crate) mod bytes; pub(crate) mod cast; +pub(crate) mod changefeed; pub(crate) mod comment; pub(crate) mod common; pub(crate) mod cond; diff --git a/lib/src/sql/statements/define.rs b/lib/src/sql/statements/define.rs index f4627ed5..d5d2bc9c 100644 --- a/lib/src/sql/statements/define.rs +++ b/lib/src/sql/statements/define.rs @@ -5,6 +5,7 @@ use crate::err::Error; use crate::sql::algorithm::{algorithm, Algorithm}; use crate::sql::base::{base, base_or_scope, Base}; use crate::sql::block::{block, Block}; +use crate::sql::changefeed::{changefeed, ChangeFeed}; use crate::sql::comment::{mightbespace, shouldbespace}; use crate::sql::common::commas; use crate::sql::duration::{duration, Duration}; @@ -172,6 +173,7 @@ fn namespace(i: &str) -> IResult<&str, DefineNamespaceStatement> { #[format(Named)] pub struct DefineDatabaseStatement { pub name: Ident, + pub changefeed: Option, } impl DefineDatabaseStatement { @@ -196,7 +198,11 @@ impl DefineDatabaseStatement { impl Display for DefineDatabaseStatement { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "DEFINE DATABASE {}", self.name) + write!(f, "DEFINE DATABASE {}", self.name)?; + if let Some(ref cf) = self.changefeed { + write!(f, " CHANGEFEED {}", crate::sql::duration::Duration(cf.expiry))?; + } + Ok(()) } } @@ -206,14 +212,36 @@ fn database(i: &str) -> IResult<&str, DefineDatabaseStatement> { let (i, _) = alt((tag_no_case("DB"), tag_no_case("DATABASE")))(i)?; let (i, _) = shouldbespace(i)?; let (i, name) = ident(i)?; + let (i, opts) = many0(database_opts)(i)?; Ok(( i, DefineDatabaseStatement { name, + changefeed: opts + .iter() + .map(|x| match x { + DefineDatabaseOption::ChangeFeed(ref v) => v.to_owned(), + }) + .next(), }, )) } +fn database_changefeed(i: &str) -> IResult<&str, DefineDatabaseOption> { + let (i, _) = shouldbespace(i)?; + let (i, v) = changefeed(i)?; + Ok((i, DefineDatabaseOption::ChangeFeed(v))) +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub enum DefineDatabaseOption { + ChangeFeed(ChangeFeed), +} + +fn database_opts(i: &str) -> IResult<&str, DefineDatabaseOption> { + database_changefeed(i) +} + // -------------------------------------------------- // -------------------------------------------------- // -------------------------------------------------- @@ -784,6 +812,7 @@ pub struct DefineTableStatement { pub full: bool, pub view: Option, pub permissions: Permissions, + pub changefeed: Option, } impl DefineTableStatement { @@ -863,6 +892,9 @@ impl Display for DefineTableStatement { }; write!(f, "{}", self.permissions)?; } + if let Some(ref cf) = self.changefeed { + write!(f, " CHANGEFEED {}", crate::sql::duration::Duration(cf.expiry))?; + } Ok(()) } } @@ -904,6 +936,10 @@ fn table(i: &str) -> IResult<&str, DefineTableStatement> { _ => None, }) .unwrap_or_default(), + changefeed: opts.iter().find_map(|x| match x { + DefineTableOption::ChangeFeed(ref v) => Some(v.to_owned()), + _ => None, + }), }, )) } @@ -915,10 +951,18 @@ pub enum DefineTableOption { Schemaless, Schemafull, Permissions(Permissions), + ChangeFeed(ChangeFeed), } fn table_opts(i: &str) -> IResult<&str, DefineTableOption> { - alt((table_drop, table_view, table_schemaless, table_schemafull, table_permissions))(i) + alt(( + table_drop, + table_view, + table_schemaless, + table_schemafull, + table_permissions, + table_changefeed, + ))(i) } fn table_drop(i: &str) -> IResult<&str, DefineTableOption> { @@ -927,6 +971,12 @@ fn table_drop(i: &str) -> IResult<&str, DefineTableOption> { Ok((i, DefineTableOption::Drop)) } +fn table_changefeed(i: &str) -> IResult<&str, DefineTableOption> { + let (i, _) = shouldbespace(i)?; + let (i, v) = changefeed(i)?; + Ok((i, DefineTableOption::ChangeFeed(v))) +} + fn table_view(i: &str) -> IResult<&str, DefineTableOption> { let (i, _) = shouldbespace(i)?; let (i, v) = view(i)?; @@ -1379,4 +1429,30 @@ mod tests { "DEFINE INDEX my_index ON my_table FIELDS my_col SEARCH ANALYZER my_analyzer VS ORDER 100" ); } + + #[test] + fn define_database_with_changefeed() { + let sql = "DEFINE DATABASE mydatabase CHANGEFEED 1h"; + let res = database(sql); + assert!(res.is_ok()); + let out = res.unwrap().1; + assert_eq!(sql, format!("{}", out)); + + let serialized = out.to_vec(); + let deserializled = DefineDatabaseStatement::try_from(&serialized).unwrap(); + assert_eq!(out, deserializled); + } + + #[test] + fn define_table_with_changefeed() { + let sql = "DEFINE TABLE mytable SCHEMALESS CHANGEFEED 1h"; + let res = table(sql); + assert!(res.is_ok()); + let out = res.unwrap().1; + assert_eq!(sql, format!("{}", out)); + + let serialized = out.to_vec(); + let deserializled = DefineTableStatement::try_from(&serialized).unwrap(); + assert_eq!(out, deserializled); + } }