Add syntax for change feeds to store original value alongside change (#3567)

Co-authored-by: Mees Delzenne <DelSkayn@users.noreply.github.com>
This commit is contained in:
Przemyslaw Hugh Kaznowski 2024-02-28 19:35:39 +00:00 committed by GitHub
parent fba5aa875d
commit 99600d50ba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 142 additions and 13 deletions

View file

@ -150,6 +150,7 @@ mod tests {
name: crate::sql::Ident(db.to_string()), name: crate::sql::Ident(db.to_string()),
changefeed: Some(ChangeFeed { changefeed: Some(ChangeFeed {
expiry: Duration::from_secs(10), expiry: Duration::from_secs(10),
store_original: false,
}), }),
..Default::default() ..Default::default()
}; };
@ -157,6 +158,7 @@ mod tests {
name: tb.into(), name: tb.into(),
changefeed: Some(ChangeFeed { changefeed: Some(ChangeFeed {
expiry: Duration::from_secs(10), expiry: Duration::from_secs(10),
store_original: false,
}), }),
..Default::default() ..Default::default()
}; };

View file

@ -32,6 +32,7 @@ impl<'a> Arbitrary<'a> for ChangeFeed {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self> { fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self> {
Ok(Self { Ok(Self {
expiry: time::Duration::new(u64::arbitrary(u)?, u32::arbitrary(u)?), expiry: time::Duration::new(u64::arbitrary(u)?, u32::arbitrary(u)?),
store_original: bool::arbitrary(u)?,
}) })
} }
} }

View file

@ -0,0 +1,25 @@
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[revisioned(revision = 1)]
/// ChangeFeedInclude statements are an appendix
pub enum ChangeFeedInclude {
Original,
}
impl Default for crate::sql::change_feed_include::ChangeFeedInclude {
fn default() -> Self {
Self::Original
}
}
impl fmt::Display for crate::sql::change_feed_include::ChangeFeedInclude {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(match self {
Self::Original => "Original",
})
}
}

View file

@ -6,14 +6,19 @@ use std::str;
use std::time; use std::time;
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
#[revisioned(revision = 1)] #[revisioned(revision = 2)]
pub struct ChangeFeed { pub struct ChangeFeed {
pub expiry: time::Duration, pub expiry: time::Duration,
#[revision(start = 2)]
pub store_original: bool,
} }
impl Display for ChangeFeed { impl Display for ChangeFeed {
fn fmt(&self, f: &mut Formatter) -> fmt::Result { fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "CHANGEFEED {}", Duration(self.expiry))?; write!(f, "CHANGEFEED {}", Duration(self.expiry))?;
if self.store_original {
write!(f, " INCLUDE ORIGINAL")?;
};
Ok(()) Ok(())
} }
} }
@ -22,6 +27,7 @@ impl Default for ChangeFeed {
fn default() -> Self { fn default() -> Self {
Self { Self {
expiry: time::Duration::from_secs(0), expiry: time::Duration::from_secs(0),
store_original: false,
} }
} }
} }

View file

@ -8,6 +8,7 @@ pub(crate) mod base;
pub(crate) mod block; pub(crate) mod block;
pub(crate) mod bytes; pub(crate) mod bytes;
pub(crate) mod cast; pub(crate) mod cast;
pub(crate) mod change_feed_include;
pub(crate) mod changefeed; pub(crate) mod changefeed;
pub(crate) mod cond; pub(crate) mod cond;
pub(crate) mod constant; pub(crate) mod constant;

View file

