2023-07-24 16:15:20 +00:00
mod parse ;
2023-08-25 07:55:22 +00:00
use chrono ::DateTime ;
2023-07-24 16:15:20 +00:00
use parse ::Parse ;
2023-08-30 18:01:30 +00:00
mod helpers ;
use helpers ::new_ds ;
2023-07-24 16:15:20 +00:00
use surrealdb ::dbs ::Session ;
use surrealdb ::err ::Error ;
use surrealdb ::sql ::Value ;
2023-09-12 14:57:27 +00:00
#[ tokio::test ]
async fn database_change_feeds ( ) -> Result < ( ) , Error > {
let sql = "
DEFINE DATABASE test CHANGEFEED 1 h ;
DEFINE TABLE person ;
DEFINE FIELD name ON TABLE person
ASSERT
IF $input THEN
$input = / ^ [ A - Z ] { 1 } [ a - z ] + $ /
ELSE
true
END
VALUE
IF $input THEN
' Name : ' + $input
ELSE
$value
END
;
UPDATE person :test CONTENT { name : ' Tobie ' } ;
DELETE person :test ;
SHOW CHANGES FOR TABLE person SINCE 0 ;
" ;
let dbs = new_ds ( ) . await ? ;
let ses = Session ::owner ( ) . with_ns ( " test " ) . with_db ( " test " ) ;
let start_ts = 0 u64 ;
let end_ts = start_ts + 1 ;
dbs . tick_at ( start_ts ) . await ? ;
2023-10-04 09:51:34 +00:00
let res = & mut dbs . execute ( sql , & ses , None ) . await ? ;
2023-09-12 14:57:27 +00:00
dbs . tick_at ( end_ts ) . await ? ;
assert_eq! ( res . len ( ) , 6 ) ;
// DEFINE DATABASE
let tmp = res . remove ( 0 ) . result ;
assert! ( tmp . is_ok ( ) ) ;
// DEFINE TABLE
let tmp = res . remove ( 0 ) . result ;
assert! ( tmp . is_ok ( ) ) ;
// DEFINE FIELD
let tmp = res . remove ( 0 ) . result ;
assert! ( tmp . is_ok ( ) ) ;
// UPDATE CONTENT
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse (
" [
{
id : person :test ,
name : ' Name : Tobie ' ,
}
] " ,
) ;
assert_eq! ( tmp , val ) ;
// DELETE
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse ( " [] " ) ;
assert_eq! ( tmp , val ) ;
// SHOW CHANGES
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse (
" [
{
versionstamp : 65536 ,
changes : [
{
update : {
id : person :test ,
name : ' Name : Tobie '
}
}
]
} ,
{
versionstamp : 131072 ,
changes : [
{
delete : {
id : person :test
}
}
]
}
] " ,
) ;
assert_eq! ( tmp , val ) ;
// Retain for 1h
let sql = "
SHOW CHANGES FOR TABLE person SINCE 0 ;
" ;
dbs . tick_at ( end_ts + 3599 ) . await ? ;
2023-10-04 09:51:34 +00:00
let res = & mut dbs . execute ( sql , & ses , None ) . await ? ;
2023-09-12 14:57:27 +00:00
let tmp = res . remove ( 0 ) . result ? ;
assert_eq! ( tmp , val ) ;
// GC after 1hs
dbs . tick_at ( end_ts + 3600 ) . await ? ;
2023-10-04 09:51:34 +00:00
let res = & mut dbs . execute ( sql , & ses , None ) . await ? ;
2023-09-12 14:57:27 +00:00
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse ( " [] " ) ;
assert_eq! ( tmp , val ) ;
//
Ok ( ( ) )
}
2023-07-24 16:15:20 +00:00
#[ tokio::test ]
async fn table_change_feeds ( ) -> Result < ( ) , Error > {
let sql = "
DEFINE TABLE person CHANGEFEED 1 h ;
DEFINE FIELD name ON TABLE person
ASSERT
IF $input THEN
$input = / ^ [ A - Z ] { 1 } [ a - z ] + $ /
ELSE
true
END
VALUE
IF $input THEN
' Name : ' + $input
ELSE
$value
END
;
UPDATE person :test CONTENT { name : ' Tobie ' } ;
UPDATE person :test REPLACE { name : ' jaime ' } ;
UPDATE person :test MERGE { name : ' Jaime ' } ;
UPDATE person :test SET name = ' tobie ' ;
UPDATE person :test SET name = ' Tobie ' ;
DELETE person :test ;
CREATE person :1000 SET name = ' Yusuke ' ;
SHOW CHANGES FOR TABLE person SINCE 0 ;
" ;
2023-08-30 18:01:30 +00:00
let dbs = new_ds ( ) . await ? ;
2023-07-29 18:47:25 +00:00
let ses = Session ::owner ( ) . with_ns ( " test " ) . with_db ( " test " ) ;
2023-07-29 08:51:30 +00:00
let start_ts = 0 u64 ;
let end_ts = start_ts + 1 ;
dbs . tick_at ( start_ts ) . await ? ;
2023-10-04 09:51:34 +00:00
let res = & mut dbs . execute ( sql , & ses , None ) . await ? ;
2023-07-29 08:51:30 +00:00
dbs . tick_at ( end_ts ) . await ? ;
2023-07-24 16:15:20 +00:00
assert_eq! ( res . len ( ) , 10 ) ;
// DEFINE TABLE
let tmp = res . remove ( 0 ) . result ;
assert! ( tmp . is_ok ( ) ) ;
// DEFINE FIELD
let tmp = res . remove ( 0 ) . result ;
assert! ( tmp . is_ok ( ) ) ;
// UPDATE CONTENT
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse (
" [
{
id : person :test ,
name : ' Name : Tobie ' ,
}
] " ,
) ;
assert_eq! ( tmp , val ) ;
// UPDATE REPLACE
let tmp = res . remove ( 0 ) . result ;
assert! ( matches! (
tmp . err ( ) ,
Some ( e ) if e . to_string ( ) = = r # "Found 'Name: jaime' for field `name`, with record `person:test`, but field must conform to: IF $input THEN $input = /^[A-Z]{1}[a-z]+$/ ELSE true END"#
) ) ;
// UPDATE MERGE
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse (
" [
{
id : person :test ,
name : ' Name : Jaime ' ,
}
] " ,
) ;
assert_eq! ( tmp , val ) ;
// UPDATE SET
let tmp = res . remove ( 0 ) . result ;
assert! ( matches! (
tmp . err ( ) ,
Some ( e ) if e . to_string ( ) = = r # "Found 'Name: tobie' for field `name`, with record `person:test`, but field must conform to: IF $input THEN $input = /^[A-Z]{1}[a-z]+$/ ELSE true END"#
) ) ;
// UPDATE SET
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse (
" [
{
id : person :test ,
name : ' Name : Tobie ' ,
}
] " ,
) ;
assert_eq! ( tmp , val ) ;
// DELETE
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse ( " [] " ) ;
assert_eq! ( tmp , val ) ;
// CREATE
let _tmp = res . remove ( 0 ) . result ? ;
// SHOW CHANGES
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse (
" [
{
versionstamp : 65536 ,
2023-09-12 15:55:07 +00:00
changes : [
{
define_table : {
name : ' person '
}
}
]
} ,
{
versionstamp : 131072 ,
2023-07-24 16:15:20 +00:00
changes : [
{
update : {
id : person :test ,
name : ' Name : Tobie '
}
}
]
} ,
{
2023-09-12 15:55:07 +00:00
versionstamp : 196608 ,
2023-07-24 16:15:20 +00:00
changes : [
{
update : {
id : person :test ,
name : ' Name : Jaime '
}
}
]
} ,
{
2023-09-12 15:55:07 +00:00
versionstamp : 262144 ,
2023-07-24 16:15:20 +00:00
changes : [
{
update : {
id : person :test ,
name : ' Name : Tobie '
}
}
]
} ,
{
2023-09-12 15:55:07 +00:00
versionstamp : 327680 ,
2023-07-24 16:15:20 +00:00
changes : [
{
delete : {
id : person :test
}
}
]
} ,
{
2023-09-12 15:55:07 +00:00
versionstamp : 393216 ,
2023-07-24 16:15:20 +00:00
changes : [
{
update : {
id : person :1000 ,
name : ' Name : Yusuke '
}
}
]
}
] " ,
) ;
assert_eq! ( tmp , val ) ;
2023-07-29 08:51:30 +00:00
// Retain for 1h
let sql = "
SHOW CHANGES FOR TABLE person SINCE 0 ;
" ;
dbs . tick_at ( end_ts + 3599 ) . await ? ;
2023-10-04 09:51:34 +00:00
let res = & mut dbs . execute ( sql , & ses , None ) . await ? ;
2023-07-29 08:51:30 +00:00
let tmp = res . remove ( 0 ) . result ? ;
assert_eq! ( tmp , val ) ;
// GC after 1hs
dbs . tick_at ( end_ts + 3600 ) . await ? ;
2023-10-04 09:51:34 +00:00
let res = & mut dbs . execute ( sql , & ses , None ) . await ? ;
2023-07-29 08:51:30 +00:00
let tmp = res . remove ( 0 ) . result ? ;
let val = Value ::parse ( " [] " ) ;
assert_eq! ( tmp , val ) ;
2023-07-24 16:15:20 +00:00
//
Ok ( ( ) )
}
2023-08-25 07:55:22 +00:00
#[ tokio::test ]
async fn changefeed_with_ts ( ) -> Result < ( ) , Error > {
2023-08-30 18:01:30 +00:00
let db = new_ds ( ) . await ? ;
2023-08-25 07:55:22 +00:00
let ses = Session ::owner ( ) . with_ns ( " test " ) . with_db ( " test " ) ;
// Enable change feeds
let sql = "
DEFINE TABLE user CHANGEFEED 1 h ;
" ;
db . execute ( sql , & ses , None ) . await ? . remove ( 0 ) . result ? ;
// Save timestamp 1
let ts1_dt = " 2023-08-01T00:00:00Z " ;
2023-09-04 12:12:24 +00:00
let ts1 = DateTime ::parse_from_rfc3339 ( ts1_dt ) . unwrap ( ) ;
2023-08-25 07:55:22 +00:00
db . tick_at ( ts1 . timestamp ( ) . try_into ( ) . unwrap ( ) ) . await . unwrap ( ) ;
// Create and update users
let sql = "
CREATE user :amos SET name = ' Amos ' ;
CREATE user :jane SET name = ' Jane ' ;
UPDATE user :amos SET name = ' AMOS ' ;
" ;
let table = " user " ;
let res = db . execute ( sql , & ses , None ) . await ? ;
for res in res {
res . result ? ;
}
let sql = format! ( " UPDATE {table} SET name = 'Doe' " ) ;
let users = db . execute ( & sql , & ses , None ) . await ? . remove ( 0 ) . result ? ;
let expected = Value ::parse (
" [
{
id : user :amos ,
name : ' Doe ' ,
} ,
{
id : user :jane ,
name : ' Doe ' ,
} ,
] " ,
) ;
assert_eq! ( users , expected ) ;
let sql = format! ( " SELECT * FROM {table} " ) ;
let users = db . execute ( & sql , & ses , None ) . await ? . remove ( 0 ) . result ? ;
assert_eq! ( users , expected ) ;
let sql = "
SHOW CHANGES FOR TABLE user SINCE 0 LIMIT 10 ;
" ;
let value : Value = db . execute ( sql , & ses , None ) . await ? . remove ( 0 ) . result ? ;
let Value ::Array ( array ) = value . clone ( ) else {
unreachable! ( )
} ;
2023-09-12 15:55:07 +00:00
assert_eq! ( array . len ( ) , 5 ) ;
// DEFINE TABLE
2023-08-25 07:55:22 +00:00
let a = array . get ( 0 ) . unwrap ( ) ;
let Value ::Object ( a ) = a else {
unreachable! ( )
} ;
2023-10-04 09:51:34 +00:00
let Value ::Number ( _versionstamp1 ) = a . get ( " versionstamp " ) . unwrap ( ) else {
2023-08-25 07:55:22 +00:00
unreachable! ( )
} ;
let changes = a . get ( " changes " ) . unwrap ( ) . to_owned ( ) ;
2023-09-12 15:55:07 +00:00
assert_eq! (
changes ,
surrealdb ::sql ::value (
" [
{
define_table : {
name : ' user '
}
}
] "
)
. unwrap ( )
) ;
// UPDATE user:amos
let a = array . get ( 1 ) . unwrap ( ) ;
let Value ::Object ( a ) = a else {
unreachable! ( )
} ;
2023-09-12 20:26:03 +00:00
let Value ::Number ( versionstamp2 ) = a . get ( " versionstamp " ) . unwrap ( ) else {
2023-09-12 15:55:07 +00:00
unreachable! ( )
} ;
let changes = a . get ( " changes " ) . unwrap ( ) . to_owned ( ) ;
2023-08-25 07:55:22 +00:00
assert_eq! (
changes ,
surrealdb ::sql ::value (
" [
{
update : {
id : user :amos ,
name : ' Amos '
}
}
] "
)
. unwrap ( )
) ;
// UPDATE user:jane
2023-09-12 15:55:07 +00:00
let a = array . get ( 2 ) . unwrap ( ) ;
2023-08-25 07:55:22 +00:00
let Value ::Object ( a ) = a else {
unreachable! ( )
} ;
2023-09-12 20:26:03 +00:00
let Value ::Number ( versionstamp3 ) = a . get ( " versionstamp " ) . unwrap ( ) else {
2023-08-25 07:55:22 +00:00
unreachable! ( )
} ;
2023-09-12 20:26:03 +00:00
assert! ( versionstamp2 < versionstamp3 ) ;
2023-08-25 07:55:22 +00:00
let changes = a . get ( " changes " ) . unwrap ( ) . to_owned ( ) ;
assert_eq! (
changes ,
surrealdb ::sql ::value (
" [
{
update : {
id : user :jane ,
name : ' Jane '
}
}
] "
)
. unwrap ( )
) ;
// UPDATE user:amos
2023-09-12 15:55:07 +00:00
let a = array . get ( 3 ) . unwrap ( ) ;
2023-08-25 07:55:22 +00:00
let Value ::Object ( a ) = a else {
unreachable! ( )
} ;
2023-09-12 20:26:03 +00:00
let Value ::Number ( versionstamp4 ) = a . get ( " versionstamp " ) . unwrap ( ) else {
2023-08-25 07:55:22 +00:00
unreachable! ( )
} ;
2023-09-12 20:26:03 +00:00
assert! ( versionstamp3 < versionstamp4 ) ;
2023-08-25 07:55:22 +00:00
let changes = a . get ( " changes " ) . unwrap ( ) . to_owned ( ) ;
assert_eq! (
changes ,
surrealdb ::sql ::value (
" [
{
update : {
id : user :amos ,
name : ' AMOS '
}
}
] "
)
. unwrap ( )
) ;
// UPDATE table
2023-09-12 15:55:07 +00:00
let a = array . get ( 4 ) . unwrap ( ) ;
2023-08-25 07:55:22 +00:00
let Value ::Object ( a ) = a else {
unreachable! ( )
} ;
2023-09-12 20:26:03 +00:00
let Value ::Number ( versionstamp5 ) = a . get ( " versionstamp " ) . unwrap ( ) else {
2023-08-25 07:55:22 +00:00
unreachable! ( )
} ;
2023-09-12 20:26:03 +00:00
assert! ( versionstamp4 < versionstamp5 ) ;
2023-08-25 07:55:22 +00:00
let changes = a . get ( " changes " ) . unwrap ( ) . to_owned ( ) ;
assert_eq! (
changes ,
surrealdb ::sql ::value (
" [
{
update : {
id : user :amos ,
name : ' Doe '
}
} ,
{
update : {
id : user :jane ,
name : ' Doe '
}
}
] "
)
. unwrap ( )
) ;
// Save timestamp 2
let ts2_dt = " 2023-08-01T00:00:05Z " ;
2023-09-04 12:12:24 +00:00
let ts2 = DateTime ::parse_from_rfc3339 ( ts2_dt ) . unwrap ( ) ;
2023-08-25 07:55:22 +00:00
db . tick_at ( ts2 . timestamp ( ) . try_into ( ) . unwrap ( ) ) . await . unwrap ( ) ;
//
// Show changes using timestamp 1
//
2024-01-10 16:43:56 +00:00
let sql = format! ( " SHOW CHANGES FOR TABLE user SINCE d' {ts1_dt} ' LIMIT 10; " ) ;
2023-08-25 07:55:22 +00:00
let value : Value = db . execute ( & sql , & ses , None ) . await ? . remove ( 0 ) . result ? ;
let Value ::Array ( array ) = value . clone ( ) else {
unreachable! ( )
} ;
assert_eq! ( array . len ( ) , 4 ) ;
// UPDATE user:amos
let a = array . get ( 0 ) . unwrap ( ) ;
let Value ::Object ( a ) = a else {
unreachable! ( )
} ;
let Value ::Number ( versionstamp1b ) = a . get ( " versionstamp " ) . unwrap ( ) else {
unreachable! ( )
} ;
2023-09-12 20:26:03 +00:00
assert! ( versionstamp2 = = versionstamp1b ) ;
2023-08-25 07:55:22 +00:00
let changes = a . get ( " changes " ) . unwrap ( ) . to_owned ( ) ;
assert_eq! (
changes ,
surrealdb ::sql ::value (
" [
{
update : {
id : user :amos ,
name : ' Amos '
}
}
] "
)
. unwrap ( )
) ;
// Save timestamp 3
let ts3_dt = " 2023-08-01T00:00:10Z " ;
2023-09-04 12:12:24 +00:00
let ts3 = DateTime ::parse_from_rfc3339 ( ts3_dt ) . unwrap ( ) ;
2023-08-25 07:55:22 +00:00
db . tick_at ( ts3 . timestamp ( ) . try_into ( ) . unwrap ( ) ) . await . unwrap ( ) ;
//
// Show changes using timestamp 3
//
2024-01-10 16:43:56 +00:00
let sql = format! ( " SHOW CHANGES FOR TABLE user SINCE d' {ts3_dt} ' LIMIT 10; " ) ;
2023-08-25 07:55:22 +00:00
let value : Value = db . execute ( & sql , & ses , None ) . await ? . remove ( 0 ) . result ? ;
let Value ::Array ( array ) = value . clone ( ) else {
unreachable! ( )
} ;
assert_eq! ( array . len ( ) , 0 ) ;
Ok ( ( ) )
}