Move JWKS cache storage to memory (#3649)
This commit is contained in:
parent
b62011bfec
commit
21975548f2
3 changed files with 57 additions and 43 deletions
|
@ -8,7 +8,17 @@ use once_cell::sync::Lazy;
|
||||||
use reqwest::{Client, Url};
|
use reqwest::{Client, Url};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
pub(crate) type JwksCache = HashMap<String, JwksCacheEntry>;
|
||||||
|
#[derive(Clone, Serialize, Deserialize)]
|
||||||
|
pub(crate) struct JwksCacheEntry {
|
||||||
|
jwks: JwkSet,
|
||||||
|
time: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
static CACHE_EXPIRATION: Lazy<chrono::Duration> = Lazy::new(|| Duration::seconds(1));
|
static CACHE_EXPIRATION: Lazy<chrono::Duration> = Lazy::new(|| Duration::seconds(1));
|
||||||
|
@ -66,19 +76,21 @@ pub(super) async fn config(
|
||||||
kid: &str,
|
kid: &str,
|
||||||
url: &str,
|
url: &str,
|
||||||
) -> Result<(DecodingKey, Validation), Error> {
|
) -> Result<(DecodingKey, Validation), Error> {
|
||||||
|
// Retrieve JWKS cache
|
||||||
|
let cache = kvs.jwks_cache();
|
||||||
// Attempt to fetch relevant JWK object either from local cache or remote location
|
// Attempt to fetch relevant JWK object either from local cache or remote location
|
||||||
let jwk = match fetch_jwks_from_cache(url).await {
|
let jwk = match fetch_jwks_from_cache(cache, url).await {
|
||||||
Ok(jwks_cache) => {
|
Some(jwks) => {
|
||||||
trace!("Successfully fetched JWKS object from local cache");
|
trace!("Successfully fetched JWKS object from local cache");
|
||||||
// Check that the cached JWKS object has not expired yet
|
// Check that the cached JWKS object has not expired yet
|
||||||
if Utc::now().signed_duration_since(jwks_cache.time) < *CACHE_EXPIRATION {
|
if Utc::now().signed_duration_since(jwks.time) < *CACHE_EXPIRATION {
|
||||||
// Attempt to find JWK in JWKS object from local cache
|
// Attempt to find JWK in JWKS object from local cache
|
||||||
match jwks_cache.jwks.find(kid) {
|
match jwks.jwks.find(kid) {
|
||||||
Some(jwk) => jwk.to_owned(),
|
Some(jwk) => jwk.to_owned(),
|
||||||
_ => {
|
_ => {
|
||||||
trace!("Could not find valid JWK object with key identifier '{kid}' in cached JWKS object");
|
trace!("Could not find valid JWK object with key identifier '{kid}' in cached JWKS object");
|
||||||
// Check that the cached JWKS object has not been recently updated
|
// Check that the cached JWKS object has not been recently updated
|
||||||
if Utc::now().signed_duration_since(jwks_cache.time) < *CACHE_COOLDOWN {
|
if Utc::now().signed_duration_since(jwks.time) < *CACHE_COOLDOWN {
|
||||||
debug!("Refused to refresh cache before cooldown period is over");
|
debug!("Refused to refresh cache before cooldown period is over");
|
||||||
return Err(Error::InvalidAuth); // Return opaque error
|
return Err(Error::InvalidAuth); // Return opaque error
|
||||||
}
|
}
|
||||||
|
@ -90,7 +102,7 @@ pub(super) async fn config(
|
||||||
find_jwk_from_url(kvs, url, kid).await?
|
find_jwk_from_url(kvs, url, kid).await?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {
|
None => {
|
||||||
trace!("Could not fetch JWKS object from local cache");
|
trace!("Could not fetch JWKS object from local cache");
|
||||||
find_jwk_from_url(kvs, url, kid).await?
|
find_jwk_from_url(kvs, url, kid).await?
|
||||||
}
|
}
|
||||||
|
@ -145,8 +157,10 @@ async fn find_jwk_from_url(kvs: &Datastore, url: &str, kid: &str) -> Result<Jwk,
|
||||||
return Err(Error::InvalidAuth); // Return opaque error
|
return Err(Error::InvalidAuth); // Return opaque error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retrieve JWKS cache
|
||||||
|
let cache = kvs.jwks_cache();
|
||||||
// Attempt to fetch JWKS object from remote location
|
// Attempt to fetch JWKS object from remote location
|
||||||
match fetch_jwks_from_url(url).await {
|
match fetch_jwks_from_url(cache, url).await {
|
||||||
Ok(jwks) => {
|
Ok(jwks) => {
|
||||||
trace!("Successfully fetched JWKS object from remote location");
|
trace!("Successfully fetched JWKS object from remote location");
|
||||||
// Attempt to find JWK in JWKS by the key identifier
|
// Attempt to find JWK in JWKS by the key identifier
|
||||||
|
@ -199,7 +213,7 @@ fn check_capabilities_url(kvs: &Datastore, url: &str) -> Result<(), Error> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempts to fetch a JWKS object from a remote location and stores it in the cache if successful
|
// Attempts to fetch a JWKS object from a remote location and stores it in the cache if successful
|
||||||
async fn fetch_jwks_from_url(url: &str) -> Result<JwkSet, Error> {
|
async fn fetch_jwks_from_url(cache: &Arc<RwLock<JwksCache>>, url: &str) -> Result<JwkSet, Error> {
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
let res = client.get(url).timeout((*REMOTE_TIMEOUT).to_std().unwrap()).send().await?;
|
let res = client.get(url).timeout((*REMOTE_TIMEOUT).to_std().unwrap()).send().await?;
|
||||||
|
@ -214,11 +228,9 @@ async fn fetch_jwks_from_url(url: &str) -> Result<JwkSet, Error> {
|
||||||
match serde_json::from_slice::<JwkSet>(&jwks) {
|
match serde_json::from_slice::<JwkSet>(&jwks) {
|
||||||
Ok(jwks) => {
|
Ok(jwks) => {
|
||||||
// If successful, cache the JWKS object by its URL
|
// If successful, cache the JWKS object by its URL
|
||||||
match store_jwks_in_cache(jwks.clone(), url).await {
|
match store_jwks_in_cache(cache, jwks.clone(), url).await {
|
||||||
Ok(_) => trace!("Successfully stored JWKS object in local cache"),
|
None => trace!("Successfully added JWKS object to local cache"),
|
||||||
Err(err) => {
|
Some(_) => trace!("Successfully updated JWKS object in local cache"),
|
||||||
warn!("Failed to store JWKS object in local cache: '{}'", err);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(jwks)
|
Ok(jwks)
|
||||||
|
@ -230,50 +242,40 @@ async fn fetch_jwks_from_url(url: &str) -> Result<JwkSet, Error> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct JwksCache {
|
|
||||||
jwks: JwkSet,
|
|
||||||
time: DateTime<Utc>,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempts to fetch a JWKS object from the local cache
|
// Attempts to fetch a JWKS object from the local cache
|
||||||
async fn fetch_jwks_from_cache(url: &str) -> Result<JwksCache, Error> {
|
async fn fetch_jwks_from_cache(
|
||||||
let path = cache_path_from_url(url);
|
cache: &Arc<RwLock<JwksCache>>,
|
||||||
let bytes = crate::obs::get(&path).await?;
|
url: &str,
|
||||||
|
) -> Option<JwksCacheEntry> {
|
||||||
|
let path = cache_key_from_url(url);
|
||||||
|
let cache = cache.read().await;
|
||||||
|
|
||||||
match serde_json::from_slice::<JwksCache>(&bytes) {
|
cache.get(&path).cloned()
|
||||||
Ok(jwks_cache) => Ok(jwks_cache),
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Failed to parse malformed JWKS object: '{}'", err);
|
|
||||||
Err(Error::InvalidAuth) // Return opaque error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempts to store a JWKS object in the local cache
|
// Attempts to store a JWKS object in the local cache
|
||||||
async fn store_jwks_in_cache(jwks: JwkSet, url: &str) -> Result<(), Error> {
|
async fn store_jwks_in_cache(
|
||||||
let jwks_cache = JwksCache {
|
cache: &Arc<RwLock<JwksCache>>,
|
||||||
|
jwks: JwkSet,
|
||||||
|
url: &str,
|
||||||
|
) -> Option<JwksCacheEntry> {
|
||||||
|
let entry = JwksCacheEntry {
|
||||||
jwks,
|
jwks,
|
||||||
time: Utc::now(),
|
time: Utc::now(),
|
||||||
};
|
};
|
||||||
let path = cache_path_from_url(url);
|
let path = cache_key_from_url(url);
|
||||||
|
let mut cache = cache.write().await;
|
||||||
|
|
||||||
match serde_json::to_vec(&jwks_cache) {
|
cache.insert(path, entry)
|
||||||
Ok(data) => crate::obs::put(&path, data).await,
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Failed to cache malformed JWKS object: '{}'", err);
|
|
||||||
Err(Error::InvalidAuth) // Return opaque error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generates a unique cache path for a given URL string
|
// Generates a unique cache key for a given URL string
|
||||||
fn cache_path_from_url(url: &str) -> String {
|
fn cache_key_from_url(url: &str) -> String {
|
||||||
let mut hasher = Sha256::new();
|
let mut hasher = Sha256::new();
|
||||||
hasher.update(url);
|
hasher.update(url);
|
||||||
let result = hasher.finalize();
|
let result = hasher.finalize();
|
||||||
|
|
||||||
format!("jwks/{:x}.json", result)
|
format!("{:x}", result)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -26,6 +26,8 @@ use crate::dbs::{
|
||||||
use crate::doc::Document;
|
use crate::doc::Document;
|
||||||
use crate::err::Error;
|
use crate::err::Error;
|
||||||
use crate::fflags::FFLAGS;
|
use crate::fflags::FFLAGS;
|
||||||
|
#[cfg(feature = "jwks")]
|
||||||
|
use crate::iam::jwks::JwksCache;
|
||||||
use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
|
use crate::iam::{Action, Auth, Error as IamError, Resource, Role};
|
||||||
use crate::idx::trees::store::IndexStores;
|
use crate::idx::trees::store::IndexStores;
|
||||||
use crate::key::root::hb::Hb;
|
use crate::key::root::hb::Hb;
|
||||||
|
@ -87,6 +89,9 @@ pub struct Datastore {
|
||||||
clock: Arc<SizedClock>,
|
clock: Arc<SizedClock>,
|
||||||
// The index store cache
|
// The index store cache
|
||||||
index_stores: IndexStores,
|
index_stores: IndexStores,
|
||||||
|
#[cfg(feature = "jwks")]
|
||||||
|
// The JWKS object cache
|
||||||
|
jwks_cache: Arc<RwLock<JwksCache>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// We always want to be circulating the live query information
|
/// We always want to be circulating the live query information
|
||||||
|
@ -359,6 +364,8 @@ impl Datastore {
|
||||||
index_stores: IndexStores::default(),
|
index_stores: IndexStores::default(),
|
||||||
local_live_queries: Arc::new(RwLock::new(BTreeMap::new())),
|
local_live_queries: Arc::new(RwLock::new(BTreeMap::new())),
|
||||||
cf_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
|
cf_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
|
||||||
|
#[cfg(feature = "jwks")]
|
||||||
|
jwks_cache: Arc::new(RwLock::new(JwksCache::new())),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -438,6 +445,11 @@ impl Datastore {
|
||||||
self.capabilities.allows_network_target(net_target)
|
self.capabilities.allows_network_target(net_target)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "jwks")]
|
||||||
|
pub(crate) fn jwks_cache(&self) -> &Arc<RwLock<JwksCache>> {
|
||||||
|
&self.jwks_cache
|
||||||
|
}
|
||||||
|
|
||||||
/// Setup the initial credentials
|
/// Setup the initial credentials
|
||||||
/// Trigger the `unreachable definition` compilation error, probably due to this issue:
|
/// Trigger the `unreachable definition` compilation error, probably due to this issue:
|
||||||
/// https://github.com/rust-lang/rust/issues/111370
|
/// https://github.com/rust-lang/rust/issues/111370
|
||||||
|
|
|
@ -34,7 +34,7 @@ pub mod idx;
|
||||||
pub mod key;
|
pub mod key;
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub mod kvs;
|
pub mod kvs;
|
||||||
#[cfg(any(feature = "ml", feature = "ml2", feature = "jwks"))]
|
#[cfg(any(feature = "ml", feature = "ml2"))]
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub mod obs;
|
pub mod obs;
|
||||||
pub mod options;
|
pub mod options;
|
||||||
|
|
Loading…
Reference in a new issue