@ -38,6 +38,7 @@ impl ser::Serializer for Serializer {
#[derive(Default)] #[derive(Default)]
pub struct SerializeChangeFeed { pub struct SerializeChangeFeed {
expiry: Duration, expiry: Duration,
store_original: bool,
} }
impl serde::ser::SerializeStruct for SerializeChangeFeed { impl serde::ser::SerializeStruct for SerializeChangeFeed {
@ -52,6 +53,9 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed {
"expiry" => { "expiry" => {
self.expiry = value.serialize(ser::duration::Serializer.wrap())?; self.expiry = value.serialize(ser::duration::Serializer.wrap())?;
} }
"store_original" => {
self.store_original = value.serialize(ser::primitive::bool::Serializer.wrap())?;
}
key => { key => {
return Err(Error::custom(format!("unexpected field `ChangeFeed::{key}`"))); return Err(Error::custom(format!("unexpected field `ChangeFeed::{key}`")));
} }
@ -62,6 +66,7 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed {
fn end(self) -> Result<Self::Ok, Error> { fn end(self) -> Result<Self::Ok, Error> {
Ok(ChangeFeed { Ok(ChangeFeed {
expiry: self.expiry, expiry: self.expiry,
store_original: self.store_original,
}) })
} }
} }

View file

@ -32,6 +32,7 @@ impl<'a> Arbitrary<'a> for ChangeFeed {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self> { fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self> {
Ok(Self { Ok(Self {
expiry: time::Duration::new(u64::arbitrary(u)?, u32::arbitrary(u)?), expiry: time::Duration::new(u64::arbitrary(u)?, u32::arbitrary(u)?),
store_original: bool::arbitrary(u)?,
}) })
} }
} }

View file

@ -0,0 +1,25 @@
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[revisioned(revision = 1)]
/// ChangeFeedInclude statements are an appendix
pub enum ChangeFeedInclude {
Original,
}
impl Default for ChangeFeedInclude {
fn default() -> Self {
Self::Original
}
}
impl fmt::Display for ChangeFeedInclude {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(match self {
Self::Original => "Original",
})
}
}

View file

@ -6,13 +6,18 @@ use std::str;
use std::time; use std::time;
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] #[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
#[revisioned(revision = 1)] #[revisioned(revision = 2)]
pub struct ChangeFeed { pub struct ChangeFeed {
pub expiry: time::Duration, pub expiry: time::Duration,
#[revision(start = 2)]
pub store_original: bool,
} }
impl Display for ChangeFeed { impl Display for ChangeFeed {
fn fmt(&self, f: &mut Formatter) -> fmt::Result { fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "CHANGEFEED {}", Duration(self.expiry))?; write!(f, "CHANGEFEED {}", Duration(self.expiry))?;
if self.store_original {
write!(f, " INCLUDE ORIGINAL")?;
};
Ok(()) Ok(())
} }
} }
@ -21,6 +26,7 @@ impl Default for ChangeFeed {
fn default() -> Self { fn default() -> Self {
Self { Self {
expiry: time::Duration::from_secs(0), expiry: time::Duration::from_secs(0),
store_original: false,
} }
} }
} }

View file

@ -8,6 +8,7 @@ pub(crate) mod base;
pub(crate) mod block; pub(crate) mod block;
pub(crate) mod bytes; pub(crate) mod bytes;
pub(crate) mod cast; pub(crate) mod cast;
pub(crate) mod change_feed_include;
pub(crate) mod changefeed; pub(crate) mod changefeed;
pub(crate) mod cond; pub(crate) mod cond;
pub(crate) mod constant; pub(crate) mod constant;

View file

@ -1,3 +1,9 @@
use std::fmt::{self, Display, Write};
use derive::Store;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use crate::ctx::Context; use crate::ctx::Context;
use crate::dbs::{Options, Transaction}; use crate::dbs::{Options, Transaction};
use crate::doc::CursorDoc; use crate::doc::CursorDoc;
@ -9,10 +15,6 @@ use crate::sql::{
statements::UpdateStatement, statements::UpdateStatement,
Base, Ident, Permissions, Strand, Value, Values, View, Base, Ident, Permissions, Strand, Value, Values, View,
}; };
use derive::Store;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display, Write};
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)] #[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]

View file

