Fix API warnings (#3715)
This commit is contained in:
parent
47a1589018
commit
0a466cd39d
5 changed files with 44 additions and 17 deletions
|
@ -90,6 +90,7 @@ use std::mem;
|
|||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
#[cfg(feature = "sql2")]
|
||||
use std::time::Duration;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::fs::OpenOptions;
|
||||
|
@ -100,6 +101,7 @@ use tokio::io::AsyncReadExt;
|
|||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
#[cfg(feature = "sql2")]
|
||||
const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
/// In-memory database
|
||||
|
|
|
@ -16,6 +16,7 @@ use crate::iam::Level;
|
|||
use crate::kvs::Datastore;
|
||||
use crate::opt::auth::Root;
|
||||
use crate::opt::WaitFor;
|
||||
#[cfg(feature = "sql2")]
|
||||
use crate::options::EngineOptions;
|
||||
use flume::Receiver;
|
||||
use flume::Sender;
|
||||
|
@ -162,9 +163,11 @@ pub(crate) fn router(
|
|||
..Default::default()
|
||||
}
|
||||
};
|
||||
#[cfg(not(feature = "sql2"))]
|
||||
let opt = EngineOptions::default();
|
||||
let (tasks, task_chans) = start_tasks(&opt, kvs.clone());
|
||||
let (tasks, task_chans) = start_tasks(
|
||||
#[cfg(feature = "sql2")]
|
||||
&opt,
|
||||
kvs.clone(),
|
||||
);
|
||||
|
||||
let mut notifications = kvs.notifications();
|
||||
let notification_stream = poll_fn(move |cx| match &mut notifications {
|
||||
|
|
|
@ -11,7 +11,6 @@ use crate::api::ExtraFeatures;
|
|||
use crate::api::OnceLockExt;
|
||||
use crate::api::Result;
|
||||
use crate::api::Surreal;
|
||||
use crate::dbs::Options;
|
||||
use crate::dbs::Session;
|
||||
use crate::engine::tasks::start_tasks;
|
||||
use crate::iam::Level;
|
||||
|
@ -153,7 +152,7 @@ pub(crate) fn router(
|
|||
tick_interval,
|
||||
..Default::default()
|
||||
};
|
||||
let (tasks, task_chans) = start_tasks(&opt, kvs.clone());
|
||||
let (_tasks, task_chans) = start_tasks(&opt, kvs.clone());
|
||||
|
||||
let mut notifications = kvs.notifications();
|
||||
let notification_stream = poll_fn(move |cx| match &mut notifications {
|
||||
|
|
|
@ -12,6 +12,7 @@ use tokio::task::JoinHandle;
|
|||
use crate::dbs::Options;
|
||||
use crate::fflags::FFLAGS;
|
||||
use crate::kvs::Datastore;
|
||||
#[cfg(feature = "sql2")]
|
||||
use crate::options::EngineOptions;
|
||||
|
||||
use crate::engine::IntervalStream;
|
||||
|
@ -39,23 +40,21 @@ impl Tasks {
|
|||
self.nd.await.map_err(|e| {
|
||||
error!("Node agent task failed: {}", e);
|
||||
#[cfg(not(feature = "sql2"))]
|
||||
let inner_err = surrealdb_core1::err::Error::Unreachable(
|
||||
let inner_err = crate::err::Error::Unreachable(
|
||||
"This feature won't go live with sql1, so delete this branching",
|
||||
);
|
||||
#[cfg(feature = "sql2")]
|
||||
let inner_err = surrealdb_core2::err::Error::NodeAgent("node task failed and has been logged");
|
||||
let inner_err = crate::err::Error::NodeAgent("node task failed and has been logged");
|
||||
RootError::Db(inner_err)
|
||||
})?;
|
||||
self.lq.await.map_err(|e| {
|
||||
error!("Live query task failed: {}", e);
|
||||
#[cfg(not(feature = "sql2"))]
|
||||
let inner_err = surrealdb_core1::err::Error::Unreachable(
|
||||
let inner_err = crate::err::Error::Unreachable(
|
||||
"This feature won't go live with sql1, so delete this branching",
|
||||
);
|
||||
#[cfg(feature = "sql2")]
|
||||
let inner_err = surrealdb_core2::err::Error::NodeAgent(
|
||||
"live query task failed and has been logged",
|
||||
);
|
||||
let inner_err = crate::err::Error::NodeAgent("live query task failed and has been logged");
|
||||
RootError::Db(inner_err)
|
||||
})?;
|
||||
Ok(())
|
||||
|
@ -63,9 +62,20 @@ impl Tasks {
|
|||
}
|
||||
|
||||
/// Starts tasks that are required for the correct running of the engine
|
||||
pub fn start_tasks(opt: &EngineOptions, dbs: Arc<Datastore>) -> (Tasks, [Sender<()>; 2]) {
|
||||
let nd = init(opt, dbs.clone());
|
||||
let lq = live_query_change_feed(opt, dbs);
|
||||
pub fn start_tasks(
|
||||
#[cfg(feature = "sql2")] opt: &EngineOptions,
|
||||
dbs: Arc<Datastore>,
|
||||
) -> (Tasks, [Sender<()>; 2]) {
|
||||
let nd = init(
|
||||
#[cfg(feature = "sql2")]
|
||||
opt,
|
||||
dbs.clone(),
|
||||
);
|
||||
let lq = live_query_change_feed(
|
||||
#[cfg(feature = "sql2")]
|
||||
opt,
|
||||
dbs,
|
||||
);
|
||||
let cancellation_channels = [nd.1, lq.1];
|
||||
(
|
||||
Tasks {
|
||||
|
@ -82,7 +92,10 @@ pub fn start_tasks(opt: &EngineOptions, dbs: Arc<Datastore>) -> (Tasks, [Sender<
|
|||
//
|
||||
// This function needs to be called before after the dbs::init and before the net::init functions.
|
||||
// It needs to be before net::init because the net::init function blocks until the web server stops.
|
||||
fn init(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, Sender<()>) {
|
||||
fn init(
|
||||
#[cfg(feature = "sql2")] opt: &EngineOptions,
|
||||
dbs: Arc<Datastore>,
|
||||
) -> (FutureTask, Sender<()>) {
|
||||
#[cfg(feature = "sql2")]
|
||||
let _init = crate::dbs::LoggingLifecycle::new("node agent initialisation".to_string());
|
||||
#[cfg(feature = "sql2")]
|
||||
|
@ -129,7 +142,10 @@ fn init(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, Sender<()>) {
|
|||
}
|
||||
|
||||
// Start live query on change feeds notification processing
|
||||
fn live_query_change_feed(opt: &EngineOptions, dbs: Arc<Datastore>) -> (FutureTask, Sender<()>) {
|
||||
fn live_query_change_feed(
|
||||
#[cfg(feature = "sql2")] opt: &EngineOptions,
|
||||
dbs: Arc<Datastore>,
|
||||
) -> (FutureTask, Sender<()>) {
|
||||
#[cfg(feature = "sql2")]
|
||||
let tick_interval = opt.tick_interval;
|
||||
#[cfg(not(feature = "sql2"))]
|
||||
|
@ -196,14 +212,20 @@ async fn interval_ticker(interval: Duration) -> IntervalStream {
|
|||
mod test {
|
||||
use crate::engine::tasks::start_tasks;
|
||||
use crate::kvs::Datastore;
|
||||
#[cfg(feature = "sql2")]
|
||||
use crate::options::EngineOptions;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
pub async fn tasks_complete() {
|
||||
#[cfg(feature = "sql2")]
|
||||
let opt = EngineOptions::default();
|
||||
let dbs = Arc::new(Datastore::new("memory").await.unwrap());
|
||||
let (val, chans) = start_tasks(&opt, dbs.clone());
|
||||
let (val, chans) = start_tasks(
|
||||
#[cfg(feature = "sql2")]
|
||||
&opt,
|
||||
dbs.clone(),
|
||||
);
|
||||
for chan in chans {
|
||||
chan.send(()).unwrap();
|
||||
}
|
||||
|
|
|
@ -184,6 +184,7 @@ pub async fn init(
|
|||
dbs::init(dbs).await?;
|
||||
// Start the node agent
|
||||
let (tasks, task_chans) = start_tasks(
|
||||
#[cfg(feature = "sql2")]
|
||||
&config::CF.get().unwrap().engine.unwrap_or_default(),
|
||||
DB.get().unwrap().clone(),
|
||||
);
|
||||
|
|
Loading…
Reference in a new issue