Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,26 @@ use crate::config::Config;
use crate::config::{self, CacheType, PreprocessorCacheModeConfig};
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use crate::errors::*;

/// Result of [`Storage::get_path`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GetPathResult {
/// Cache hit: the entry lives at this filesystem path.
Found(PathBuf),
/// Cache miss: the key is not in the cache.
Miss,
/// This backend does not support direct file access; use `get`/`get_raw` instead.
Unsupported,
}

/// An interface to cache storage.
#[async_trait]
pub trait Storage: Send + Sync {
Expand Down Expand Up @@ -134,6 +147,12 @@ pub trait Storage: Send + Sync {
fn basedirs(&self) -> &[Vec<u8>] {
&[]
}
/// Return the filesystem path of the cached entry for `key`.
/// Default impl returns [`GetPathResult::Unsupported`].
async fn get_path(&self, _key: &str) -> GetPathResult {
GetPathResult::Unsupported
}

/// Return the preprocessor cache entry for a given preprocessor key,
/// if it exists.
/// Only applicable when using preprocessor cache mode.
Expand Down
3 changes: 2 additions & 1 deletion src/cache/cache_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use super::utils::{get_file_mode, set_file_mode};
use crate::errors::*;
use fs_err as fs;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io::{Cursor, Read, Seek, Write};
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -56,7 +57,7 @@ impl fmt::Debug for Cache {
}

/// CacheMode is used to represent which mode we are using.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CacheMode {
/// Only read cache from storage.
ReadOnly,
Expand Down
22 changes: 21 additions & 1 deletion src/cache/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::cache::{Cache, CacheMode, CacheRead, CacheWrite, Storage};
use crate::cache::{Cache, CacheMode, CacheRead, CacheWrite, GetPathResult, Storage};
use crate::compiler::PreprocessorCacheEntry;
use crate::lru_disk_cache::{Error as LruError, ReadSeek};
use async_trait::async_trait;
Expand Down Expand Up @@ -132,6 +132,26 @@ impl Storage for DiskCache {
.await?
}

async fn get_path(&self, key: &str) -> GetPathResult {
let rel_path = make_key_path(key);
let lru = self.lru.clone();
self.pool
.spawn_blocking(move || {
match lru
.lock()
.unwrap()
.get_or_init()
.ok()
.and_then(|c| c.get_abs_path(&rel_path))
{
Some(p) => GetPathResult::Found(p),
None => GetPathResult::Miss,
}
})
.await
.unwrap_or(GetPathResult::Miss)
}

async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
trace!("DiskCache::put({})", key);
// Delegate to put_raw after serializing the entry
Expand Down
213 changes: 213 additions & 0 deletions src/cache/ipc_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::cache::CacheMode;
use crate::cache::GetPathResult;
use crate::cache::cache::Storage;
use crate::cache::cache_io::{Cache, CacheRead, CacheWrite};
use crate::client::ServerConnection;
use crate::compiler::PreprocessorCacheEntry;
use crate::config::PreprocessorCacheModeConfig;
use crate::errors::*;
use crate::protocol::{Request, Response, StorageHandshakeInfo};
use async_trait::async_trait;
use bytes::Bytes;
use std::io::Cursor;
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// `Storage` implementation that forwards all cache operations to the sccache
/// daemon over the existing IPC connection. Used by CLI processes in
/// client-side mode.
///
/// `ServerConnection` is synchronous and non-`Clone`, so it lives behind a
/// `Mutex` and every RPC dispatches via `tokio::task::spawn_blocking`. The
/// lock is held only for the duration of a single blocking call, never across
/// an `.await` point.
pub struct IpcStorage {
conn: Arc<Mutex<ServerConnection>>,
handshake: StorageHandshakeInfo,
}

impl IpcStorage {
/// Connect to the daemon and perform the `StorageHandshake` RPC.
/// Returns an `IpcStorage` that can be used as an `Arc<dyn Storage>`.
pub fn connect(mut conn: ServerConnection) -> Result<Self> {
let resp = conn.request(Request::StorageHandshake)?;
let handshake = match resp {
Response::StorageHandshake(info) => info,
other => bail!("IpcStorage: unexpected handshake response: {other:?}"),
};
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
handshake,
})
}

/// Return a clone of the underlying connection handle so callers can send
/// additional RPCs (e.g., `RecordStats`) after the storage is no longer
/// needed.
pub fn conn(&self) -> Arc<Mutex<ServerConnection>> {
Arc::clone(&self.conn)
}

async fn rpc(&self, req: Request) -> Result<Response> {
let conn = Arc::clone(&self.conn);
tokio::task::spawn_blocking(move || conn.lock().unwrap().request(req))
.await
.context("spawn_blocking panicked")?
}
}

