diff --git a/lib/src/api/engine/local/mod.rs b/lib/src/api/engine/local/mod.rs index ba8661ed..95914f95 100644 --- a/lib/src/api/engine/local/mod.rs +++ b/lib/src/api/engine/local/mod.rs @@ -58,6 +58,8 @@ use std::collections::BTreeMap; use std::marker::PhantomData; use std::mem; #[cfg(not(target_arch = "wasm32"))] +use std::path::PathBuf; +#[cfg(not(target_arch = "wasm32"))] use tokio::fs::OpenOptions; #[cfg(not(target_arch = "wasm32"))] use tokio::io; @@ -450,18 +452,36 @@ async fn router( let ns = session.ns.clone().unwrap_or_default(); let db = session.db.clone().unwrap_or_default(); let (mut writer, mut reader) = io::duplex(10_240); - tokio::spawn(async move { + + // Write to channel. + async fn export_with_err( + kvs: &Datastore, + ns: String, + db: String, + chn: channel::Sender>, + ) -> std::result::Result<(), crate::Error> { + kvs.export(ns, db, chn).await.map_err(|error| { + error!(target: LOG, "{error}"); + crate::Error::Db(error) + }) + } + + let export = export_with_err(kvs, ns, db, tx); + + // Read from channel and write to pipe. + let bridge = async move { while let Ok(value) = rx.recv().await { - if let Err(error) = writer.write_all(&value).await { - error!(target: LOG, "{error}"); + if writer.write_all(&value).await.is_err() { + // Broken pipe. Let either side's error be propagated. + break; } } - }); - if let Err(error) = kvs.export(ns, db, tx).await { - error!(target: LOG, "{error}"); - } + Ok(()) + }; + + // Output to stdout or file. let path = param.file.expect("file to export into"); - let mut writer: Box = match path.to_str().unwrap() { + let mut output: Box = match path.to_str().unwrap() { "-" => Box::new(io::stdout()), _ => { let file = match OpenOptions::new() @@ -483,14 +503,28 @@ async fn router( Box::new(file) } }; - if let Err(error) = io::copy(&mut reader, &mut writer).await { - return Err(Error::FileRead { - path, - error, - } - .into()); - }; - Ok(DbResponse::Other(Value::None)) + + // Copy from pipe to output. + async fn copy_with_err<'a, R, W>( + path: PathBuf, + reader: &'a mut R, + writer: &'a mut W, + ) -> std::result::Result<(), crate::Error> + where + R: tokio::io::AsyncRead + Unpin + ?Sized, + W: tokio::io::AsyncWrite + Unpin + ?Sized, + { + io::copy(reader, writer).await.map(|_| ()).map_err(|error| { + crate::Error::Api(crate::error::Api::FileRead { + path, + error, + }) + }) + } + + let copy = copy_with_err(path, &mut reader, &mut output); + + tokio::try_join!(export, bridge, copy).map(|_| DbResponse::Other(Value::None)) } #[cfg(not(target_arch = "wasm32"))] Method::Import => {