@ -38,6 +38,7 @@ impl ser::Serializer for Serializer {
#[derive(Default)] #[derive(Default)]
pub struct SerializeChangeFeed { pub struct SerializeChangeFeed {
expiry: Duration, expiry: Duration,
store_original: bool,
} }
impl serde::ser::SerializeStruct for SerializeChangeFeed { impl serde::ser::SerializeStruct for SerializeChangeFeed {
@ -52,6 +53,9 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed {
"expiry" => { "expiry" => {
self.expiry = value.serialize(ser::duration::Serializer.wrap())?; self.expiry = value.serialize(ser::duration::Serializer.wrap())?;
} }
"store_original" => {
self.store_original = value.serialize(ser::primitive::bool::Serializer.wrap())?;
}
key => { key => {
return Err(Error::custom(format!("unexpected field `ChangeFeed::{key}`"))); return Err(Error::custom(format!("unexpected field `ChangeFeed::{key}`")));
} }
@ -62,6 +66,7 @@ impl serde::ser::SerializeStruct for SerializeChangeFeed {
fn end(self) -> Result<Self::Ok, Error> { fn end(self) -> Result<Self::Ok, Error> {
Ok(ChangeFeed { Ok(ChangeFeed {
expiry: self.expiry, expiry: self.expiry,
store_original: self.store_original,
}) })
} }
} }

View file

@ -67,10 +67,20 @@ pub fn changefeed(i: &str) -> IResult<&str, ChangeFeed> {
let (i, _) = tag_no_case("CHANGEFEED")(i)?; let (i, _) = tag_no_case("CHANGEFEED")(i)?;
let (i, _) = shouldbespace(i)?; let (i, _) = shouldbespace(i)?;
let (i, v) = cut(duration)(i)?; let (i, v) = cut(duration)(i)?;
let (i, store_original) = opt(|i| {
let (i, _) = shouldbespace(i)?;
let (i, _): (&str, &str) = tag_no_case("INCLUDE")(i)?;
let (i, _) = shouldbespace(i)?;
let (i, b): (&str, &str) = tag_no_case("ORIGINAL")(i)?;
Ok((i, b))
})(i)?;
Ok(( Ok((
i, i,
ChangeFeed { ChangeFeed {
expiry: v.0, expiry: v.0,
store_original: store_original.is_some(),
}, },
)) ))
} }
@ -234,7 +244,6 @@ pub fn version(i: &str) -> IResult<&str, Version> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::sql::{Datetime, Idiom, Value}; use crate::sql::{Datetime, Idiom, Value};
use crate::syn::Parse; use crate::syn::Parse;
@ -256,7 +265,23 @@ mod tests {
assert_eq!( assert_eq!(
out, out,
ChangeFeed { ChangeFeed {
expiry: time::Duration::from_secs(3600) expiry: time::Duration::from_secs(3600),
store_original: false,
}
);
}
#[test]
fn changefeed_include_original() {
let sql = "CHANGEFEED 1h INCLUDE ORIGINAL";
let res = changefeed(sql);
let out = res.unwrap().1;
assert_eq!("CHANGEFEED 1h INCLUDE ORIGINAL", format!("{}", out));
assert_eq!(
out,
ChangeFeed {
expiry: time::Duration::from_secs(3600),
store_original: true,
} }
); );
} }
@ -343,7 +368,7 @@ mod tests {
let out = res.unwrap().1; let out = res.unwrap().1;
assert_eq!( assert_eq!(
out, out,
Fetchs(vec![Fetch(Idiom::parse("field")), Fetch(Idiom::parse("other.field")),]) Fetchs(vec![Fetch(Idiom::parse("field")), Fetch(Idiom::parse("other.field"))])
); );
assert_eq!("FETCH field, other.field", format!("{}", out)); assert_eq!("FETCH field, other.field", format!("{}", out));
} }

View file