#[async_trait]
impl Storage for IpcStorage {
async fn get(&self, key: &str) -> Result<Cache> {
match self.get_path(key).await {
GetPathResult::Found(path) => {
let file = std::fs::File::open(&path)
.with_context(|| format!("IpcStorage::get: open {}", path.display()))?;
match CacheRead::from(file) {
Ok(entry) => Ok(Cache::Hit(entry)),
Err(_) => Ok(Cache::Miss),
}
}
GetPathResult::Miss => Ok(Cache::Miss),
// Backend doesn't support paths (S3, Redis, …); fall back to bytes over IPC.
GetPathResult::Unsupported => match self.get_raw(key).await? {
Some(bytes) => match CacheRead::from(Cursor::new(bytes)) {
Ok(entry) => Ok(Cache::Hit(entry)),
Err(_) => Ok(Cache::Miss),
},
None => Ok(Cache::Miss),
},
}
}

async fn get_path(&self, key: &str) -> GetPathResult {
match self
.rpc(Request::StorageGetPath {
key: key.to_owned(),
})
.await
{
Ok(Response::StorageGetPath(result)) => result,
_ => GetPathResult::Unsupported,
}
}

async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
self.put_raw(key, entry.finish()?.into()).await
}

async fn get_raw(&self, key: &str) -> Result<Option<Bytes>> {
let resp = self
.rpc(Request::StorageGetRaw {
key: key.to_owned(),
})
.await?;
match resp {
Response::StorageGetRaw(opt) => Ok(opt.map(Bytes::from)),
other => bail!("IpcStorage::get_raw: unexpected response: {other:?}"),
}
}

async fn put_raw(&self, key: &str, data: Bytes) -> Result<Duration> {
let resp = self
.rpc(Request::StoragePutRaw {
key: key.to_owned(),
data: data.to_vec(),
})
.await?;
match resp {
Response::StoragePutRaw(Ok(())) => Ok(Duration::ZERO),
Response::StoragePutRaw(Err(e)) => bail!("IpcStorage::put_raw: daemon error: {e}"),
other => bail!("IpcStorage::put_raw: unexpected response: {other:?}"),
}
}

async fn check(&self) -> Result<CacheMode> {
Ok(self.handshake.cache_mode)
}

fn location(&self) -> String {
self.handshake.location.clone()
}

fn cache_type_name(&self) -> &'static str {
"ipc"
}

async fn current_size(&self) -> Result<Option<u64>> {
Ok(None)
}

async fn max_size(&self) -> Result<Option<u64>> {
Ok(self.handshake.max_size)
}

fn preprocessor_cache_mode_config(&self) -> PreprocessorCacheModeConfig {
self.handshake.preprocessor_cache_mode_config
}

fn basedirs(&self) -> &[Vec<u8>] {
&self.handshake.basedirs
}

