Skip to content

Turbopack: cache directory creation #78729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 46 additions & 30 deletions turbopack/crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use mime::Mime;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
pub use read_glob::ReadGlobResult;
use read_glob::{read_glob, track_glob};
use rustc_hash::FxHashSet;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::{
Expand All @@ -59,8 +60,8 @@ use tracing::Instrument;
use turbo_rcstr::RcStr;
use turbo_tasks::{
debug::ValueDebugFormat, effect, mark_session_dependent, mark_stateful, trace::TraceRawVcs,
Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, ResolvedVc, ValueToString,
Vc,
ApplyEffectsContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef,
ResolvedVc, ValueToString, Vc,
};
use turbo_tasks_hash::{
hash_xxh3_hash128, hash_xxh3_hash64, DeterministicHash, DeterministicHasher,
Expand Down Expand Up @@ -208,6 +209,12 @@ pub trait FileSystem: ValueToString {
fn metadata(self: Vc<Self>, fs_path: Vc<FileSystemPath>) -> Vc<FileMeta>;
}

/// State that `DiskFileSystemInner::create_directory` keeps in the `ApplyEffectsContext`.
///
/// It is created lazily via `Default` the first time `create_directory` looks it up.
#[derive(Default)]
struct DiskFileSystemApplyContext {
/// A cache of already created directories to avoid creating them multiple times.
/// A path is added only after `create_dir_all` succeeded for it.
created_directories: FxHashSet<PathBuf>,
}

#[derive(Serialize, Deserialize, TraceRawVcs, ValueDebugFormat, NonLocalValue)]
struct DiskFileSystemInner {
pub name: RcStr,
Expand Down Expand Up @@ -393,6 +400,29 @@ impl DiskFileSystemInner {

Ok(())
}

async fn create_directory(self: &Arc<Self>, directory: &Path) -> Result<()> {
let already_created = ApplyEffectsContext::with_or_insert_with(
DiskFileSystemApplyContext::default,
|fs_context| fs_context.created_directories.contains(directory),
);
if !already_created {
let func = |p: &Path| std::fs::create_dir_all(p);
retry_blocking(directory, func)
.concurrency_limited(&self.semaphore)
.instrument(tracing::info_span!(
"create directory",
path = display(directory.display())
))
.await?;
Comment on lines +411 to +417
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you're just copying this, but we really shouldn't retry filesystem operations. They shouldn't fail transiently.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by that?

At least on Windows, fs operations sometimes fail with permission errors while the antivirus is scanning the file. That resolves automatically on retry.
We also do that in webpack using graceful-fs.

Copy link
Member

@bgw bgw May 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least on Windows, fs operations sometimes fail with permission errors while the antivirus is scanning the file. That resolves automatically on retry.

WTF, that's insane. I've never seen a unix tool retry filesystem operations.

Can this maybe only happen for deletions? I know Windows has a restriction that you can't unlink an open file. If that's the case, we should only retry deletions.

EDIT: fsspec/filesystem_spec#776 I guess it can happen with writes too 😞 . Maybe we should only do this on Windows?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v0 thinks retries make sense on Linux too

https://v0.dev/chat/KFWvUZ7IY9T

ApplyEffectsContext::with(|fs_context: &mut DiskFileSystemApplyContext| {
fs_context
.created_directories
.insert(directory.to_path_buf())
});
}
Ok(())
}
}

#[turbo_tasks::value(cell = "new", eq = "manual")]
Expand Down Expand Up @@ -735,20 +765,13 @@ impl FileSystem for DiskFileSystem {
let create_directory = compare == FileComparison::Create;
if create_directory {
if let Some(parent) = full_path.parent() {
retry_blocking(parent, |p| std::fs::create_dir_all(p))
.concurrency_limited(&inner.semaphore)
.instrument(tracing::info_span!(
"create directory",
path = display(parent.display())
))
.await
.with_context(|| {
format!(
"failed to create directory {} for write to {}",
parent.display(),
full_path.display()
)
})?;
inner.create_directory(parent).await.with_context(|| {
format!(
"failed to create directory {} for write to {}",
parent.display(),
full_path.display()
)
})?;
}
}
let full_path_to_write = full_path.clone();
Expand Down Expand Up @@ -872,20 +895,13 @@ impl FileSystem for DiskFileSystem {
let create_directory = old_content.is_none();
if create_directory {
if let Some(parent) = full_path.parent() {
retry_blocking(parent, |path| std::fs::create_dir_all(path))
.concurrency_limited(&inner.semaphore)
.instrument(tracing::info_span!(
"create directory",
path = display(parent.display())
))
.await
.with_context(|| {
format!(
"failed to create directory {} for write to {}",
parent.display(),
full_path.display()
)
})?;
inner.create_directory(parent).await.with_context(|| {
format!(
"failed to create directory {} for write link to {}",
parent.display(),
full_path.display()
)
})?;
}
}

Expand Down
131 changes: 103 additions & 28 deletions turbopack/crates/turbo-tasks/src/effect.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin};
use std::{
any::{Any, TypeId},
borrow::Cow,
future::Future,
mem::replace,
panic,
pin::Pin,
sync::Arc,
};

use anyhow::{anyhow, Result};
use auto_hash_map::AutoSet;
use futures::{StreamExt, TryStreamExt};
use parking_lot::Mutex;
use rustc_hash::FxHashSet;
use rustc_hash::{FxHashMap, FxHashSet};
use tokio::task_local;
use tracing::{Instrument, Span};

use crate::{
Expand Down Expand Up @@ -90,10 +99,10 @@ impl EffectInstance {
listener.await;
}
State::NotStarted(EffectInner { future }) => {
let join_handle = tokio::spawn(
let join_handle = tokio::spawn(ApplyEffectsContext::in_current_scope(
turbo_tasks_future_scope(turbo_tasks::turbo_tasks(), future)
.instrument(Span::current()),
);
));
let result = match join_handle.await {
Ok(Err(err)) => Err(SharedError::new(err)),
Err(err) => {
Expand Down Expand Up @@ -170,20 +179,22 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
return Ok(());
}
let span = tracing::info_span!("apply effects", count = effects.len());
async move {
// Limit the concurrency of effects
futures::stream::iter(effects)
.map(Ok)
.try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) else {
panic!("Effect must only be implemented by EffectInstance");
};
effect.await?.apply().await
})
.await
}
.instrument(span)
.await
APPLY_EFFECTS_CONTEXT
.scope(Default::default(), async move {
// Limit the concurrency of effects
futures::stream::iter(effects)
.map(Ok)
.try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect)
else {
panic!("Effect must only be implemented by EffectInstance");
};
effect.await?.apply().await
})
.await
})
.instrument(span)
.await
}

