feat: SHOW statement for change feed reading (#2187)
This commit is contained in:
parent
781b1f944e
commit
d1cf55764d
4 changed files with 221 additions and 2 deletions
|
@ -14,12 +14,14 @@ use crate::sql::statements::insert::InsertStatement;
|
||||||
use crate::sql::statements::live::LiveStatement;
|
use crate::sql::statements::live::LiveStatement;
|
||||||
use crate::sql::statements::relate::RelateStatement;
|
use crate::sql::statements::relate::RelateStatement;
|
||||||
use crate::sql::statements::select::SelectStatement;
|
use crate::sql::statements::select::SelectStatement;
|
||||||
|
use crate::sql::statements::show::ShowStatement;
|
||||||
use crate::sql::statements::update::UpdateStatement;
|
use crate::sql::statements::update::UpdateStatement;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub(crate) enum Statement<'a> {
|
pub(crate) enum Statement<'a> {
|
||||||
Live(&'a LiveStatement),
|
Live(&'a LiveStatement),
|
||||||
|
Show(&'a ShowStatement),
|
||||||
Select(&'a SelectStatement),
|
Select(&'a SelectStatement),
|
||||||
Create(&'a CreateStatement),
|
Create(&'a CreateStatement),
|
||||||
Update(&'a UpdateStatement),
|
Update(&'a UpdateStatement),
|
||||||
|
@ -34,6 +36,12 @@ impl<'a> From<&'a LiveStatement> for Statement<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<'a> From<&'a ShowStatement> for Statement<'a> {
|
||||||
|
fn from(v: &'a ShowStatement) -> Self {
|
||||||
|
Statement::Show(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<'a> From<&'a SelectStatement> for Statement<'a> {
|
impl<'a> From<&'a SelectStatement> for Statement<'a> {
|
||||||
fn from(v: &'a SelectStatement) -> Self {
|
fn from(v: &'a SelectStatement) -> Self {
|
||||||
Statement::Select(v)
|
Statement::Select(v)
|
||||||
|
@ -74,6 +82,7 @@ impl<'a> fmt::Display for Statement<'a> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
Statement::Live(v) => write!(f, "{v}"),
|
Statement::Live(v) => write!(f, "{v}"),
|
||||||
|
Statement::Show(v) => write!(f, "{v}"),
|
||||||
Statement::Select(v) => write!(f, "{v}"),
|
Statement::Select(v) => write!(f, "{v}"),
|
||||||
Statement::Create(v) => write!(f, "{v}"),
|
Statement::Create(v) => write!(f, "{v}"),
|
||||||
Statement::Update(v) => write!(f, "{v}"),
|
Statement::Update(v) => write!(f, "{v}"),
|
||||||
|
|
|
@ -24,6 +24,7 @@ use crate::sql::statements::relate::{relate, RelateStatement};
|
||||||
use crate::sql::statements::remove::{remove, RemoveStatement};
|
use crate::sql::statements::remove::{remove, RemoveStatement};
|
||||||
use crate::sql::statements::select::{select, SelectStatement};
|
use crate::sql::statements::select::{select, SelectStatement};
|
||||||
use crate::sql::statements::set::{set, SetStatement};
|
use crate::sql::statements::set::{set, SetStatement};
|
||||||
|
use crate::sql::statements::show::{show, ShowStatement};
|
||||||
use crate::sql::statements::sleep::{sleep, SleepStatement};
|
use crate::sql::statements::sleep::{sleep, SleepStatement};
|
||||||
use crate::sql::statements::update::{update, UpdateStatement};
|
use crate::sql::statements::update::{update, UpdateStatement};
|
||||||
use crate::sql::statements::yuse::{yuse, UseStatement};
|
use crate::sql::statements::yuse::{yuse, UseStatement};
|
||||||
|
@ -92,6 +93,7 @@ pub enum Statement {
|
||||||
Remove(RemoveStatement),
|
Remove(RemoveStatement),
|
||||||
Select(SelectStatement),
|
Select(SelectStatement),
|
||||||
Set(SetStatement),
|
Set(SetStatement),
|
||||||
|
Show(ShowStatement),
|
||||||
Sleep(SleepStatement),
|
Sleep(SleepStatement),
|
||||||
Update(UpdateStatement),
|
Update(UpdateStatement),
|
||||||
Use(UseStatement),
|
Use(UseStatement),
|
||||||
|
@ -128,6 +130,7 @@ impl Statement {
|
||||||
Self::Remove(_) => true,
|
Self::Remove(_) => true,
|
||||||
Self::Select(v) => v.writeable(),
|
Self::Select(v) => v.writeable(),
|
||||||
Self::Set(v) => v.writeable(),
|
Self::Set(v) => v.writeable(),
|
||||||
|
Self::Show(_) => false,
|
||||||
Self::Sleep(_) => false,
|
Self::Sleep(_) => false,
|
||||||
Self::Update(v) => v.writeable(),
|
Self::Update(v) => v.writeable(),
|
||||||
Self::Use(_) => false,
|
Self::Use(_) => false,
|
||||||
|
@ -151,6 +154,7 @@ impl Statement {
|
||||||
Self::Remove(v) => v.compute(ctx, opt).await,
|
Self::Remove(v) => v.compute(ctx, opt).await,
|
||||||
Self::Select(v) => v.compute(ctx, opt).await,
|
Self::Select(v) => v.compute(ctx, opt).await,
|
||||||
Self::Set(v) => v.compute(ctx, opt).await,
|
Self::Set(v) => v.compute(ctx, opt).await,
|
||||||
|
Self::Show(v) => v.compute(ctx, opt).await,
|
||||||
Self::Sleep(v) => v.compute(ctx, opt).await,
|
Self::Sleep(v) => v.compute(ctx, opt).await,
|
||||||
Self::Update(v) => v.compute(ctx, opt).await,
|
Self::Update(v) => v.compute(ctx, opt).await,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
|
@ -179,6 +183,7 @@ impl Display for Statement {
|
||||||
Self::Remove(v) => write!(Pretty::from(f), "{v}"),
|
Self::Remove(v) => write!(Pretty::from(f), "{v}"),
|
||||||
Self::Select(v) => write!(Pretty::from(f), "{v}"),
|
Self::Select(v) => write!(Pretty::from(f), "{v}"),
|
||||||
Self::Set(v) => write!(Pretty::from(f), "{v}"),
|
Self::Set(v) => write!(Pretty::from(f), "{v}"),
|
||||||
|
Self::Show(v) => write!(Pretty::from(f), "{v}"),
|
||||||
Self::Sleep(v) => write!(Pretty::from(f), "{v}"),
|
Self::Sleep(v) => write!(Pretty::from(f), "{v}"),
|
||||||
Self::Update(v) => write!(Pretty::from(f), "{v}"),
|
Self::Update(v) => write!(Pretty::from(f), "{v}"),
|
||||||
Self::Use(v) => write!(Pretty::from(f), "{v}"),
|
Self::Use(v) => write!(Pretty::from(f), "{v}"),
|
||||||
|
@ -208,9 +213,9 @@ pub fn statement(i: &str) -> IResult<&str, Statement> {
|
||||||
map(remove, Statement::Remove),
|
map(remove, Statement::Remove),
|
||||||
map(select, Statement::Select),
|
map(select, Statement::Select),
|
||||||
map(set, Statement::Set),
|
map(set, Statement::Set),
|
||||||
|
map(show, Statement::Show),
|
||||||
map(sleep, Statement::Sleep),
|
map(sleep, Statement::Sleep),
|
||||||
map(update, Statement::Update),
|
alt((map(update, Statement::Update), map(yuse, Statement::Use))),
|
||||||
map(yuse, Statement::Use),
|
|
||||||
)),
|
)),
|
||||||
mightbespace,
|
mightbespace,
|
||||||
)(i)
|
)(i)
|
||||||
|
@ -247,4 +252,22 @@ mod tests {
|
||||||
let out = res.unwrap().1;
|
let out = res.unwrap().1;
|
||||||
assert_eq!("CREATE test;\nCREATE temp;", format!("{}", out))
|
assert_eq!("CREATE test;\nCREATE temp;", format!("{}", out))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_table_changes() {
|
||||||
|
let sql = "SHOW CHANGES FOR TABLE test SINCE 123456";
|
||||||
|
let res = statement(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!("SHOW CHANGES FOR TABLE test SINCE 123456", format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_database_changes() {
|
||||||
|
let sql = "SHOW CHANGES FOR DATABASE SINCE 123456";
|
||||||
|
let res = statement(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!("SHOW CHANGES FOR DATABASE SINCE 123456", format!("{}", out))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ pub(crate) mod relate;
|
||||||
pub(crate) mod remove;
|
pub(crate) mod remove;
|
||||||
pub(crate) mod select;
|
pub(crate) mod select;
|
||||||
pub(crate) mod set;
|
pub(crate) mod set;
|
||||||
|
pub(crate) mod show;
|
||||||
pub(crate) mod sleep;
|
pub(crate) mod sleep;
|
||||||
pub(crate) mod update;
|
pub(crate) mod update;
|
||||||
pub(crate) mod yuse;
|
pub(crate) mod yuse;
|
||||||
|
|
186
lib/src/sql/statements/show.rs
Normal file
186
lib/src/sql/statements/show.rs
Normal file
|
@ -0,0 +1,186 @@
|
||||||
|
use crate::ctx::Context;
|
||||||
|
use crate::dbs::Options;
|
||||||
|
use crate::err::Error;
|
||||||
|
use crate::sql::comment::shouldbespace;
|
||||||
|
use crate::sql::common::take_u64;
|
||||||
|
use crate::sql::error::IResult;
|
||||||
|
use crate::sql::table::{table, Table};
|
||||||
|
use crate::sql::value::Value;
|
||||||
|
use derive::Store;
|
||||||
|
use nom::branch::alt;
|
||||||
|
use nom::bytes::complete::tag_no_case;
|
||||||
|
use nom::character::complete::u32;
|
||||||
|
use nom::combinator::map;
|
||||||
|
use nom::combinator::opt;
|
||||||
|
use nom::sequence::preceded;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
// ShowStatement is used to show changes in a table or database via
|
||||||
|
// the SHOW CHANGES statement.
|
||||||
|
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Store, Hash)]
|
||||||
|
pub struct ShowStatement {
|
||||||
|
pub table: Option<Table>,
|
||||||
|
pub since: Option<u64>,
|
||||||
|
pub limit: Option<u32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ShowStatement {
|
||||||
|
/// Process this type returning a computed simple Value
|
||||||
|
pub(crate) async fn compute(&self, _ctx: &Context<'_>, _opt: &Options) -> Result<Value, Error> {
|
||||||
|
Err(Error::FeatureNotYetImplemented {
|
||||||
|
feature: "change feed",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for ShowStatement {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "SHOW CHANGES FOR")?;
|
||||||
|
match self.table {
|
||||||
|
Some(ref v) => write!(f, " TABLE {}", v)?,
|
||||||
|
None => write!(f, " DATABASE")?,
|
||||||
|
}
|
||||||
|
if let Some(ref v) = self.since {
|
||||||
|
write!(f, " SINCE {}", v)?
|
||||||
|
}
|
||||||
|
if let Some(ref v) = self.limit {
|
||||||
|
write!(f, " LIMIT {}", v)?
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn table_or_database(i: &str) -> IResult<&str, Option<Table>> {
|
||||||
|
let (i, v) = alt((
|
||||||
|
map(preceded(tag_no_case("table"), preceded(shouldbespace, table)), Some),
|
||||||
|
map(tag_no_case("database"), |_| None),
|
||||||
|
))(i)?;
|
||||||
|
Ok((i, v))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn since(i: &str) -> IResult<&str, u64> {
|
||||||
|
let (i, _) = tag_no_case("SINCE")(i)?;
|
||||||
|
let (i, _) = shouldbespace(i)?;
|
||||||
|
|
||||||
|
take_u64(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn limit(i: &str) -> IResult<&str, u32> {
|
||||||
|
let (i, _) = tag_no_case("LIMIT")(i)?;
|
||||||
|
let (i, _) = shouldbespace(i)?;
|
||||||
|
|
||||||
|
u32(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn show(i: &str) -> IResult<&str, ShowStatement> {
|
||||||
|
let (i, _) = tag_no_case("SHOW CHANGES")(i)?;
|
||||||
|
let (i, _) = shouldbespace(i)?;
|
||||||
|
let (i, _) = tag_no_case("FOR")(i)?;
|
||||||
|
let (i, _) = shouldbespace(i)?;
|
||||||
|
let (i, table) = table_or_database(i)?;
|
||||||
|
let (i, since) = opt(preceded(shouldbespace, since))(i)?;
|
||||||
|
let (i, limit) = opt(preceded(shouldbespace, limit))(i)?;
|
||||||
|
Ok((
|
||||||
|
i,
|
||||||
|
ShowStatement {
|
||||||
|
table,
|
||||||
|
since,
|
||||||
|
limit,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn tb() {
|
||||||
|
let sql = "TABLE person";
|
||||||
|
let res = table_or_database(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1.unwrap();
|
||||||
|
assert_eq!("person", format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn db() {
|
||||||
|
let sql = "DATABASE";
|
||||||
|
let res = table_or_database(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
assert!(res.unwrap().1.is_none())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_table_changes() {
|
||||||
|
let sql = "SHOW CHANGES FOR TABLE person";
|
||||||
|
let res = show(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!(sql, format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_table_changes_since() {
|
||||||
|
let sql = "SHOW CHANGES FOR TABLE person SINCE 0";
|
||||||
|
let res = show(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!(sql, format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_table_changes_limit() {
|
||||||
|
let sql = "SHOW CHANGES FOR TABLE person LIMIT 10";
|
||||||
|
let res = show(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!(sql, format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_table_changes_since_limit() {
|
||||||
|
let sql = "SHOW CHANGES FOR TABLE person SINCE 0 LIMIT 10";
|
||||||
|
let res = show(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!(sql, format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_database_changes() {
|
||||||
|
let sql = "SHOW CHANGES FOR DATABASE";
|
||||||
|
let res = show(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!(sql, format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_database_changes_since() {
|
||||||
|
let sql = "SHOW CHANGES FOR DATABASE SINCE 0";
|
||||||
|
let res = show(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!(sql, format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_database_changes_limit() {
|
||||||
|
let sql = "SHOW CHANGES FOR DATABASE LIMIT 10";
|
||||||
|
let res = show(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!(sql, format!("{}", out))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn show_database_changes_since_limit() {
|
||||||
|
let sql = "SHOW CHANGES FOR DATABASE SINCE 0 LIMIT 10";
|
||||||
|
let res = show(sql);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
let out = res.unwrap().1;
|
||||||
|
assert_eq!(sql, format!("{}", out))
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue