Replace tokio::time with wasmtimer for Wasm targets (#1846)

This commit is contained in:
Rushmore Mushambi 2023-04-23 09:32:27 +02:00 committed by GitHub
parent ee24f8d6a0
commit cd16d4af5c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 73 additions and 21 deletions

15
Cargo.lock generated
View file

@ -3832,7 +3832,6 @@ dependencies = [
"tikv-client",
"time 0.3.20",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-util",
"tracing",
@ -3841,6 +3840,7 @@ dependencies = [
"url",
"uuid",
"wasm-bindgen-futures",
"wasmtimer",
"ws_stream_wasm",
]
@ -4705,6 +4705,19 @@ dependencies = [
"web-sys",
]
[[package]]
name = "wasmtimer"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f656cd8858a5164932d8a90f936700860976ec21eb00e0fe2aa8cab13f6b4cf"
dependencies = [
"futures 0.3.28",
"js-sys",
"parking_lot 0.12.1",
"pin-utils",
"wasm-bindgen",
]
[[package]]
name = "web-sys"
version = "0.3.61"

View file

@ -32,11 +32,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1680762089,
"narHash": "sha256-62lgi+xb+nn9H4O+ZIYNkHeQ8ryzstALKMJuoXiot0I=",
"lastModified": 1682144464,
"narHash": "sha256-HlVJU4p1OED3HJNOoXrxR6qabKWMtGq0wbYhroumuVc=",
"owner": "nix-community",
"repo": "fenix",
"rev": "5794e58068fb6a8eccad9e4ff77ffe1c08ded13c",
"rev": "6424d70f13761c203dada9de6ce417fc9f22712d",
"type": "github"
},
"original": {
@ -110,11 +110,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1680665430,
"narHash": "sha256-MTVhTukwza1Jlq2gECITZPFnhROmylP2uv3O3cSqQCE=",
"lastModified": 1682110011,
"narHash": "sha256-J1ArhCRJov3Ycflq7QcmpOzeqqYj39AjlcH77cUx/pQ=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "5233fd2ba76a3accb5aaa999c00509a11fd0793c",
"rev": "624f65a2b164bc9fe47324606940ffe773196813",
"type": "github"
},
"original": {
@ -136,11 +136,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1680727375,
"narHash": "sha256-hb8AosuONAg0D9yoZ4VrBsjf5hINMYVLPEGekXF4qVE=",
"lastModified": 1682101542,
"narHash": "sha256-CHSbpvLZf0joKD1cU+Hg02uIYvV3xkvwcx+0oBWL0CQ=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "ea22d245b671f97b820cf761108251c6292c3152",
"rev": "af3b6a0893cc3a05b5ddc1e9d31b2c454b480426",
"type": "github"
},
"original": {

View file

@ -19,7 +19,7 @@ resolver = "2"
# Public features
default = ["protocol-ws", "rustls"]
protocol-http = ["dep:reqwest", "dep:tokio-util"]
protocol-ws = ["dep:tokio-tungstenite", "dep:tokio-stream", "tokio/time"]
protocol-ws = ["dep:tokio-tungstenite", "tokio/time"]
kv-mem = ["dep:echodb", "tokio/time"]
kv-indxdb = ["dep:indxdb"]
kv-rocksdb = ["dep:rocksdb", "tokio/time"]
@ -95,7 +95,6 @@ sha2 = "0.10.6"
storekey = "0.4.1"
thiserror = "1.0.40"
tikv = { version = "0.1.0", package = "tikv-client", optional = true }
tokio-stream = { version = "0.1.12", optional = true }
tokio-util = { version = "0.7.7", optional = true, features = ["compat"] }
tracing = "0.1.37"
trice = "0.3.1"
@ -112,10 +111,11 @@ tokio = { version = "1.27.0", features = ["macros", "rt", "rt-multi-thread"] }
pharos = "0.5.3"
tokio = { version = "1.27.0", default-features = false, features = ["rt"] }
uuid = { version = "1.3.1", features = ["serde", "js", "v4", "v7"] }
wasmtimer = { version = "0.2.0", default-features = false, features = ["tokio"] }
wasm-bindgen-futures = "0.4.34"
ws_stream_wasm = "0.7.4"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.27.0", default-features = false, features = ["io-util", "io-std", "fs", "rt-multi-thread"] }
tokio = { version = "1.27.0", default-features = false, features = ["macros", "io-util", "io-std", "fs", "rt-multi-thread"] }
tokio-tungstenite = { version = "0.18.0", optional = true }
uuid = { version = "1.3.1", features = ["serde", "v4", "v7"] }

View file

@ -16,10 +16,22 @@ use crate::api::Surreal;
use crate::opt::IntoEndpoint;
use crate::sql::Array;
use crate::sql::Value;
use futures::Stream;
use serde::Deserialize;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Interval;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::Interval;
pub(crate) const PATH: &str = "rpc";
const PING_INTERVAL: Duration = Duration::from_secs(5);
@ -168,3 +180,23 @@ pub(crate) struct Response {
#[serde(flatten)]
pub(crate) content: Content,
}
struct IntervalStream {
inner: Interval,
}
impl IntervalStream {
fn new(interval: Interval) -> Self {
Self {
inner: interval,
}
}
}
impl Stream for IntervalStream {
type Item = Instant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
self.inner.poll_tick(cx).map(Some)
}
}

View file

@ -19,6 +19,7 @@ use crate::api::ExtraFeatures;
use crate::api::Response as QueryResponse;
use crate::api::Result;
use crate::api::Surreal;
use crate::engine::remote::ws::IntervalStream;
use crate::sql::Strand;
use crate::sql::Value;
use flume::Receiver;
@ -43,7 +44,6 @@ use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::time;
use tokio::time::MissedTickBehavior;
use tokio_stream::wrappers::IntervalStream;
use tokio_tungstenite::tungstenite::error::Error as WsError;
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
use tokio_tungstenite::tungstenite::Message;

View file

@ -17,6 +17,7 @@ use crate::api::ExtraFeatures;
use crate::api::Response as QueryResponse;
use crate::api::Result;
use crate::api::Surreal;
use crate::engine::remote::ws::IntervalStream;
use crate::sql::Strand;
use crate::sql::Value;
use flume::Receiver;
@ -39,11 +40,11 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use tokio::time;
use tokio::time::MissedTickBehavior;
use tokio_stream::wrappers::IntervalStream;
use std::time::Duration;
use trice::Instant;
use wasm_bindgen_futures::spawn_local;
use wasmtimer::tokio as time;
use wasmtimer::tokio::MissedTickBehavior;
use ws_stream_wasm::WsEvent;
use ws_stream_wasm::WsMessage as Message;
use ws_stream_wasm::WsMeta;
@ -358,7 +359,7 @@ pub(crate) fn router(
Ok(events) => events,
Err(error) => {
trace!(target: LOG, "{error}");
time::sleep(time::Duration::from_secs(1)).await;
time::sleep(Duration::from_secs(1)).await;
continue 'reconnect;
}
}
@ -366,7 +367,7 @@ pub(crate) fn router(
for (_, message) in &replay {
if let Err(error) = socket.send(message.clone()).await {
trace!(target: LOG, "{error}");
time::sleep(time::Duration::from_secs(1)).await;
time::sleep(Duration::from_secs(1)).await;
continue 'reconnect;
}
}
@ -381,7 +382,7 @@ pub(crate) fn router(
trace!(target: LOG, "Request {payload}");
if let Err(error) = socket.send(Message::Binary(payload.into())).await {
trace!(target: LOG, "{error}");
time::sleep(time::Duration::from_secs(1)).await;
time::sleep(Duration::from_secs(1)).await;
continue 'reconnect;
}
}
@ -390,7 +391,7 @@ pub(crate) fn router(
}
Err(error) => {
trace!(target: LOG, "Failed to reconnect; {error}");
time::sleep(time::Duration::from_secs(1)).await;
time::sleep(Duration::from_secs(1)).await;
}
}
}

View file

@ -11,6 +11,9 @@ pub async fn sleep(ctx: &Context<'_>, (dur,): (Duration,)) -> Result<Value, Erro
(_, d) => d,
};
// Sleep for the specified time
#[cfg(target_arch = "wasm32")]
wasmtimer::tokio::sleep(dur).await;
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(dur).await;
// Ok all good
Ok(Value::None)

View file

@ -35,6 +35,9 @@ impl SleepStatement {
(_, d) => d,
};
// Sleep for the specified time
#[cfg(target_arch = "wasm32")]
wasmtimer::tokio::sleep(dur).await;
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(dur).await;
// Ok all good
Ok(Value::None)