@ -1,4 +1,5 @@
use crate::{ use crate::{
sql::change_feed_include::ChangeFeedInclude,
sql::{language::Language, Algorithm}, sql::{language::Language, Algorithm},
syn::v2::token::{DistanceKind, Keyword, TokenKind}, syn::v2::token::{DistanceKind, Keyword, TokenKind},
}; };
@ -74,6 +75,7 @@ pub(crate) static KEYWORDS: phf::Map<UniCase<&'static str>, TokenKind> = phf_map
UniCase::ascii("GROUP") => TokenKind::Keyword(Keyword::Group), UniCase::ascii("GROUP") => TokenKind::Keyword(Keyword::Group),
UniCase::ascii("HIGHLIGHTS") => TokenKind::Keyword(Keyword::Highlights), UniCase::ascii("HIGHLIGHTS") => TokenKind::Keyword(Keyword::Highlights),
UniCase::ascii("IGNORE") => TokenKind::Keyword(Keyword::Ignore), UniCase::ascii("IGNORE") => TokenKind::Keyword(Keyword::Ignore),
UniCase::ascii("INCLUDE") => TokenKind::Keyword(Keyword::Include),
UniCase::ascii("INDEX") => TokenKind::Keyword(Keyword::Index), UniCase::ascii("INDEX") => TokenKind::Keyword(Keyword::Index),
UniCase::ascii("INFO") => TokenKind::Keyword(Keyword::Info), UniCase::ascii("INFO") => TokenKind::Keyword(Keyword::Info),
UniCase::ascii("INSERT") => TokenKind::Keyword(Keyword::Insert), UniCase::ascii("INSERT") => TokenKind::Keyword(Keyword::Insert),
@ -284,6 +286,9 @@ pub(crate) static KEYWORDS: phf::Map<UniCase<&'static str>, TokenKind> = phf_map
UniCase::ascii("MANHATTAN") => TokenKind::Distance(DistanceKind::Manhattan), UniCase::ascii("MANHATTAN") => TokenKind::Distance(DistanceKind::Manhattan),
UniCase::ascii("HAMMING") => TokenKind::Distance(DistanceKind::Hamming), UniCase::ascii("HAMMING") => TokenKind::Distance(DistanceKind::Hamming),
UniCase::ascii("MINKOWSKI") => TokenKind::Distance(DistanceKind::Minkowski), UniCase::ascii("MINKOWSKI") => TokenKind::Distance(DistanceKind::Minkowski),
// Change Feed keywords
UniCase::ascii("ORIGINAL") => TokenKind::ChangeFeedInclude(ChangeFeedInclude::Original),
}; };
const fn jwks_token_kind() -> TokenKind { const fn jwks_token_kind() -> TokenKind {

View file

@ -1,5 +1,6 @@
//! Contains parsing code for smaller common parts of statements. //! Contains parsing code for smaller common parts of statements.
use crate::sql::change_feed_include::ChangeFeedInclude;
use crate::{ use crate::{
sql::{ sql::{
changefeed::ChangeFeed, index::Distance, Base, Cond, Data, Duration, Fetch, Fetchs, Field, changefeed::ChangeFeed, index::Distance, Base, Cond, Data, Duration, Fetch, Fetchs, Field,
@ -331,8 +332,16 @@ impl Parser<'_> {
/// Expects the parser to have already eating the `CHANGEFEED` keyword /// Expects the parser to have already eating the `CHANGEFEED` keyword
pub fn parse_changefeed(&mut self) -> ParseResult<ChangeFeed> { pub fn parse_changefeed(&mut self) -> ParseResult<ChangeFeed> {
let expiry = self.next_token_value::<Duration>()?.0; let expiry = self.next_token_value::<Duration>()?.0;
let store_original = if self.eat(t!("INCLUDE")) {
expected!(self, TokenKind::ChangeFeedInclude(ChangeFeedInclude::Original));
true
} else {
false
};
Ok(ChangeFeed { Ok(ChangeFeed {
expiry, expiry,
store_original,
}) })
} }

View file

@ -138,7 +138,9 @@ fn parse_define_namespace() {
#[test] #[test]
fn parse_define_database() { fn parse_define_database() {
let res = test_parse!(parse_stmt, "DEFINE DATABASE a COMMENT 'test' CHANGEFEED 10m").unwrap(); let res =
test_parse!(parse_stmt, "DEFINE DATABASE a COMMENT 'test' CHANGEFEED 10m INCLUDE ORIGINAL")
.unwrap();
assert_eq!( assert_eq!(
res, res,
Statement::Define(DefineStatement::Database(DefineDatabaseStatement { Statement::Define(DefineStatement::Database(DefineDatabaseStatement {
@ -146,7 +148,8 @@ fn parse_define_database() {
name: Ident("a".to_string()), name: Ident("a".to_string()),
comment: Some(Strand("test".to_string())), comment: Some(Strand("test".to_string())),
changefeed: Some(ChangeFeed { changefeed: Some(ChangeFeed {
expiry: std::time::Duration::from_secs(60) * 10 expiry: std::time::Duration::from_secs(60) * 10,
store_original: true,
}) })
})) }))
); );
@ -277,7 +280,7 @@ fn parse_define_param() {
#[test] #[test]
fn parse_define_table() { fn parse_define_table() {
let res = let res =
test_parse!(parse_stmt, r#"DEFINE TABLE name DROP SCHEMAFUL CHANGEFEED 1s PERMISSIONS FOR SELECT WHERE a = 1 AS SELECT foo FROM bar GROUP BY foo"#) test_parse!(parse_stmt, r#"DEFINE TABLE name DROP SCHEMAFUL CHANGEFEED 1s INCLUDE ORIGINAL PERMISSIONS FOR SELECT WHERE a = 1 AS SELECT foo FROM bar GROUP BY foo"#)
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@ -312,7 +315,8 @@ fn parse_define_table() {
delete: Permission::None, delete: Permission::None,
}, },
changefeed: Some(ChangeFeed { changefeed: Some(ChangeFeed {
expiry: std::time::Duration::from_secs(1) expiry: std::time::Duration::from_secs(1),
store_original: true,
}), }),
comment: None, comment: None,
})) }))

View file

@ -163,6 +163,7 @@ fn statements() -> Vec<Statement> {
comment: Some(Strand("test".to_string())), comment: Some(Strand("test".to_string())),
changefeed: Some(ChangeFeed { changefeed: Some(ChangeFeed {
expiry: std::time::Duration::from_secs(60) * 10, expiry: std::time::Duration::from_secs(60) * 10,
store_original: false,
}), }),
})), })),
Statement::Define(DefineStatement::Database(DefineDatabaseStatement { Statement::Define(DefineStatement::Database(DefineDatabaseStatement {
@ -235,6 +236,7 @@ fn statements() -> Vec<Statement> {
}, },
changefeed: Some(ChangeFeed { changefeed: Some(ChangeFeed {
expiry: std::time::Duration::from_secs(1), expiry: std::time::Duration::from_secs(1),
store_original: false,
}), }),
comment: None, comment: None,
})), })),

View file

@ -83,6 +83,7 @@ keyword! {
Group => "GROUP", Group => "GROUP",
Highlights => "HIGHLIGHTS", Highlights => "HIGHLIGHTS",
Ignore => "IGNORE", Ignore => "IGNORE",
Include => "INCLUDE",
Index => "INDEX", Index => "INDEX",
Info => "INFO", Info => "INFO",
Insert => "INSERT", Insert => "INSERT",

View file

@ -8,6 +8,7 @@ pub use keyword::Keyword;
mod mac; mod mac;
pub(crate) use mac::t; pub(crate) use mac::t;
use crate::sql::change_feed_include::ChangeFeedInclude;
use crate::sql::{language::Language, Algorithm}; use crate::sql::{language::Language, Algorithm};
/// A location in the source passed to the lexer. /// A location in the source passed to the lexer.
@ -224,6 +225,7 @@ pub enum NumberKind {
pub enum TokenKind { pub enum TokenKind {
Keyword(Keyword), Keyword(Keyword),
Algorithm(Algorithm), Algorithm(Algorithm),
ChangeFeedInclude(ChangeFeedInclude),
Language(Language), Language(Language),
Distance(DistanceKind), Distance(DistanceKind),
Operator(Operator), Operator(Operator),
@ -383,6 +385,7 @@ impl TokenKind {
TokenKind::At => "@", TokenKind::At => "@",
TokenKind::Invalid => "Invalid", TokenKind::Invalid => "Invalid",
TokenKind::Eof => "Eof", TokenKind::Eof => "Eof",
TokenKind::ChangeFeedInclude(_) => "change feed include",
} }
} }
} }