diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 27839318b6797..0750ddc2e0be4 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -48,6 +48,7 @@ use mime::Mime; use rayon::iter::{IntoParallelIterator, ParallelIterator}; pub use read_glob::ReadGlobResult; use read_glob::{read_glob, track_glob}; +use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::{ @@ -59,8 +60,8 @@ use tracing::Instrument; use turbo_rcstr::RcStr; use turbo_tasks::{ debug::ValueDebugFormat, effect, mark_session_dependent, mark_stateful, trace::TraceRawVcs, - Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, ResolvedVc, ValueToString, - Vc, + ApplyEffectsContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, + ResolvedVc, ValueToString, Vc, }; use turbo_tasks_hash::{ hash_xxh3_hash128, hash_xxh3_hash64, DeterministicHash, DeterministicHasher, @@ -208,6 +209,12 @@ pub trait FileSystem: ValueToString { fn metadata(self: Vc, fs_path: Vc) -> Vc; } +#[derive(Default)] +struct DiskFileSystemApplyContext { + /// A cache of already created directories to avoid creating them multiple times. + created_directories: FxHashSet, +} + #[derive(Serialize, Deserialize, TraceRawVcs, ValueDebugFormat, NonLocalValue)] struct DiskFileSystemInner { pub name: RcStr, @@ -393,6 +400,29 @@ impl DiskFileSystemInner { Ok(()) } + + async fn create_directory(self: &Arc, directory: &Path) -> Result<()> { + let already_created = ApplyEffectsContext::with_or_insert_with( + DiskFileSystemApplyContext::default, + |fs_context| fs_context.created_directories.contains(directory), + ); + if !already_created { + let func = |p: &Path| std::fs::create_dir_all(p); + retry_blocking(directory, func) + .concurrency_limited(&self.semaphore) + .instrument(tracing::info_span!( + "create directory", + path = display(directory.display()) + )) + .await?; + ApplyEffectsContext::with(|fs_context: &mut DiskFileSystemApplyContext| { + fs_context + .created_directories + .insert(directory.to_path_buf()) + }); + } + Ok(()) + } } #[turbo_tasks::value(cell = "new", eq = "manual")] @@ -735,20 +765,13 @@ impl FileSystem for DiskFileSystem { let create_directory = compare == FileComparison::Create; if create_directory { if let Some(parent) = full_path.parent() { - retry_blocking(parent, |p| std::fs::create_dir_all(p)) - .concurrency_limited(&inner.semaphore) - .instrument(tracing::info_span!( - "create directory", - path = display(parent.display()) - )) - .await - .with_context(|| { - format!( - "failed to create directory {} for write to {}", - parent.display(), - full_path.display() - ) - })?; + inner.create_directory(parent).await.with_context(|| { + format!( + "failed to create directory {} for write to {}", + parent.display(), + full_path.display() + ) + })?; } } let full_path_to_write = full_path.clone(); @@ -872,20 +895,13 @@ impl FileSystem for DiskFileSystem { let create_directory = old_content.is_none(); if create_directory { if let Some(parent) = full_path.parent() { - retry_blocking(parent, |path| std::fs::create_dir_all(path)) - .concurrency_limited(&inner.semaphore) - .instrument(tracing::info_span!( - "create directory", - path = display(parent.display()) - )) - .await - .with_context(|| { - format!( - "failed to create directory {} for write to {}", - parent.display(), - full_path.display() - ) - })?; + inner.create_directory(parent).await.with_context(|| { + format!( + "failed to create directory {} for write link to {}", + parent.display(), + full_path.display() + ) + })?; } } diff --git a/turbopack/crates/turbo-tasks/src/effect.rs b/turbopack/crates/turbo-tasks/src/effect.rs index 7a90345f86197..ed9bb28203097 100644 --- a/turbopack/crates/turbo-tasks/src/effect.rs +++ b/turbopack/crates/turbo-tasks/src/effect.rs @@ -1,10 +1,19 @@ -use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin}; +use std::{ + any::{Any, TypeId}, + borrow::Cow, + future::Future, + mem::replace, + panic, + pin::Pin, + sync::Arc, +}; use anyhow::{anyhow, Result}; use auto_hash_map::AutoSet; use futures::{StreamExt, TryStreamExt}; use parking_lot::Mutex; -use rustc_hash::FxHashSet; +use rustc_hash::{FxHashMap, FxHashSet}; +use tokio::task_local; use tracing::{Instrument, Span}; use crate::{ @@ -90,10 +99,10 @@ impl EffectInstance { listener.await; } State::NotStarted(EffectInner { future }) => { - let join_handle = tokio::spawn( + let join_handle = tokio::spawn(ApplyEffectsContext::in_current_scope( turbo_tasks_future_scope(turbo_tasks::turbo_tasks(), future) .instrument(Span::current()), - ); + )); let result = match join_handle.await { Ok(Err(err)) => Err(SharedError::new(err)), Err(err) => { @@ -170,20 +179,22 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> { return Ok(()); } let span = tracing::info_span!("apply effects", count = effects.len()); - async move { - // Limit the concurrency of effects - futures::stream::iter(effects) - .map(Ok) - .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { - let Some(effect) = ResolvedVc::try_downcast_type::(effect) else { - panic!("Effect must only be implemented by EffectInstance"); - }; - effect.await?.apply().await - }) - .await - } - .instrument(span) - .await + APPLY_EFFECTS_CONTEXT + .scope(Default::default(), async move { + // Limit the concurrency of effects + futures::stream::iter(effects) + .map(Ok) + .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { + let Some(effect) = ResolvedVc::try_downcast_type::(effect) + else { + panic!("Effect must only be implemented by EffectInstance"); + }; + effect.await?.apply().await + }) + .await + }) + .instrument(span) + .await } /// Capture effects from an turbo-tasks operation. Since this captures collectibles it might @@ -252,17 +263,81 @@ impl Effects { /// Applies all effects that have been captured by this struct. pub async fn apply(&self) -> Result<()> { let span = tracing::info_span!("apply effects", count = self.effects.len()); - async move { - // Limit the concurrency of effects - futures::stream::iter(self.effects.iter()) - .map(Ok) - .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { - effect.apply().await + APPLY_EFFECTS_CONTEXT + .scope(Default::default(), async move { + // Limit the concurrency of effects + futures::stream::iter(self.effects.iter()) + .map(Ok) + .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { + effect.apply().await + }) + .await + }) + .instrument(span) + .await + } +} + +task_local! { + /// The context of the current effects application. + static APPLY_EFFECTS_CONTEXT: Arc>; +} + +#[derive(Default)] +pub struct ApplyEffectsContext { + data: FxHashMap>, +} + +impl ApplyEffectsContext { + fn in_current_scope(f: F) -> impl Future { + let current = Self::current(); + APPLY_EFFECTS_CONTEXT.scope(current, f) + } + + fn current() -> Arc> { + APPLY_EFFECTS_CONTEXT + .try_with(|mutex| mutex.clone()) + .expect("No effect context found") + } + + fn with_context T>(f: F) -> T { + APPLY_EFFECTS_CONTEXT + .try_with(|mutex| f(&mut mutex.lock())) + .expect("No effect context found") + } + + pub fn set(value: T) { + Self::with_context(|this| { + this.data.insert(TypeId::of::(), Box::new(value)); + }) + } + + pub fn with(f: impl FnOnce(&mut T) -> R) -> Option { + Self::with_context(|this| { + this.data + .get_mut(&TypeId::of::()) + .map(|value| { + // Safety: the map is keyed by TypeId + unsafe { value.downcast_mut_unchecked() } }) - .await - } - .instrument(span) - .await + .map(f) + }) + } + + pub fn with_or_insert_with( + insert_with: impl FnOnce() -> T, + f: impl FnOnce(&mut T) -> R, + ) -> R { + Self::with_context(|this| { + let value = this.data.entry(TypeId::of::()).or_insert_with(|| { + let value = insert_with(); + Box::new(value) + }); + f( + // Safety: the map is keyed by TypeId + unsafe { value.downcast_mut_unchecked() }, + ) + }) } } diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 936f2ffb9c056..17c016f813fc8 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -36,6 +36,7 @@ #![feature(arbitrary_self_types_pointers)] #![feature(new_zeroed_alloc)] #![feature(never_type)] +#![feature(downcast_unchecked)] pub mod backend; mod capture_future; @@ -89,7 +90,7 @@ use auto_hash_map::AutoSet; pub use collectibles::CollectiblesSource; pub use completion::{Completion, Completions}; pub use display::ValueToString; -pub use effect::{apply_effects, effect, get_effects, Effects}; +pub use effect::{apply_effects, effect, get_effects, ApplyEffectsContext, Effects}; pub use id::{ ExecutionId, FunctionId, LocalTaskId, SessionId, TaskId, TraitTypeId, ValueTypeId, TRANSIENT_TASK_BIT,