From f20de35c468152ede15d3a69af2afaf549f66466 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 07:47:32 +0200 Subject: [PATCH 1/8] use try_for_each_concurrent From 2209f257eb5ddc1836cc48e2b375abfdafa1b667 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 30 Apr 2025 19:52:32 +0200 Subject: [PATCH 2/8] cache directory creation --- turbopack/crates/turbo-tasks-fs/src/lib.rs | 69 +++++++++++++--------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 27839318b6797..45d3fbc7f6657 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -39,6 +39,7 @@ use std::{ use anyhow::{anyhow, bail, Context, Result}; use auto_hash_map::{AutoMap, AutoSet}; use bitflags::bitflags; +use dashmap::DashSet; use dunce::simplified; use glob::Glob; use indexmap::IndexSet; @@ -226,6 +227,10 @@ struct DiskFileSystemInner { #[turbo_tasks(debug_ignore, trace_ignore)] #[serde(skip)] invalidation_lock: RwLock<()>, + /// A cache of already created directories to avoid creating them multiple times. + #[turbo_tasks(debug_ignore, trace_ignore)] + #[serde(skip)] + created_directories: DashSet, /// Semaphore to limit the maximum number of concurrent file operations. #[turbo_tasks(debug_ignore, trace_ignore)] #[serde(skip, default = "create_semaphore")] @@ -393,6 +398,27 @@ impl DiskFileSystemInner { Ok(()) } + + async fn create_directory(self: &Arc, directory: &Path) -> Result<()> { + if !self.created_directories.contains(directory) { + let func = { + let inner = self.clone(); + move |p: &Path| -> io::Result<()> { + std::fs::create_dir_all(p)?; + inner.created_directories.insert(p.to_path_buf()); + Ok(()) + } + }; + retry_blocking(directory, func) + .concurrency_limited(&self.semaphore) + .instrument(tracing::info_span!( + "create directory", + path = display(directory.display()) + )) + .await?; + } + Ok(()) + } } #[turbo_tasks::value(cell = "new", eq = "manual")] @@ -499,6 +525,7 @@ impl DiskFileSystem { invalidation_lock: Default::default(), invalidator_map: InvalidatorMap::new(), dir_invalidator_map: InvalidatorMap::new(), + created_directories: Default::default(), semaphore: create_semaphore(), watcher: DiskWatcher::new( ignored_subpaths.into_iter().map(PathBuf::from).collect(), @@ -735,20 +762,13 @@ impl FileSystem for DiskFileSystem { let create_directory = compare == FileComparison::Create; if create_directory { if let Some(parent) = full_path.parent() { - retry_blocking(parent, |p| std::fs::create_dir_all(p)) - .concurrency_limited(&inner.semaphore) - .instrument(tracing::info_span!( - "create directory", - path = display(parent.display()) - )) - .await - .with_context(|| { - format!( - "failed to create directory {} for write to {}", - parent.display(), - full_path.display() - ) - })?; + inner.create_directory(parent).await.with_context(|| { + format!( + "failed to create directory {} for write to {}", + parent.display(), + full_path.display() + ) + })?; } } let full_path_to_write = full_path.clone(); @@ -872,20 +892,13 @@ impl FileSystem for DiskFileSystem { let create_directory = old_content.is_none(); if create_directory { if let Some(parent) = full_path.parent() { - retry_blocking(parent, |path| std::fs::create_dir_all(path)) - .concurrency_limited(&inner.semaphore) - .instrument(tracing::info_span!( - "create directory", - path = display(parent.display()) - )) - .await - .with_context(|| { - format!( - "failed to create directory {} for write to {}", - parent.display(), - full_path.display() - ) - })?; + inner.create_directory(parent).await.with_context(|| { + format!( + "failed to create directory {} for write link to {}", + parent.display(), + full_path.display() + ) + })?; } } From 04404facadd5ecb2c1b35068565cefa13372c7d7 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 6 May 2025 08:50:09 +0200 Subject: [PATCH 3/8] only cache created directories per apply_effects call --- turbopack/crates/turbo-tasks-fs/src/lib.rs | 29 +++-- turbopack/crates/turbo-tasks/src/effect.rs | 120 ++++++++++++++++----- turbopack/crates/turbo-tasks/src/lib.rs | 2 +- 3 files changed, 113 insertions(+), 38 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 45d3fbc7f6657..05b30b9d22599 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -39,7 +39,6 @@ use std::{ use anyhow::{anyhow, bail, Context, Result}; use auto_hash_map::{AutoMap, AutoSet}; use bitflags::bitflags; -use dashmap::DashSet; use dunce::simplified; use glob::Glob; use indexmap::IndexSet; @@ -49,6 +48,7 @@ use mime::Mime; use rayon::iter::{IntoParallelIterator, ParallelIterator}; pub use read_glob::ReadGlobResult; use read_glob::{read_glob, track_glob}; +use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::{ @@ -60,8 +60,8 @@ use tracing::Instrument; use turbo_rcstr::RcStr; use turbo_tasks::{ debug::ValueDebugFormat, effect, mark_session_dependent, mark_stateful, trace::TraceRawVcs, - Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, ResolvedVc, ValueToString, - Vc, + ApplyEffectContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, + ResolvedVc, ValueToString, Vc, }; use turbo_tasks_hash::{ hash_xxh3_hash128, hash_xxh3_hash64, DeterministicHash, DeterministicHasher, @@ -209,6 +209,12 @@ pub trait FileSystem: ValueToString { fn metadata(self: Vc, fs_path: Vc) -> Vc; } +#[derive(Default)] +struct DiskFileSystemApplyContext { + /// A cache of already created directories to avoid creating them multiple times. + created_directories: FxHashSet, +} + #[derive(Serialize, Deserialize, TraceRawVcs, ValueDebugFormat, NonLocalValue)] struct DiskFileSystemInner { pub name: RcStr, @@ -227,10 +233,6 @@ struct DiskFileSystemInner { #[turbo_tasks(debug_ignore, trace_ignore)] #[serde(skip)] invalidation_lock: RwLock<()>, - /// A cache of already created directories to avoid creating them multiple times. - #[turbo_tasks(debug_ignore, trace_ignore)] - #[serde(skip)] - created_directories: DashSet, /// Semaphore to limit the maximum number of concurrent file operations. #[turbo_tasks(debug_ignore, trace_ignore)] #[serde(skip, default = "create_semaphore")] @@ -400,12 +402,18 @@ impl DiskFileSystemInner { } async fn create_directory(self: &Arc, directory: &Path) -> Result<()> { - if !self.created_directories.contains(directory) { + let already_created = ApplyEffectContext::with_or_insert_with( + DiskFileSystemApplyContext::default, + |fs_context| fs_context.created_directories.contains(directory), + ); + if !already_created { let func = { - let inner = self.clone(); move |p: &Path| -> io::Result<()> { std::fs::create_dir_all(p)?; - inner.created_directories.insert(p.to_path_buf()); + let path_buf = p.to_path_buf(); + ApplyEffectContext::with(|fs_context: &mut DiskFileSystemApplyContext| { + fs_context.created_directories.insert(path_buf) + }); Ok(()) } }; @@ -525,7 +533,6 @@ impl DiskFileSystem { invalidation_lock: Default::default(), invalidator_map: InvalidatorMap::new(), dir_invalidator_map: InvalidatorMap::new(), - created_directories: Default::default(), semaphore: create_semaphore(), watcher: DiskWatcher::new( ignored_subpaths.into_iter().map(PathBuf::from).collect(), diff --git a/turbopack/crates/turbo-tasks/src/effect.rs b/turbopack/crates/turbo-tasks/src/effect.rs index 7a90345f86197..a9fcf2bc02146 100644 --- a/turbopack/crates/turbo-tasks/src/effect.rs +++ b/turbopack/crates/turbo-tasks/src/effect.rs @@ -1,10 +1,18 @@ -use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin}; +use std::{ + any::{Any, TypeId}, + borrow::Cow, + future::Future, + mem::replace, + panic, + pin::Pin, +}; use anyhow::{anyhow, Result}; use auto_hash_map::AutoSet; use futures::{StreamExt, TryStreamExt}; use parking_lot::Mutex; -use rustc_hash::FxHashSet; +use rustc_hash::{FxHashMap, FxHashSet}; +use tokio::task_local; use tracing::{Instrument, Span}; use crate::{ @@ -170,20 +178,22 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> { return Ok(()); } let span = tracing::info_span!("apply effects", count = effects.len()); - async move { - // Limit the concurrency of effects - futures::stream::iter(effects) - .map(Ok) - .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { - let Some(effect) = ResolvedVc::try_downcast_type::(effect) else { - panic!("Effect must only be implemented by EffectInstance"); - }; - effect.await?.apply().await - }) - .await - } - .instrument(span) - .await + APPLY_EFFECT_CONTEXT + .scope(Mutex::new(ApplyEffectContext::new()), async move { + // Limit the concurrency of effects + futures::stream::iter(effects) + .map(Ok) + .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { + let Some(effect) = ResolvedVc::try_downcast_type::(effect) + else { + panic!("Effect must only be implemented by EffectInstance"); + }; + effect.await?.apply().await + }) + .await + }) + .instrument(span) + .await } /// Capture effects from an turbo-tasks operation. Since this captures collectibles it might @@ -252,17 +262,75 @@ impl Effects { /// Applies all effects that have been captured by this struct. pub async fn apply(&self) -> Result<()> { let span = tracing::info_span!("apply effects", count = self.effects.len()); - async move { - // Limit the concurrency of effects - futures::stream::iter(self.effects.iter()) - .map(Ok) - .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { - effect.apply().await - }) - .await + APPLY_EFFECT_CONTEXT + .scope(Mutex::new(ApplyEffectContext::new()), async move { + // Limit the concurrency of effects + futures::stream::iter(self.effects.iter()) + .map(Ok) + .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { + effect.apply().await + }) + .await + }) + .instrument(span) + .await + } +} + +task_local! { + /// The context of the current effects application. + static APPLY_EFFECT_CONTEXT: Mutex; +} + +pub struct ApplyEffectContext { + data: FxHashMap>, +} + +impl ApplyEffectContext { + pub fn new() -> Self { + Self { + data: FxHashMap::default(), } - .instrument(span) - .await + } + + fn with_context T>(f: F) -> T { + APPLY_EFFECT_CONTEXT + .try_with(|context| f(&mut *context.lock())) + .expect("No effect context found") + } + + pub fn set(value: T) { + Self::with_context(|context| { + context.data.insert(TypeId::of::(), Box::new(value)); + }) + } + + pub fn with(f: impl FnOnce(&mut T) -> R) -> Option { + Self::with_context(|context| { + context + .data + .get_mut(&TypeId::of::()) + .and_then(|value| value.downcast_mut()) + .map(f) + }) + } + + pub fn with_or_insert_with( + insert_with: impl FnOnce() -> T, + f: impl FnOnce(&mut T) -> R, + ) -> R { + Self::with_context(|context| { + f(context + .data + .entry(TypeId::of::()) + .or_insert_with(|| { + let value = insert_with(); + let value = Box::new(value); + value + }) + .downcast_mut() + .unwrap()) + }) } } diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 936f2ffb9c056..722f2c39ba4f9 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -89,7 +89,7 @@ use auto_hash_map::AutoSet; pub use collectibles::CollectiblesSource; pub use completion::{Completion, Completions}; pub use display::ValueToString; -pub use effect::{apply_effects, effect, get_effects, Effects}; +pub use effect::{apply_effects, effect, get_effects, ApplyEffectContext, Effects}; pub use id::{ ExecutionId, FunctionId, LocalTaskId, SessionId, TaskId, TraitTypeId, ValueTypeId, TRANSIENT_TASK_BIT, From f43ae7b2c4ace21e5414dfc8ba1e3eb9ab40dca8 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 6 May 2025 08:57:11 +0200 Subject: [PATCH 4/8] use unsafe for better performance --- turbopack/crates/turbo-tasks/src/effect.rs | 24 ++++++++++++---------- turbopack/crates/turbo-tasks/src/lib.rs | 1 + 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/turbopack/crates/turbo-tasks/src/effect.rs b/turbopack/crates/turbo-tasks/src/effect.rs index a9fcf2bc02146..f92a170a86273 100644 --- a/turbopack/crates/turbo-tasks/src/effect.rs +++ b/turbopack/crates/turbo-tasks/src/effect.rs @@ -310,7 +310,10 @@ impl ApplyEffectContext { context .data .get_mut(&TypeId::of::()) - .and_then(|value| value.downcast_mut()) + .map(|value| { + // Safety: the map is keyed by TypeId + unsafe { value.downcast_mut_unchecked() } + }) .map(f) }) } @@ -320,16 +323,15 @@ impl ApplyEffectContext { f: impl FnOnce(&mut T) -> R, ) -> R { Self::with_context(|context| { - f(context - .data - .entry(TypeId::of::()) - .or_insert_with(|| { - let value = insert_with(); - let value = Box::new(value); - value - }) - .downcast_mut() - .unwrap()) + let value = context.data.entry(TypeId::of::()).or_insert_with(|| { + let value = insert_with(); + let value = Box::new(value); + value + }); + f( + // Safety: the map is keyed by TypeId + unsafe { value.downcast_mut_unchecked() }, + ) }) } } diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 722f2c39ba4f9..f4744eaf55a2d 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -36,6 +36,7 @@ #![feature(arbitrary_self_types_pointers)] #![feature(new_zeroed_alloc)] #![feature(never_type)] +#![feature(downcast_unchecked)] pub mod backend; mod capture_future; From 0b9d3a8e4844e8ed8f6ee22c8a7970f608e606da Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 6 May 2025 09:03:18 +0200 Subject: [PATCH 5/8] clippy --- turbopack/crates/turbo-tasks/src/effect.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/turbopack/crates/turbo-tasks/src/effect.rs b/turbopack/crates/turbo-tasks/src/effect.rs index f92a170a86273..425e2ed861a9d 100644 --- a/turbopack/crates/turbo-tasks/src/effect.rs +++ b/turbopack/crates/turbo-tasks/src/effect.rs @@ -179,7 +179,7 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> { } let span = tracing::info_span!("apply effects", count = effects.len()); APPLY_EFFECT_CONTEXT - .scope(Mutex::new(ApplyEffectContext::new()), async move { + .scope(Default::default(), async move { // Limit the concurrency of effects futures::stream::iter(effects) .map(Ok) @@ -263,7 +263,7 @@ impl Effects { pub async fn apply(&self) -> Result<()> { let span = tracing::info_span!("apply effects", count = self.effects.len()); APPLY_EFFECT_CONTEXT - .scope(Mutex::new(ApplyEffectContext::new()), async move { + .scope(Default::default(), async move { // Limit the concurrency of effects futures::stream::iter(self.effects.iter()) .map(Ok) @@ -282,20 +282,15 @@ task_local! { static APPLY_EFFECT_CONTEXT: Mutex; } +#[derive(Default)] pub struct ApplyEffectContext { data: FxHashMap>, } impl ApplyEffectContext { - pub fn new() -> Self { - Self { - data: FxHashMap::default(), - } - } - fn with_context T>(f: F) -> T { APPLY_EFFECT_CONTEXT - .try_with(|context| f(&mut *context.lock())) + .try_with(|context| f(&mut context.lock())) .expect("No effect context found") } @@ -325,8 +320,7 @@ impl ApplyEffectContext { Self::with_context(|context| { let value = context.data.entry(TypeId::of::()).or_insert_with(|| { let value = insert_with(); - let value = Box::new(value); - value + Box::new(value) }); f( // Safety: the map is keyed by TypeId From bbc061589d73edc3d1d7fd72fdb5335d264a76bd Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 6 May 2025 09:57:50 +0200 Subject: [PATCH 6/8] lint fixes --- turbopack/crates/turbo-tasks/src/effect.rs | 23 +++++++++++----------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/turbopack/crates/turbo-tasks/src/effect.rs b/turbopack/crates/turbo-tasks/src/effect.rs index 425e2ed861a9d..93665b04c5176 100644 --- a/turbopack/crates/turbo-tasks/src/effect.rs +++ b/turbopack/crates/turbo-tasks/src/effect.rs @@ -178,7 +178,7 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> { return Ok(()); } let span = tracing::info_span!("apply effects", count = effects.len()); - APPLY_EFFECT_CONTEXT + APPLY_EFFECTS_CONTEXT .scope(Default::default(), async move { // Limit the concurrency of effects futures::stream::iter(effects) @@ -262,7 +262,7 @@ impl Effects { /// Applies all effects that have been captured by this struct. pub async fn apply(&self) -> Result<()> { let span = tracing::info_span!("apply effects", count = self.effects.len()); - APPLY_EFFECT_CONTEXT + APPLY_EFFECTS_CONTEXT .scope(Default::default(), async move { // Limit the concurrency of effects futures::stream::iter(self.effects.iter()) @@ -279,7 +279,7 @@ impl Effects { task_local! { /// The context of the current effects application. - static APPLY_EFFECT_CONTEXT: Mutex; + static APPLY_EFFECTS_CONTEXT: Mutex; } #[derive(Default)] @@ -289,21 +289,20 @@ pub struct ApplyEffectContext { impl ApplyEffectContext { fn with_context T>(f: F) -> T { - APPLY_EFFECT_CONTEXT - .try_with(|context| f(&mut context.lock())) + APPLY_EFFECTS_CONTEXT + .try_with(|mutex| f(&mut mutex.lock())) .expect("No effect context found") } pub fn set(value: T) { - Self::with_context(|context| { - context.data.insert(TypeId::of::(), Box::new(value)); + Self::with_context(|this| { + this.data.insert(TypeId::of::(), Box::new(value)); }) } pub fn with(f: impl FnOnce(&mut T) -> R) -> Option { - Self::with_context(|context| { - context - .data + Self::with_context(|this| { + this.data .get_mut(&TypeId::of::()) .map(|value| { // Safety: the map is keyed by TypeId @@ -317,8 +316,8 @@ impl ApplyEffectContext { insert_with: impl FnOnce() -> T, f: impl FnOnce(&mut T) -> R, ) -> R { - Self::with_context(|context| { - let value = context.data.entry(TypeId::of::()).or_insert_with(|| { + Self::with_context(|this| { + let value = this.data.entry(TypeId::of::()).or_insert_with(|| { let value = insert_with(); Box::new(value) }); From 8fe8cf00263e2e32b7a1cc4b372fe661eb591581 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 8 May 2025 16:37:37 +0200 Subject: [PATCH 7/8] ApplyEffectContent can't be used in spawn_blocking --- turbopack/crates/turbo-tasks-fs/src/lib.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 05b30b9d22599..7b25d29527257 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -407,16 +407,7 @@ impl DiskFileSystemInner { |fs_context| fs_context.created_directories.contains(directory), ); if !already_created { - let func = { - move |p: &Path| -> io::Result<()> { - std::fs::create_dir_all(p)?; - let path_buf = p.to_path_buf(); - ApplyEffectContext::with(|fs_context: &mut DiskFileSystemApplyContext| { - fs_context.created_directories.insert(path_buf) - }); - Ok(()) - } - }; + let func = |p: &Path| std::fs::create_dir_all(p); retry_blocking(directory, func) .concurrency_limited(&self.semaphore) .instrument(tracing::info_span!( @@ -424,6 +415,11 @@ impl DiskFileSystemInner { path = display(directory.display()) )) .await?; + ApplyEffectContext::with(|fs_context: &mut DiskFileSystemApplyContext| { + fs_context + .created_directories + .insert(directory.to_path_buf()) + }); } Ok(()) } From fd9b657e3cedc15c4f05a0c473d2385a3c802bf8 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 8 May 2025 17:04:48 +0200 Subject: [PATCH 8/8] rename and fixes --- turbopack/crates/turbo-tasks-fs/src/lib.rs | 6 +++--- turbopack/crates/turbo-tasks/src/effect.rs | 22 +++++++++++++++++----- turbopack/crates/turbo-tasks/src/lib.rs | 2 +- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 7b25d29527257..0750ddc2e0be4 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -60,7 +60,7 @@ use tracing::Instrument; use turbo_rcstr::RcStr; use turbo_tasks::{ debug::ValueDebugFormat, effect, mark_session_dependent, mark_stateful, trace::TraceRawVcs, - ApplyEffectContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, + ApplyEffectsContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, ResolvedVc, ValueToString, Vc, }; use turbo_tasks_hash::{ @@ -402,7 +402,7 @@ impl DiskFileSystemInner { } async fn create_directory(self: &Arc, directory: &Path) -> Result<()> { - let already_created = ApplyEffectContext::with_or_insert_with( + let already_created = ApplyEffectsContext::with_or_insert_with( DiskFileSystemApplyContext::default, |fs_context| fs_context.created_directories.contains(directory), ); @@ -415,7 +415,7 @@ impl DiskFileSystemInner { path = display(directory.display()) )) .await?; - ApplyEffectContext::with(|fs_context: &mut DiskFileSystemApplyContext| { + ApplyEffectsContext::with(|fs_context: &mut DiskFileSystemApplyContext| { fs_context .created_directories .insert(directory.to_path_buf()) diff --git a/turbopack/crates/turbo-tasks/src/effect.rs b/turbopack/crates/turbo-tasks/src/effect.rs index 93665b04c5176..ed9bb28203097 100644 --- a/turbopack/crates/turbo-tasks/src/effect.rs +++ b/turbopack/crates/turbo-tasks/src/effect.rs @@ -5,6 +5,7 @@ use std::{ mem::replace, panic, pin::Pin, + sync::Arc, }; use anyhow::{anyhow, Result}; @@ -98,10 +99,10 @@ impl EffectInstance { listener.await; } State::NotStarted(EffectInner { future }) => { - let join_handle = tokio::spawn( + let join_handle = tokio::spawn(ApplyEffectsContext::in_current_scope( turbo_tasks_future_scope(turbo_tasks::turbo_tasks(), future) .instrument(Span::current()), - ); + )); let result = match join_handle.await { Ok(Err(err)) => Err(SharedError::new(err)), Err(err) => { @@ -279,15 +280,26 @@ impl Effects { task_local! { /// The context of the current effects application. - static APPLY_EFFECTS_CONTEXT: Mutex; + static APPLY_EFFECTS_CONTEXT: Arc>; } #[derive(Default)] -pub struct ApplyEffectContext { +pub struct ApplyEffectsContext { data: FxHashMap>, } -impl ApplyEffectContext { +impl ApplyEffectsContext { + fn in_current_scope(f: F) -> impl Future { + let current = Self::current(); + APPLY_EFFECTS_CONTEXT.scope(current, f) + } + + fn current() -> Arc> { + APPLY_EFFECTS_CONTEXT + .try_with(|mutex| mutex.clone()) + .expect("No effect context found") + } + fn with_context T>(f: F) -> T { APPLY_EFFECTS_CONTEXT .try_with(|mutex| f(&mut mutex.lock())) diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index f4744eaf55a2d..17c016f813fc8 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -90,7 +90,7 @@ use auto_hash_map::AutoSet; pub use collectibles::CollectiblesSource; pub use completion::{Completion, Completions}; pub use display::ValueToString; -pub use effect::{apply_effects, effect, get_effects, ApplyEffectContext, Effects}; +pub use effect::{apply_effects, effect, get_effects, ApplyEffectsContext, Effects}; pub use id::{ ExecutionId, FunctionId, LocalTaskId, SessionId, TaskId, TraitTypeId, ValueTypeId, TRANSIENT_TASK_BIT,