Switch SDK to use revision binary format ()

This commit is contained in:
Raphael Darley 2024-09-11 18:19:37 +01:00 committed by GitHub
parent d32110b46d
commit a920ed4d83
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 52 additions and 83 deletions
sdk/src/api
engine/remote/ws
mod.rs
opt/endpoint

View file

@ -24,7 +24,6 @@ use uuid::Uuid;
pub(crate) const PATH: &str = "rpc"; pub(crate) const PATH: &str = "rpc";
const PING_INTERVAL: Duration = Duration::from_secs(5); const PING_INTERVAL: Duration = Duration::from_secs(5);
const REVISION_HEADER: &str = "revision"; const REVISION_HEADER: &str = "revision";
const BINCODE_HEADER: &str = "bincode";
enum RequestEffect { enum RequestEffect {
/// Completing this request sets a variable to a give value. /// Completing this request sets a variable to a give value.

View file

@ -76,15 +76,9 @@ pub(crate) async fn connect(
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> { ) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let mut request = (&endpoint.url).into_client_request()?; let mut request = (&endpoint.url).into_client_request()?;
if endpoint.supports_revision { request
request .headers_mut()
.headers_mut() .insert(SEC_WEBSOCKET_PROTOCOL, HeaderValue::from_static(super::REVISION_HEADER));
.insert(SEC_WEBSOCKET_PROTOCOL, HeaderValue::from_static(super::REVISION_HEADER));
} else {
request
.headers_mut()
.insert(SEC_WEBSOCKET_PROTOCOL, HeaderValue::from_static(super::BINCODE_HEADER));
}
#[cfg(any(feature = "native-tls", feature = "rustls"))] #[cfg(any(feature = "native-tls", feature = "rustls"))]
let (socket, _) = tokio_tungstenite::connect_async_tls_with_config( let (socket, _) = tokio_tungstenite::connect_async_tls_with_config(
@ -152,7 +146,6 @@ async fn router_handle_route(
response, response,
}: Route, }: Route,
state: &mut RouterState, state: &mut RouterState,
endpoint: &Endpoint,
) -> HandleResult { ) -> HandleResult {
let RequestData { let RequestData {
id, id,
@ -243,7 +236,7 @@ async fn router_handle_route(
return HandleResult::Ok; return HandleResult::Ok;
}; };
trace!("Request {:?}", request); trace!("Request {:?}", request);
let payload = serialize(&request, endpoint.supports_revision).unwrap(); let payload = serialize(&request, true).unwrap();
Message::Binary(payload) Message::Binary(payload)
}; };
@ -266,12 +259,8 @@ async fn router_handle_route(
HandleResult::Ok HandleResult::Ok
} }
async fn router_handle_response( async fn router_handle_response(response: Message, state: &mut RouterState) -> HandleResult {
response: Message, match Response::try_from(&response) {
state: &mut RouterState,
endpoint: &Endpoint,
) -> HandleResult {
match Response::try_from(&response, endpoint.supports_revision) {
Ok(option) => { Ok(option) => {
// We are only interested in responses that are not empty // We are only interested in responses that are not empty
if let Some(response) = option { if let Some(response) = option {
@ -352,9 +341,7 @@ async fn router_handle_response(
} }
.into_router_request(None) .into_router_request(None)
.unwrap(); .unwrap();
let value = let value = serialize(&request, true).unwrap();
serialize(&request, endpoint.supports_revision)
.unwrap();
Message::Binary(value) Message::Binary(value)
}; };
if let Err(error) = state.sink.send(kill).await { if let Err(error) = state.sink.send(kill).await {
@ -382,7 +369,7 @@ async fn router_handle_response(
if let Message::Binary(binary) = response { if let Message::Binary(binary) = response {
if let Ok(ErrorResponse { if let Ok(ErrorResponse {
id, id,
}) = deserialize(&binary, endpoint.supports_revision) }) = deserialize(&binary, true)
{ {
// Return an error if an ID was returned // Return an error if an ID was returned
if let Some(Ok(id)) = id.map(CoreValue::coerce_to_i64) { if let Some(Ok(id)) = id.map(CoreValue::coerce_to_i64) {
@ -421,7 +408,7 @@ async fn router_reconnect(
.into_router_request(None) .into_router_request(None)
.expect("replay commands should always convert to route requests"); .expect("replay commands should always convert to route requests");
let message = serialize(&request, endpoint.supports_revision).unwrap(); let message = serialize(&request, true).unwrap();
if let Err(error) = state.sink.send(Message::Binary(message)).await { if let Err(error) = state.sink.send(Message::Binary(message)).await {
trace!("{error}"); trace!("{error}");
@ -437,7 +424,7 @@ async fn router_reconnect(
.into_router_request(None) .into_router_request(None)
.unwrap(); .unwrap();
trace!("Request {:?}", request); trace!("Request {:?}", request);
let payload = serialize(&request, endpoint.supports_revision).unwrap(); let payload = serialize(&request, true).unwrap();
if let Err(error) = state.sink.send(Message::Binary(payload)).await { if let Err(error) = state.sink.send(Message::Binary(payload)).await {
trace!("{error}"); trace!("{error}");
time::sleep(time::Duration::from_secs(1)).await; time::sleep(time::Duration::from_secs(1)).await;
@ -465,7 +452,7 @@ pub(crate) async fn run_router(
) { ) {
let ping = { let ping = {
let request = Command::Health.into_router_request(None).unwrap(); let request = Command::Health.into_router_request(None).unwrap();
let value = serialize(&request, endpoint.supports_revision).unwrap(); let value = serialize(&request, true).unwrap();
Message::Binary(value) Message::Binary(value)
}; };
@ -503,7 +490,7 @@ pub(crate) async fn run_router(
break 'router; break 'router;
}; };
match router_handle_route(response, &mut state, &endpoint).await { match router_handle_route(response, &mut state).await {
HandleResult::Ok => {}, HandleResult::Ok => {},
HandleResult::Disconnected => { HandleResult::Disconnected => {
router_reconnect( router_reconnect(
@ -535,7 +522,7 @@ pub(crate) async fn run_router(
state.last_activity = Instant::now(); state.last_activity = Instant::now();
match result { match result {
Ok(message) => { Ok(message) => {
match router_handle_response(message, &mut state, &endpoint).await { match router_handle_response(message, &mut state).await {
HandleResult::Ok => continue, HandleResult::Ok => continue,
HandleResult::Disconnected => { HandleResult::Disconnected => {
router_reconnect( router_reconnect(
@ -593,21 +580,19 @@ pub(crate) async fn run_router(
} }
impl Response { impl Response {
fn try_from(message: &Message, supports_revision: bool) -> Result<Option<Self>> { fn try_from(message: &Message) -> Result<Option<Self>> {
match message { match message {
Message::Text(text) => { Message::Text(text) => {
trace!("Received an unexpected text message; {text}"); trace!("Received an unexpected text message; {text}");
Ok(None) Ok(None)
} }
Message::Binary(binary) => { Message::Binary(binary) => deserialize(binary, true).map(Some).map_err(|error| {
deserialize(binary, supports_revision).map(Some).map_err(|error| { Error::ResponseFromBinary {
Error::ResponseFromBinary { binary: binary.clone(),
binary: binary.clone(), error: bincode::ErrorKind::Custom(error.to_string()).into(),
error: bincode::ErrorKind::Custom(error.to_string()).into(), }
} .into()
.into() }),
})
}
Message::Ping(..) => { Message::Ping(..) => {
trace!("Received a ping from the server"); trace!("Received a ping from the server");
Ok(None) Ok(None)

View file

@ -183,7 +183,7 @@ async fn router_handle_request(
return HandleResult::Ok; return HandleResult::Ok;
}; };
trace!("Request {:?}", req); trace!("Request {:?}", req);
let payload = serialize(&req, endpoint.supports_revision).unwrap(); let payload = serialize(&req, true).unwrap();
Message::Binary(payload) Message::Binary(payload)
}; };
@ -211,7 +211,7 @@ async fn router_handle_response(
state: &mut RouterState, state: &mut RouterState,
endpoint: &Endpoint, endpoint: &Endpoint,
) -> HandleResult { ) -> HandleResult {
match Response::try_from(&response, endpoint.supports_revision) { match Response::try_from(&response) {
Ok(option) => { Ok(option) => {
// We are only interested in responses that are not empty // We are only interested in responses that are not empty
if let Some(response) = option { if let Some(response) = option {
@ -290,8 +290,7 @@ async fn router_handle_response(
uuid: live_query_id.0, uuid: live_query_id.0,
} }
.into_router_request(None); .into_router_request(None);
let value = serialize(&request, endpoint.supports_revision) let value = serialize(&request, true).unwrap();
.unwrap();
Message::Binary(value) Message::Binary(value)
}; };
if let Err(error) = state.sink.send(kill).await { if let Err(error) = state.sink.send(kill).await {
@ -320,7 +319,7 @@ async fn router_handle_response(
if let Message::Binary(binary) = response { if let Message::Binary(binary) = response {
if let Ok(Response { if let Ok(Response {
id, id,
}) = deserialize(&mut &binary[..], endpoint.supports_revision) }) = deserialize(&mut &binary[..], true)
{ {
// Return an error if an ID was returned // Return an error if an ID was returned
if let Some(Ok(id)) = id.map(CoreValue::coerce_to_i64) { if let Some(Ok(id)) = id.map(CoreValue::coerce_to_i64) {
@ -348,10 +347,7 @@ async fn router_reconnect(
) { ) {
loop { loop {
trace!("Reconnecting..."); trace!("Reconnecting...");
let connect = match endpoint.supports_revision { let connect = WsMeta::connect(&endpoint.url, vec![super::REVISION_HEADER]).await;
true => WsMeta::connect(&endpoint.url, vec![super::REVISION_HEADER]).await,
false => WsMeta::connect(&endpoint.url, None).await,
};
match connect { match connect {
Ok((mut meta, stream)) => { Ok((mut meta, stream)) => {
let (new_sink, new_stream) = stream.split(); let (new_sink, new_stream) = stream.split();
@ -373,7 +369,7 @@ async fn router_reconnect(
}; };
for (_, message) in &state.replay { for (_, message) in &state.replay {
let message = message.clone().into_router_request(None); let message = message.clone().into_router_request(None);
let message = serialize(&message, endpoint.supports_revision).unwrap(); let message = serialize(&message, true).unwrap();
if let Err(error) = state.sink.send(Message::Binary(message)).await { if let Err(error) = state.sink.send(Message::Binary(message)).await {
trace!("{error}"); trace!("{error}");
@ -412,10 +408,7 @@ pub(crate) async fn run_router(
conn_tx: Sender<Result<()>>, conn_tx: Sender<Result<()>>,
route_rx: Receiver<Route>, route_rx: Receiver<Route>,
) { ) {
let connect = match endpoint.supports_revision { let connect = WsMeta::connect(&endpoint.url, vec![super::REVISION_HEADER]).await;
true => WsMeta::connect(&endpoint.url, vec![super::REVISION_HEADER]).await,
false => WsMeta::connect(&endpoint.url, None).await,
};
let (mut ws, socket) = match connect { let (mut ws, socket) = match connect {
Ok(pair) => pair, Ok(pair) => pair,
Err(error) => { Err(error) => {
@ -444,7 +437,7 @@ pub(crate) async fn run_router(
let mut request = BTreeMap::new(); let mut request = BTreeMap::new();
request.insert("method".to_owned(), "ping".into()); request.insert("method".to_owned(), "ping".into());
let value = CoreValue::from(request); let value = CoreValue::from(request);
let value = serialize(&value, endpoint.supports_revision).unwrap(); let value = serialize(&value, true).unwrap();
Message::Binary(value) Message::Binary(value)
}; };
@ -536,14 +529,14 @@ pub(crate) async fn run_router(
} }
impl Response { impl Response {
fn try_from(message: &Message, supports_revision: bool) -> Result<Option<Self>> { fn try_from(message: &Message) -> Result<Option<Self>> {
match message { match message {
Message::Text(text) => { Message::Text(text) => {
trace!("Received an unexpected text message; {text}"); trace!("Received an unexpected text message; {text}");
Ok(None) Ok(None)
} }
Message::Binary(binary) => { Message::Binary(binary) => {
deserialize(&mut &binary[..], supports_revision).map(Some).map_err(|error| { deserialize(&mut &binary[..], true).map(Some).map_err(|error| {
Error::ResponseFromBinary { Error::ResponseFromBinary {
binary: binary.clone(), binary: binary.clone(),
error: bincode::ErrorKind::Custom(error.to_string()).into(), error: bincode::ErrorKind::Custom(error.to_string()).into(),

View file

@ -142,8 +142,7 @@ pub type Result<T> = std::result::Result<T, crate::Error>;
// Channel for waiters // Channel for waiters
type Waiter = (watch::Sender<Option<WaitFor>>, watch::Receiver<Option<WaitFor>>); type Waiter = (watch::Sender<Option<WaitFor>>, watch::Receiver<Option<WaitFor>>);
const SUPPORTED_VERSIONS: (&str, &str) = (">=1.0.0, <3.0.0", "20230701.55918b7c"); const SUPPORTED_VERSIONS: (&str, &str) = (">=1.2.0, <3.0.0", "20230701.55918b7c");
const REVISION_SUPPORTED_SERVER_VERSION: Version = Version::new(1, 2, 0);
/// Connection trait implemented by supported engines /// Connection trait implemented by supported engines
pub trait Connection: conn::Connection {} pub trait Connection: conn::Connection {}
@ -205,18 +204,18 @@ where
fn into_future(self) -> Self::IntoFuture { fn into_future(self) -> Self::IntoFuture {
Box::pin(async move { Box::pin(async move {
let mut endpoint = self.address?; let endpoint = self.address?;
let endpoint_kind = EndpointKind::from(endpoint.url.scheme()); let endpoint_kind = EndpointKind::from(endpoint.url.scheme());
let mut client = Client::connect(endpoint.clone(), self.capacity).await?; let client = Client::connect(endpoint, self.capacity).await?;
if endpoint_kind.is_remote() { if endpoint_kind.is_remote() {
let mut version = client.version().await?; match client.version().await {
// we would like to be able to connect to pre-releases too Ok(mut version) => {
version.pre = Default::default(); // we would like to be able to connect to pre-releases too
client.check_server_version(&version).await?; version.pre = Default::default();
if version >= REVISION_SUPPORTED_SERVER_VERSION && endpoint_kind.is_ws() { client.check_server_version(&version).await?;
// Switch to revision based serialisation }
endpoint.supports_revision = true; // TODO(raphaeldarley) don't error if Method Not allowed
client = Client::connect(endpoint, self.capacity).await?; Err(e) => return Err(e),
} }
} }
// Both ends of the channel are still alive at this point // Both ends of the channel are still alive at this point
@ -239,18 +238,18 @@ where
if self.router.get().is_some() { if self.router.get().is_some() {
return Err(Error::AlreadyConnected.into()); return Err(Error::AlreadyConnected.into());
} }
let mut endpoint = self.address?; let endpoint = self.address?;
let endpoint_kind = EndpointKind::from(endpoint.url.scheme()); let endpoint_kind = EndpointKind::from(endpoint.url.scheme());
let mut client = Client::connect(endpoint.clone(), self.capacity).await?; let client = Client::connect(endpoint, self.capacity).await?;
if endpoint_kind.is_remote() { if endpoint_kind.is_remote() {
let mut version = client.version().await?; match client.version().await {
// we would like to be able to connect to pre-releases too Ok(mut version) => {
version.pre = Default::default(); // we would like to be able to connect to pre-releases too
client.check_server_version(&version).await?; version.pre = Default::default();
if version >= REVISION_SUPPORTED_SERVER_VERSION && endpoint_kind.is_ws() { client.check_server_version(&version).await?;
// Switch to revision based serialisation }
endpoint.supports_revision = true; // TODO(raphaeldarley) don't error if Method Not allowed
client = Client::connect(endpoint, self.capacity).await?; Err(e) => return Err(e),
} }
} }
let cell = let cell =

View file

@ -34,8 +34,6 @@ pub struct Endpoint {
#[doc(hidden)] #[doc(hidden)]
pub path: String, pub path: String,
pub(crate) config: Config, pub(crate) config: Config,
// Whether or not the remote server supports revision based serialisation
pub(crate) supports_revision: bool,
} }
impl Endpoint { impl Endpoint {
@ -44,7 +42,6 @@ impl Endpoint {
url, url,
path: String::new(), path: String::new(),
config: Default::default(), config: Default::default(),
supports_revision: false,
} }
} }
@ -166,10 +163,6 @@ impl EndpointKind {
) )
} }
pub(crate) fn is_ws(&self) -> bool {
matches!(self, EndpointKind::Ws | EndpointKind::Wss)
}
pub fn is_local(&self) -> bool { pub fn is_local(&self) -> bool {
!self.is_remote() !self.is_remote()
} }