async fn get_preprocessor_cache_entry(
&self,
key: &str,
) -> Result<Option<Box<dyn crate::lru_disk_cache::ReadSeek>>> {
let resp = self
.rpc(Request::StorageGetPreprocessorEntry {
key: key.to_owned(),
})
.await?;
match resp {
Response::StorageGetPreprocessorEntry(Ok(None)) => Ok(None),
Response::StorageGetPreprocessorEntry(Ok(Some(bytes))) => Ok(Some(
Box::new(Cursor::new(bytes)) as Box<dyn crate::lru_disk_cache::ReadSeek>,
)),
Response::StorageGetPreprocessorEntry(Err(e)) => {
bail!("IpcStorage::get_preprocessor_cache_entry: {e}")
}
other => {
bail!("IpcStorage::get_preprocessor_cache_entry: unexpected response: {other:?}")
}
}
}

async fn put_preprocessor_cache_entry(
&self,
key: &str,
entry: PreprocessorCacheEntry,
) -> Result<()> {
let mut buf = vec![];
entry
.serialize_to(&mut buf)
.map_err(|e| anyhow::anyhow!("{e}"))?;
let resp = self
.rpc(Request::StoragePutPreprocessorEntry {
key: key.to_owned(),
entry_bytes: buf,
})
.await?;
match resp {
Response::StoragePutPreprocessorEntry(Ok(())) => Ok(()),
Response::StoragePutPreprocessorEntry(Err(e)) => {
bail!("IpcStorage::put_preprocessor_cache_entry: {e}")
}
other => {
bail!("IpcStorage::put_preprocessor_cache_entry: unexpected response: {other:?}")
}
}
}
}
2 changes: 2 additions & 0 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod disk;
pub mod gcs;
#[cfg(feature = "gha")]
pub mod gha;
pub mod ipc_storage;
#[allow(clippy::module_inception)]
pub mod lazy_disk_cache;
#[cfg(feature = "memcached")]
Expand Down Expand Up @@ -53,5 +54,6 @@ pub(crate) mod http_client;

pub use crate::cache::cache::*;
pub use crate::cache::cache_io::*;
pub use crate::cache::ipc_storage::IpcStorage;
pub use crate::cache::lazy_disk_cache::*;
pub use crate::cache::multilevel::MultiLevelStorage;
43 changes: 41 additions & 2 deletions src/cache/multilevel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,33 @@ impl<'de> Deserialize<'de> for MultiLevelStats {
}
}

impl std::ops::AddAssign for LevelStats {
fn add_assign(&mut self, rhs: Self) {
// name and location identify the level — keep lhs values
self.hits += rhs.hits;
self.misses += rhs.misses;
self.writes += rhs.writes;
self.write_failures += rhs.write_failures;
self.backfills_from += rhs.backfills_from;
self.backfills_to += rhs.backfills_to;
self.hit_duration += rhs.hit_duration;
self.write_duration += rhs.write_duration;
}
}

impl std::ops::AddAssign for MultiLevelStats {
fn add_assign(&mut self, rhs: Self) {
let mut rhs_iter = rhs.0.into_iter();
for lhs_level in &mut self.0 {
if let Some(rhs_level) = rhs_iter.next() {
*lhs_level += rhs_level;
}
}
// Append any extra levels present only in rhs
self.0.extend(rhs_iter);
}
}

impl LevelStats {
/// Calculate hit rate as a percentage
pub fn hit_rate(&self) -> f64 {
Expand Down Expand Up @@ -715,13 +742,25 @@ impl Storage for MultiLevelStorage {
Ok(Cache::Miss)
}

async fn get_raw(&self, key: &str) -> Result<Option<Bytes>> {
for level in &self.levels {
if let Some(bytes) = level.get_raw(key).await? {
return Ok(Some(bytes));
}
}
Ok(None)
}

async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
let data: Bytes = entry.finish()?.into();
self.put_raw(key, data).await
}

async fn put_raw(&self, key: &str, data: Bytes) -> Result<Duration> {
if self.levels.is_empty() {
return Err(anyhow!("No cache levels configured"));
}

// Serialize cache entry once
let data: Bytes = entry.finish()?.into();
let key_str = key.to_string();

match self.write_error_policy {
Expand Down
Loading
Loading