Allow exports larger than 10,240 bytes for local engines (#1833)

This commit is contained in:
Finn Bear 2023-04-20 03:42:23 -07:00 committed by GitHub
parent 8eac52315b
commit 46c15cfb8e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -58,6 +58,8 @@ use std::collections::BTreeMap;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem; use std::mem;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::path::PathBuf;
#[cfg(not(target_arch = "wasm32"))]
use tokio::fs::OpenOptions; use tokio::fs::OpenOptions;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use tokio::io; use tokio::io;
@ -450,18 +452,36 @@ async fn router(
let ns = session.ns.clone().unwrap_or_default(); let ns = session.ns.clone().unwrap_or_default();
let db = session.db.clone().unwrap_or_default(); let db = session.db.clone().unwrap_or_default();
let (mut writer, mut reader) = io::duplex(10_240); let (mut writer, mut reader) = io::duplex(10_240);
tokio::spawn(async move {
// Write to channel.
async fn export_with_err(
kvs: &Datastore,
ns: String,
db: String,
chn: channel::Sender<Vec<u8>>,
) -> std::result::Result<(), crate::Error> {
kvs.export(ns, db, chn).await.map_err(|error| {
error!(target: LOG, "{error}");
crate::Error::Db(error)
})
}
let export = export_with_err(kvs, ns, db, tx);
// Read from channel and write to pipe.
let bridge = async move {
while let Ok(value) = rx.recv().await { while let Ok(value) = rx.recv().await {
if let Err(error) = writer.write_all(&value).await { if writer.write_all(&value).await.is_err() {
error!(target: LOG, "{error}"); // Broken pipe. Let either side's error be propagated.
break;
} }
} }
}); Ok(())
if let Err(error) = kvs.export(ns, db, tx).await { };
error!(target: LOG, "{error}");
} // Output to stdout or file.
let path = param.file.expect("file to export into"); let path = param.file.expect("file to export into");
let mut writer: Box<dyn AsyncWrite + Unpin + Send> = match path.to_str().unwrap() { let mut output: Box<dyn AsyncWrite + Unpin + Send> = match path.to_str().unwrap() {
"-" => Box::new(io::stdout()), "-" => Box::new(io::stdout()),
_ => { _ => {
let file = match OpenOptions::new() let file = match OpenOptions::new()
@ -483,14 +503,28 @@ async fn router(
Box::new(file) Box::new(file)
} }
}; };
if let Err(error) = io::copy(&mut reader, &mut writer).await {
return Err(Error::FileRead { // Copy from pipe to output.
path, async fn copy_with_err<'a, R, W>(
error, path: PathBuf,
} reader: &'a mut R,
.into()); writer: &'a mut W,
}; ) -> std::result::Result<(), crate::Error>
Ok(DbResponse::Other(Value::None)) where
R: tokio::io::AsyncRead + Unpin + ?Sized,
W: tokio::io::AsyncWrite + Unpin + ?Sized,
{
io::copy(reader, writer).await.map(|_| ()).map_err(|error| {
crate::Error::Api(crate::error::Api::FileRead {
path,
error,
})
})
}
let copy = copy_with_err(path, &mut reader, &mut output);
tokio::try_join!(export, bridge, copy).map(|_| DbResponse::Other(Value::None))
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
Method::Import => { Method::Import => {