From 99600d50ba8be2a6fc4dc95ad3a9c547f9d23046 Mon Sep 17 00:00:00 2001 From: Przemyslaw Hugh Kaznowski Date: Wed, 28 Feb 2024 19:35:39 +0000 Subject: [PATCH] Add syntax for change feeds to store original value alongside change (#3567) Co-authored-by: Mees Delzenne --- core/src/cf/writer.rs | 2 ++ core/src/sql/v1/arbitrary.rs | 1 + core/src/sql/v1/change_feed_include.rs | 25 +++++++++++++++ core/src/sql/v1/changefeed.rs | 8 ++++- core/src/sql/v1/mod.rs | 1 + .../sql/v1/value/serde/ser/changefeed/mod.rs | 5 +++ core/src/sql/v2/arbitrary.rs | 1 + core/src/sql/v2/change_feed_include.rs | 25 +++++++++++++++ core/src/sql/v2/changefeed.rs | 8 ++++- core/src/sql/v2/mod.rs | 1 + core/src/sql/v2/statements/define/table.rs | 10 +++--- .../sql/v2/value/serde/ser/changefeed/mod.rs | 5 +++ core/src/syn/v1/part/mod.rs | 31 +++++++++++++++++-- core/src/syn/v2/lexer/keywords.rs | 5 +++ core/src/syn/v2/parser/stmt/parts.rs | 9 ++++++ core/src/syn/v2/parser/test/stmt.rs | 12 ++++--- core/src/syn/v2/parser/test/streaming.rs | 2 ++ core/src/syn/v2/token/keyword.rs | 1 + core/src/syn/v2/token/mod.rs | 3 ++ 19 files changed, 142 insertions(+), 13 deletions(-) create mode 100644 core/src/sql/v1/change_feed_include.rs create mode 100644 core/src/sql/v2/change_feed_include.rs diff --git a/core/src/cf/writer.rs b/core/src/cf/writer.rs index 9a2acf36..7756d2a4 100644 --- a/core/src/cf/writer.rs +++ b/core/src/cf/writer.rs @@ -150,6 +150,7 @@ mod tests { name: crate::sql::Ident(db.to_string()), changefeed: Some(ChangeFeed { expiry: Duration::from_secs(10), + store_original: false, }), ..Default::default() }; @@ -157,6 +158,7 @@ mod tests { name: tb.into(), changefeed: Some(ChangeFeed { expiry: Duration::from_secs(10), + store_original: false, }), ..Default::default() }; diff --git a/core/src/sql/v1/arbitrary.rs b/core/src/sql/v1/arbitrary.rs index bdad2bbc..7f027f18 100644 --- a/core/src/sql/v1/arbitrary.rs +++ b/core/src/sql/v1/arbitrary.rs @@ -32,6 +32,7 @@ impl<'a> Arbitrary<'a> for ChangeFeed { fn arbitrary(u: &mut Unstructured<'a>) -> Result { Ok(Self { expiry: time::Duration::new(u64::arbitrary(u)?, u32::arbitrary(u)?), + store_original: bool::arbitrary(u)?, }) } } diff --git a/core/src/sql/v1/change_feed_include.rs b/core/src/sql/v1/change_feed_include.rs new file mode 100644 index 00000000..79b100de --- /dev/null +++ b/core/src/sql/v1/change_feed_include.rs @@ -0,0 +1,25 @@ +use revision::revisioned; +use serde::{Deserialize, Serialize}; +use std::fmt; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] +#[revisioned(revision = 1)] +/// ChangeFeedInclude statements are an appendix +pub enum ChangeFeedInclude { + Original, +} + +impl Default for crate::sql::change_feed_include::ChangeFeedInclude { + fn default() -> Self { + Self::Original + } +} + +impl fmt::Display for crate::sql::change_feed_include::ChangeFeedInclude { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(match self { + Self::Original => "Original", + }) + } +} diff --git a/core/src/sql/v1/changefeed.rs b/core/src/sql/v1/changefeed.rs index e641d0d3..a34b0f65 100644 --- a/core/src/sql/v1/changefeed.rs +++ b/core/src/sql/v1/changefeed.rs @@ -6,14 +6,19 @@ use std::str; use std::time; #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] -#[revisioned(revision = 1)] +#[revisioned(revision = 2)] pub struct ChangeFeed { pub expiry: time::Duration, + #[revision(start = 2)] + pub store_original: bool, } impl Display for ChangeFeed { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "CHANGEFEED {}", Duration(self.expiry))?; + if self.store_original { + write!(f, " INCLUDE ORIGINAL")?; + }; Ok(()) } } @@ -22,6 +27,7 @@ impl Default for ChangeFeed { fn default() -> Self { Self { expiry: time::Duration::from_secs(0), + store_original: false, } } } diff --git a/core/src/sql/v1/mod.rs b/core/src/sql/v1/mod.rs index f68e6711..3bf987f2 100644 --- a/core/src/sql/v1/mod.rs +++ b/core/src/sql/v1/mod.rs @@ -8,6 +8,7 @@ pub(crate) mod base; pub(crate) mod block; pub(crate) mod bytes; pub(crate) mod cast; +pub(crate) mod change_feed_include; pub(crate) mod changefeed; pub(crate) mod cond; pub(crate) mod constant; diff --git a/core/src/sql/v1/value/serde/ser/changefeed/mod.rs b/core/src/sql/v1/value/serde/ser/changefeed/mod.rs index ca108d48..8a0f5835 100644 --- a/core/src/sql/v1/value/serde/ser/changefeed/mod.rs +++ b/core/src/sql/v1/value/serde/ser/changefeed/mod.rs @@ -38,6 +38,7 @@ impl ser::Serializer for Serializer { #[derive(Default)] pub struct SerializeChangeFeed { expiry: Duration, + store_original: bool, } impl serde::ser::SerializeStruct for SerializeChangeFeed { @@ -52,6 +53,9 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed { "expiry" => { self.expiry = value.serialize(ser::duration::Serializer.wrap())?; } + "store_original" => { + self.store_original = value.serialize(ser::primitive::bool::Serializer.wrap())?; + } key => { return Err(Error::custom(format!("unexpected field `ChangeFeed::{key}`"))); } @@ -62,6 +66,7 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed { fn end(self) -> Result { Ok(ChangeFeed { expiry: self.expiry, + store_original: self.store_original, }) } } diff --git a/core/src/sql/v2/arbitrary.rs b/core/src/sql/v2/arbitrary.rs index bdad2bbc..7f027f18 100644 --- a/core/src/sql/v2/arbitrary.rs +++ b/core/src/sql/v2/arbitrary.rs @@ -32,6 +32,7 @@ impl<'a> Arbitrary<'a> for ChangeFeed { fn arbitrary(u: &mut Unstructured<'a>) -> Result { Ok(Self { expiry: time::Duration::new(u64::arbitrary(u)?, u32::arbitrary(u)?), + store_original: bool::arbitrary(u)?, }) } } diff --git a/core/src/sql/v2/change_feed_include.rs b/core/src/sql/v2/change_feed_include.rs new file mode 100644 index 00000000..f5586a3a --- /dev/null +++ b/core/src/sql/v2/change_feed_include.rs @@ -0,0 +1,25 @@ +use revision::revisioned; +use serde::{Deserialize, Serialize}; +use std::fmt; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] +#[revisioned(revision = 1)] +/// ChangeFeedInclude statements are an appendix +pub enum ChangeFeedInclude { + Original, +} + +impl Default for ChangeFeedInclude { + fn default() -> Self { + Self::Original + } +} + +impl fmt::Display for ChangeFeedInclude { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(match self { + Self::Original => "Original", + }) + } +} diff --git a/core/src/sql/v2/changefeed.rs b/core/src/sql/v2/changefeed.rs index bde873dc..96c4c380 100644 --- a/core/src/sql/v2/changefeed.rs +++ b/core/src/sql/v2/changefeed.rs @@ -6,13 +6,18 @@ use std::str; use std::time; #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] -#[revisioned(revision = 1)] +#[revisioned(revision = 2)] pub struct ChangeFeed { pub expiry: time::Duration, + #[revision(start = 2)] + pub store_original: bool, } impl Display for ChangeFeed { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "CHANGEFEED {}", Duration(self.expiry))?; + if self.store_original { + write!(f, " INCLUDE ORIGINAL")?; + }; Ok(()) } } @@ -21,6 +26,7 @@ impl Default for ChangeFeed { fn default() -> Self { Self { expiry: time::Duration::from_secs(0), + store_original: false, } } } diff --git a/core/src/sql/v2/mod.rs b/core/src/sql/v2/mod.rs index 9afc9d36..776345c4 100644 --- a/core/src/sql/v2/mod.rs +++ b/core/src/sql/v2/mod.rs @@ -8,6 +8,7 @@ pub(crate) mod base; pub(crate) mod block; pub(crate) mod bytes; pub(crate) mod cast; +pub(crate) mod change_feed_include; pub(crate) mod changefeed; pub(crate) mod cond; pub(crate) mod constant; diff --git a/core/src/sql/v2/statements/define/table.rs b/core/src/sql/v2/statements/define/table.rs index f73c3f6a..ef2fdb62 100644 --- a/core/src/sql/v2/statements/define/table.rs +++ b/core/src/sql/v2/statements/define/table.rs @@ -1,3 +1,9 @@ +use std::fmt::{self, Display, Write}; + +use derive::Store; +use revision::revisioned; +use serde::{Deserialize, Serialize}; + use crate::ctx::Context; use crate::dbs::{Options, Transaction}; use crate::doc::CursorDoc; @@ -9,10 +15,6 @@ use crate::sql::{ statements::UpdateStatement, Base, Ident, Permissions, Strand, Value, Values, View, }; -use derive::Store; -use revision::revisioned; -use serde::{Deserialize, Serialize}; -use std::fmt::{self, Display, Write}; #[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] diff --git a/core/src/sql/v2/value/serde/ser/changefeed/mod.rs b/core/src/sql/v2/value/serde/ser/changefeed/mod.rs index ca108d48..8a0f5835 100644 --- a/core/src/sql/v2/value/serde/ser/changefeed/mod.rs +++ b/core/src/sql/v2/value/serde/ser/changefeed/mod.rs @@ -38,6 +38,7 @@ impl ser::Serializer for Serializer { #[derive(Default)] pub struct SerializeChangeFeed { expiry: Duration, + store_original: bool, } impl serde::ser::SerializeStruct for SerializeChangeFeed { @@ -52,6 +53,9 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed { "expiry" => { self.expiry = value.serialize(ser::duration::Serializer.wrap())?; } + "store_original" => { + self.store_original = value.serialize(ser::primitive::bool::Serializer.wrap())?; + } key => { return Err(Error::custom(format!("unexpected field `ChangeFeed::{key}`"))); } @@ -62,6 +66,7 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed { fn end(self) -> Result { Ok(ChangeFeed { expiry: self.expiry, + store_original: self.store_original, }) } } diff --git a/core/src/syn/v1/part/mod.rs b/core/src/syn/v1/part/mod.rs index ee3c3622..088614b6 100644 --- a/core/src/syn/v1/part/mod.rs +++ b/core/src/syn/v1/part/mod.rs @@ -67,10 +67,20 @@ pub fn changefeed(i: &str) -> IResult<&str, ChangeFeed> { let (i, _) = tag_no_case("CHANGEFEED")(i)?; let (i, _) = shouldbespace(i)?; let (i, v) = cut(duration)(i)?; + + let (i, store_original) = opt(|i| { + let (i, _) = shouldbespace(i)?; + let (i, _): (&str, &str) = tag_no_case("INCLUDE")(i)?; + let (i, _) = shouldbespace(i)?; + let (i, b): (&str, &str) = tag_no_case("ORIGINAL")(i)?; + Ok((i, b)) + })(i)?; + Ok(( i, ChangeFeed { expiry: v.0, + store_original: store_original.is_some(), }, )) } @@ -234,7 +244,6 @@ pub fn version(i: &str) -> IResult<&str, Version> { #[cfg(test)] mod tests { - use super::*; use crate::sql::{Datetime, Idiom, Value}; use crate::syn::Parse; @@ -256,7 +265,23 @@ mod tests { assert_eq!( out, ChangeFeed { - expiry: time::Duration::from_secs(3600) + expiry: time::Duration::from_secs(3600), + store_original: false, + } + ); + } + + #[test] + fn changefeed_include_original() { + let sql = "CHANGEFEED 1h INCLUDE ORIGINAL"; + let res = changefeed(sql); + let out = res.unwrap().1; + assert_eq!("CHANGEFEED 1h INCLUDE ORIGINAL", format!("{}", out)); + assert_eq!( + out, + ChangeFeed { + expiry: time::Duration::from_secs(3600), + store_original: true, } ); } @@ -343,7 +368,7 @@ mod tests { let out = res.unwrap().1; assert_eq!( out, - Fetchs(vec![Fetch(Idiom::parse("field")), Fetch(Idiom::parse("other.field")),]) + Fetchs(vec![Fetch(Idiom::parse("field")), Fetch(Idiom::parse("other.field"))]) ); assert_eq!("FETCH field, other.field", format!("{}", out)); } diff --git a/core/src/syn/v2/lexer/keywords.rs b/core/src/syn/v2/lexer/keywords.rs index 5d1250cf..712035f0 100644 --- a/core/src/syn/v2/lexer/keywords.rs +++ b/core/src/syn/v2/lexer/keywords.rs @@ -1,4 +1,5 @@ use crate::{ + sql::change_feed_include::ChangeFeedInclude, sql::{language::Language, Algorithm}, syn::v2::token::{DistanceKind, Keyword, TokenKind}, }; @@ -74,6 +75,7 @@ pub(crate) static KEYWORDS: phf::Map, TokenKind> = phf_map UniCase::ascii("GROUP") => TokenKind::Keyword(Keyword::Group), UniCase::ascii("HIGHLIGHTS") => TokenKind::Keyword(Keyword::Highlights), UniCase::ascii("IGNORE") => TokenKind::Keyword(Keyword::Ignore), + UniCase::ascii("INCLUDE") => TokenKind::Keyword(Keyword::Include), UniCase::ascii("INDEX") => TokenKind::Keyword(Keyword::Index), UniCase::ascii("INFO") => TokenKind::Keyword(Keyword::Info), UniCase::ascii("INSERT") => TokenKind::Keyword(Keyword::Insert), @@ -284,6 +286,9 @@ pub(crate) static KEYWORDS: phf::Map, TokenKind> = phf_map UniCase::ascii("MANHATTAN") => TokenKind::Distance(DistanceKind::Manhattan), UniCase::ascii("HAMMING") => TokenKind::Distance(DistanceKind::Hamming), UniCase::ascii("MINKOWSKI") => TokenKind::Distance(DistanceKind::Minkowski), + + // Change Feed keywords + UniCase::ascii("ORIGINAL") => TokenKind::ChangeFeedInclude(ChangeFeedInclude::Original), }; const fn jwks_token_kind() -> TokenKind { diff --git a/core/src/syn/v2/parser/stmt/parts.rs b/core/src/syn/v2/parser/stmt/parts.rs index 86b9c2ec..32c7a296 100644 --- a/core/src/syn/v2/parser/stmt/parts.rs +++ b/core/src/syn/v2/parser/stmt/parts.rs @@ -1,5 +1,6 @@ //! Contains parsing code for smaller common parts of statements. +use crate::sql::change_feed_include::ChangeFeedInclude; use crate::{ sql::{ changefeed::ChangeFeed, index::Distance, Base, Cond, Data, Duration, Fetch, Fetchs, Field, @@ -331,8 +332,16 @@ impl Parser<'_> { /// Expects the parser to have already eating the `CHANGEFEED` keyword pub fn parse_changefeed(&mut self) -> ParseResult { let expiry = self.next_token_value::()?.0; + let store_original = if self.eat(t!("INCLUDE")) { + expected!(self, TokenKind::ChangeFeedInclude(ChangeFeedInclude::Original)); + true + } else { + false + }; + Ok(ChangeFeed { expiry, + store_original, }) } diff --git a/core/src/syn/v2/parser/test/stmt.rs b/core/src/syn/v2/parser/test/stmt.rs index 606ab3ec..0c41a1ea 100644 --- a/core/src/syn/v2/parser/test/stmt.rs +++ b/core/src/syn/v2/parser/test/stmt.rs @@ -138,7 +138,9 @@ fn parse_define_namespace() { #[test] fn parse_define_database() { - let res = test_parse!(parse_stmt, "DEFINE DATABASE a COMMENT 'test' CHANGEFEED 10m").unwrap(); + let res = + test_parse!(parse_stmt, "DEFINE DATABASE a COMMENT 'test' CHANGEFEED 10m INCLUDE ORIGINAL") + .unwrap(); assert_eq!( res, Statement::Define(DefineStatement::Database(DefineDatabaseStatement { @@ -146,7 +148,8 @@ fn parse_define_database() { name: Ident("a".to_string()), comment: Some(Strand("test".to_string())), changefeed: Some(ChangeFeed { - expiry: std::time::Duration::from_secs(60) * 10 + expiry: std::time::Duration::from_secs(60) * 10, + store_original: true, }) })) ); @@ -277,7 +280,7 @@ fn parse_define_param() { #[test] fn parse_define_table() { let res = - test_parse!(parse_stmt, r#"DEFINE TABLE name DROP SCHEMAFUL CHANGEFEED 1s PERMISSIONS FOR SELECT WHERE a = 1 AS SELECT foo FROM bar GROUP BY foo"#) + test_parse!(parse_stmt, r#"DEFINE TABLE name DROP SCHEMAFUL CHANGEFEED 1s INCLUDE ORIGINAL PERMISSIONS FOR SELECT WHERE a = 1 AS SELECT foo FROM bar GROUP BY foo"#) .unwrap(); assert_eq!( @@ -312,7 +315,8 @@ fn parse_define_table() { delete: Permission::None, }, changefeed: Some(ChangeFeed { - expiry: std::time::Duration::from_secs(1) + expiry: std::time::Duration::from_secs(1), + store_original: true, }), comment: None, })) diff --git a/core/src/syn/v2/parser/test/streaming.rs b/core/src/syn/v2/parser/test/streaming.rs index c9cc87ab..466b5d47 100644 --- a/core/src/syn/v2/parser/test/streaming.rs +++ b/core/src/syn/v2/parser/test/streaming.rs @@ -163,6 +163,7 @@ fn statements() -> Vec { comment: Some(Strand("test".to_string())), changefeed: Some(ChangeFeed { expiry: std::time::Duration::from_secs(60) * 10, + store_original: false, }), })), Statement::Define(DefineStatement::Database(DefineDatabaseStatement { @@ -235,6 +236,7 @@ fn statements() -> Vec { }, changefeed: Some(ChangeFeed { expiry: std::time::Duration::from_secs(1), + store_original: false, }), comment: None, })), diff --git a/core/src/syn/v2/token/keyword.rs b/core/src/syn/v2/token/keyword.rs index f1edc8da..f25ae10c 100644 --- a/core/src/syn/v2/token/keyword.rs +++ b/core/src/syn/v2/token/keyword.rs @@ -83,6 +83,7 @@ keyword! { Group => "GROUP", Highlights => "HIGHLIGHTS", Ignore => "IGNORE", + Include => "INCLUDE", Index => "INDEX", Info => "INFO", Insert => "INSERT", diff --git a/core/src/syn/v2/token/mod.rs b/core/src/syn/v2/token/mod.rs index dc9cd9b3..ce1c77d5 100644 --- a/core/src/syn/v2/token/mod.rs +++ b/core/src/syn/v2/token/mod.rs @@ -8,6 +8,7 @@ pub use keyword::Keyword; mod mac; pub(crate) use mac::t; +use crate::sql::change_feed_include::ChangeFeedInclude; use crate::sql::{language::Language, Algorithm}; /// A location in the source passed to the lexer. @@ -224,6 +225,7 @@ pub enum NumberKind { pub enum TokenKind { Keyword(Keyword), Algorithm(Algorithm), + ChangeFeedInclude(ChangeFeedInclude), Language(Language), Distance(DistanceKind), Operator(Operator), @@ -383,6 +385,7 @@ impl TokenKind { TokenKind::At => "@", TokenKind::Invalid => "Invalid", TokenKind::Eof => "Eof", + TokenKind::ChangeFeedInclude(_) => "change feed include", } } }