/// Capture effects from a turbo-tasks operation. Since this captures collectibles it might
Expand Down Expand Up @@ -252,17 +263,81 @@ impl Effects {
/// Applies all effects that have been captured by this struct.
///
/// The effects are applied concurrently, limited by `APPLY_EFFECTS_CONCURRENCY_LIMIT`, inside a
/// freshly default-initialized `APPLY_EFFECTS_CONTEXT` scope, so state stored in the
/// `ApplyEffectsContext` is shared by the effects of this call only.
pub async fn apply(&self) -> Result<()> {
let span = tracing::info_span!("apply effects", count = self.effects.len());
// NOTE(review): the following six lines (from `async move {` through the first
// `effect.apply().await`) look like removed-side diff residue rather than part of this
// function — confirm against the real file.
async move {
// Limit the concurrency of effects
futures::stream::iter(self.effects.iter())
.map(Ok)
.try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
effect.apply().await
APPLY_EFFECTS_CONTEXT
.scope(Default::default(), async move {
// Limit the concurrency of effects
futures::stream::iter(self.effects.iter())
.map(Ok)
.try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
effect.apply().await
})
.await
})
.instrument(span)
.await
}
}

task_local! {
/// The context of the current effects application.
///
/// A default-initialized context is installed with `scope` by `apply_effects` and
/// `Effects::apply`. Effects are run in spawned tasks, so
/// `ApplyEffectsContext::in_current_scope` installs the same `Arc` again for those tasks.
static APPLY_EFFECTS_CONTEXT: Arc<Mutex<ApplyEffectsContext>>;
}

