feat: Change Feed option for DEFINE [TABLE|DATABASE] (#2189)

This commit is contained in:
Yusuke Kuoka 2023-06-28 16:19:40 +09:00 committed by GitHub
parent e30f70b907
commit 781b1f944e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 148 additions and 2 deletions

View file

@ -1429,6 +1429,7 @@ impl Transaction {
let key = crate::key::db::new(ns, db); let key = crate::key::db::new(ns, db);
let val = DefineDatabaseStatement { let val = DefineDatabaseStatement {
name: db.to_owned().into(), name: db.to_owned().into(),
changefeed: None,
}; };
self.put(key, &val).await?; self.put(key, &val).await?;
Ok(val) Ok(val)
@ -1614,6 +1615,7 @@ impl Transaction {
let key = crate::key::db::new(ns, db); let key = crate::key::db::new(ns, db);
let val = DefineDatabaseStatement { let val = DefineDatabaseStatement {
name: db.to_owned().into(), name: db.to_owned().into(),
changefeed: None,
}; };
self.put(key, &val).await?; self.put(key, &val).await?;
Ok(Arc::new(val)) Ok(Arc::new(val))

67
lib/src/sql/changefeed.rs Normal file
View file

@ -0,0 +1,67 @@
use crate::sql::comment::shouldbespace;
use crate::sql::duration::{duration, Duration};
use crate::sql::error::IResult;
use nom::bytes::complete::tag_no_case;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display, Formatter};
use std::str;
use std::time;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct ChangeFeed {
pub expiry: time::Duration,
}
impl Display for ChangeFeed {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "CHANGEFEED {}", Duration(self.expiry))?;
Ok(())
}
}
pub fn changefeed(i: &str) -> IResult<&str, ChangeFeed> {
let (i, _) = tag_no_case("CHANGEFEED")(i)?;
let (i, _) = shouldbespace(i)?;
let (i, v) = duration(i)?;
Ok((
i,
ChangeFeed {
expiry: v.0,
},
))
}
impl Default for ChangeFeed {
fn default() -> Self {
Self {
expiry: time::Duration::from_secs(0),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn changefeed_missing() {
let sql: &str = "";
let res = changefeed(sql);
assert!(res.is_err());
}
#[test]
fn changefeed_enabled() {
let sql = "CHANGEFEED 1h";
let res = changefeed(sql);
assert!(res.is_ok());
let out = res.unwrap().1;
assert_eq!("CHANGEFEED 1h", format!("{}", out));
assert_eq!(
out,
ChangeFeed {
expiry: time::Duration::from_secs(3600)
}
);
}
}

View file

@ -6,6 +6,7 @@ pub(crate) mod base;
pub(crate) mod block; pub(crate) mod block;
pub(crate) mod bytes; pub(crate) mod bytes;
pub(crate) mod cast; pub(crate) mod cast;
pub(crate) mod changefeed;
pub(crate) mod comment; pub(crate) mod comment;
pub(crate) mod common; pub(crate) mod common;
pub(crate) mod cond; pub(crate) mod cond;

View file

@ -5,6 +5,7 @@ use crate::err::Error;
use crate::sql::algorithm::{algorithm, Algorithm}; use crate::sql::algorithm::{algorithm, Algorithm};
use crate::sql::base::{base, base_or_scope, Base}; use crate::sql::base::{base, base_or_scope, Base};
use crate::sql::block::{block, Block}; use crate::sql::block::{block, Block};
use crate::sql::changefeed::{changefeed, ChangeFeed};
use crate::sql::comment::{mightbespace, shouldbespace}; use crate::sql::comment::{mightbespace, shouldbespace};
use crate::sql::common::commas; use crate::sql::common::commas;
use crate::sql::duration::{duration, Duration}; use crate::sql::duration::{duration, Duration};
@ -172,6 +173,7 @@ fn namespace(i: &str) -> IResult<&str, DefineNamespaceStatement> {
#[format(Named)] #[format(Named)]
pub struct DefineDatabaseStatement { pub struct DefineDatabaseStatement {
pub name: Ident, pub name: Ident,
pub changefeed: Option<ChangeFeed>,
} }
impl DefineDatabaseStatement { impl DefineDatabaseStatement {
@ -196,7 +198,11 @@ impl DefineDatabaseStatement {
impl Display for DefineDatabaseStatement { impl Display for DefineDatabaseStatement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "DEFINE DATABASE {}", self.name) write!(f, "DEFINE DATABASE {}", self.name)?;
if let Some(ref cf) = self.changefeed {
write!(f, " CHANGEFEED {}", crate::sql::duration::Duration(cf.expiry))?;
}
Ok(())
} }
} }
@ -206,14 +212,36 @@ fn database(i: &str) -> IResult<&str, DefineDatabaseStatement> {
let (i, _) = alt((tag_no_case("DB"), tag_no_case("DATABASE")))(i)?; let (i, _) = alt((tag_no_case("DB"), tag_no_case("DATABASE")))(i)?;
let (i, _) = shouldbespace(i)?; let (i, _) = shouldbespace(i)?;
let (i, name) = ident(i)?; let (i, name) = ident(i)?;
let (i, opts) = many0(database_opts)(i)?;
Ok(( Ok((
i, i,
DefineDatabaseStatement { DefineDatabaseStatement {
name, name,
changefeed: opts
.iter()
.map(|x| match x {
DefineDatabaseOption::ChangeFeed(ref v) => v.to_owned(),
})
.next(),
}, },
)) ))
} }
fn database_changefeed(i: &str) -> IResult<&str, DefineDatabaseOption> {
let (i, _) = shouldbespace(i)?;
let (i, v) = changefeed(i)?;
Ok((i, DefineDatabaseOption::ChangeFeed(v)))
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum DefineDatabaseOption {
ChangeFeed(ChangeFeed),
}
fn database_opts(i: &str) -> IResult<&str, DefineDatabaseOption> {
database_changefeed(i)
}
// -------------------------------------------------- // --------------------------------------------------
// -------------------------------------------------- // --------------------------------------------------
// -------------------------------------------------- // --------------------------------------------------
@ -784,6 +812,7 @@ pub struct DefineTableStatement {
pub full: bool, pub full: bool,
pub view: Option<View>, pub view: Option<View>,
pub permissions: Permissions, pub permissions: Permissions,
pub changefeed: Option<ChangeFeed>,
} }
impl DefineTableStatement { impl DefineTableStatement {
@ -863,6 +892,9 @@ impl Display for DefineTableStatement {
}; };
write!(f, "{}", self.permissions)?; write!(f, "{}", self.permissions)?;
} }
if let Some(ref cf) = self.changefeed {
write!(f, " CHANGEFEED {}", crate::sql::duration::Duration(cf.expiry))?;
}
Ok(()) Ok(())
} }
} }
@ -904,6 +936,10 @@ fn table(i: &str) -> IResult<&str, DefineTableStatement> {
_ => None, _ => None,
}) })
.unwrap_or_default(), .unwrap_or_default(),
changefeed: opts.iter().find_map(|x| match x {
DefineTableOption::ChangeFeed(ref v) => Some(v.to_owned()),
_ => None,
}),
}, },
)) ))
} }
@ -915,10 +951,18 @@ pub enum DefineTableOption {
Schemaless, Schemaless,
Schemafull, Schemafull,
Permissions(Permissions), Permissions(Permissions),
ChangeFeed(ChangeFeed),
} }
fn table_opts(i: &str) -> IResult<&str, DefineTableOption> { fn table_opts(i: &str) -> IResult<&str, DefineTableOption> {
alt((table_drop, table_view, table_schemaless, table_schemafull, table_permissions))(i) alt((
table_drop,
table_view,
table_schemaless,
table_schemafull,
table_permissions,
table_changefeed,
))(i)
} }
fn table_drop(i: &str) -> IResult<&str, DefineTableOption> { fn table_drop(i: &str) -> IResult<&str, DefineTableOption> {
@ -927,6 +971,12 @@ fn table_drop(i: &str) -> IResult<&str, DefineTableOption> {
Ok((i, DefineTableOption::Drop)) Ok((i, DefineTableOption::Drop))
} }
fn table_changefeed(i: &str) -> IResult<&str, DefineTableOption> {
let (i, _) = shouldbespace(i)?;
let (i, v) = changefeed(i)?;
Ok((i, DefineTableOption::ChangeFeed(v)))
}
fn table_view(i: &str) -> IResult<&str, DefineTableOption> { fn table_view(i: &str) -> IResult<&str, DefineTableOption> {
let (i, _) = shouldbespace(i)?; let (i, _) = shouldbespace(i)?;
let (i, v) = view(i)?; let (i, v) = view(i)?;
@ -1379,4 +1429,30 @@ mod tests {
"DEFINE INDEX my_index ON my_table FIELDS my_col SEARCH ANALYZER my_analyzer VS ORDER 100" "DEFINE INDEX my_index ON my_table FIELDS my_col SEARCH ANALYZER my_analyzer VS ORDER 100"
); );
} }
#[test]
fn define_database_with_changefeed() {
let sql = "DEFINE DATABASE mydatabase CHANGEFEED 1h";
let res = database(sql);
assert!(res.is_ok());
let out = res.unwrap().1;
assert_eq!(sql, format!("{}", out));
let serialized = out.to_vec();
let deserializled = DefineDatabaseStatement::try_from(&serialized).unwrap();
assert_eq!(out, deserializled);
}
#[test]
fn define_table_with_changefeed() {
let sql = "DEFINE TABLE mytable SCHEMALESS CHANGEFEED 1h";
let res = table(sql);
assert!(res.is_ok());
let out = res.unwrap().1;
assert_eq!(sql, format!("{}", out));
let serialized = out.to_vec();
let deserializled = DefineTableStatement::try_from(&serialized).unwrap();
assert_eq!(out, deserializled);
}
} }