/// A type-indexed store that is shared by the effects applied within one
/// `APPLY_EFFECTS_CONTEXT` scope.
///
/// It holds at most one value per Rust type. Values are accessed through the associated
/// functions `set`, `with` and `with_or_insert_with`, which operate on the context of the
/// current task and panic when there is none.
#[derive(Default)]
pub struct ApplyEffectsContext {
/// One boxed value per stored type, keyed by the `TypeId` of that type.
data: FxHashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl ApplyEffectsContext {
/// Wraps `f` so that it runs with the caller's current effects context installed.
///
/// The context is captured eagerly, when this function is called, which allows the returned
/// future to be handed to a different task (e.g. via `tokio::spawn`).
///
/// # Panics
///
/// Panics with "No effect context found" when called outside of an effects context scope.
fn in_current_scope<F: Future>(f: F) -> impl Future<Output = F::Output> {
    APPLY_EFFECTS_CONTEXT.scope(Self::current(), f)
}

/// Returns a new handle to the effects context of the current task.
///
/// # Panics
///
/// Panics with "No effect context found" when no context is installed for the current task.
fn current() -> Arc<Mutex<Self>> {
    let shared = APPLY_EFFECTS_CONTEXT.try_with(Arc::clone);
    shared.expect("No effect context found")
}

/// Locks the effects context of the current task and runs `f` with exclusive access to it.
///
/// The lock is held for the whole duration of `f`.
///
/// # Panics
///
/// Panics with "No effect context found" when no context is installed for the current task.
fn with_context<T, F: FnOnce(&mut Self) -> T>(f: F) -> T {
    let outcome = APPLY_EFFECTS_CONTEXT.try_with(|shared| {
        let mut guard = shared.lock();
        f(&mut guard)
    });
    outcome.expect("No effect context found")
}

/// Stores `value` in the current effects context under its type `T`, replacing any value of
/// the same type that was stored before.
///
/// # Panics
///
/// Panics with "No effect context found" when no context is installed for the current task.
pub fn set<T: Any + Send + Sync>(value: T) {
    let key = TypeId::of::<T>();
    Self::with_context(move |this| {
        // A value previously stored for `T` is dropped here, while the context is locked.
        let _previous = this.data.insert(key, Box::new(value));
    })
}

/// Runs `f` on the value of type `T` stored in the current effects context and returns its
/// result, or returns `None` (without calling `f`) when no value of that type is stored.
///
/// `f` runs inside `with_context`, i.e. while the context is locked, and this function panics
/// with "No effect context found" when no context is installed for the current task.
pub fn with<T: Any + Send + Sync, R>(f: impl FnOnce(&mut T) -> R) -> Option<R> {
Self::with_context(|this| {
this.data
.get_mut(&TypeId::of::<T>())
.map(|value| {
// Safety: the map is keyed by TypeId. The entry was looked up with `TypeId::of::<T>()`
// and values are only ever inserted under the `TypeId` of their own type.
unsafe { value.downcast_mut_unchecked() }
})
// NOTE(review): the next four lines (`.await`, `}`, `.instrument(span)`, `.await`) look like
// removed-side diff residue rather than part of this function — confirm against the real
// file.
.await
}
.instrument(span)
.await
.map(f)
})
}

/// Runs `f` on the value of type `T` stored in the current effects context and returns its
/// result. When no such value is stored yet, it is first created with `insert_with`.
///
/// Both closures run while the context is locked.
///
/// # Panics
///
/// Panics with "No effect context found" when no context is installed for the current task.
pub fn with_or_insert_with<T: Any + Send + Sync, R>(
    insert_with: impl FnOnce() -> T,
    f: impl FnOnce(&mut T) -> R,
) -> R {
    Self::with_context(|this| {
        let slot = this
            .data
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(insert_with()));
        // Safety: the map is keyed by TypeId, so the slot found under `TypeId::of::<T>()`
        // holds a `T`.
        let typed: &mut T = unsafe { slot.downcast_mut_unchecked() };
        f(typed)
    })
}
}

Expand Down
3 changes: 2 additions & 1 deletion turbopack/crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#![feature(arbitrary_self_types_pointers)]
#![feature(new_zeroed_alloc)]
#![feature(never_type)]
#![feature(downcast_unchecked)]

pub mod backend;
mod capture_future;
Expand Down Expand Up @@ -89,7 +90,7 @@ use auto_hash_map::AutoSet;
pub use collectibles::CollectiblesSource;
pub use completion::{Completion, Completions};
pub use display::ValueToString;
pub use effect::{apply_effects, effect, get_effects, Effects};
pub use effect::{apply_effects, effect, get_effects, ApplyEffectsContext, Effects};
pub use id::{
ExecutionId, FunctionId, LocalTaskId, SessionId, TaskId, TraitTypeId, ValueTypeId,
TRANSIENT_TASK_BIT,
Expand Down
Loading