Skip to content

Commit 2089c24

Browse files
sokraeps1lon
authored andcommitted
Turbopack: avoid storing task data and task cache concurrenctly (#78775)
### What? Avoid to concurrently write task data and task cache as it increases memory usage by a lot.
1 parent 5affdb0 commit 2089c24

File tree

1 file changed

+67
-75
lines changed

1 file changed

+67
-75
lines changed

turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs

Lines changed: 67 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -177,92 +177,84 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
177177
{
178178
let _span = tracing::trace_span!("save snapshot", session_id = ?session_id, operations = operations.len());
179179
let mut batch = self.database.write_batch()?;
180-
let mut task_items_result = Ok(Vec::new());
181180

182181
// Start organizing the updates in parallel
183182
match &mut batch {
184183
WriteBatch::Concurrent(ref batch, _) => {
185-
turbo_tasks::scope(|s| {
186-
s.spawn(|_| {
187-
let _span = tracing::trace_span!("update task meta").entered();
188-
task_items_result = process_task_data(snapshots, Some(batch));
189-
});
184+
{
185+
let _span = tracing::trace_span!("update task data").entered();
186+
process_task_data(snapshots, Some(batch))?;
187+
}
190188

191-
let mut next_task_id =
192-
get_next_free_task_id::<
193-
T::SerialWriteBatch<'_>,
194-
T::ConcurrentWriteBatch<'_>,
195-
>(&mut WriteBatchRef::concurrent(batch))?;
189+
let mut next_task_id = get_next_free_task_id::<
190+
T::SerialWriteBatch<'_>,
191+
T::ConcurrentWriteBatch<'_>,
192+
>(&mut WriteBatchRef::concurrent(batch))?;
196193

197-
{
198-
let _span = tracing::trace_span!(
199-
"update task cache",
200-
items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
201-
)
202-
.entered();
203-
let result = task_cache_updates
204-
.into_par_iter()
205-
.with_max_len(1)
206-
.map(|updates| {
207-
let mut max_task_id = 0;
208-
209-
let mut task_type_bytes = Vec::new();
210-
for (task_type, task_id) in updates {
211-
let task_id: u32 = *task_id;
212-
serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;
213-
214-
batch
215-
.put(
216-
KeySpace::ForwardTaskCache,
217-
WriteBuffer::Borrowed(&task_type_bytes),
218-
WriteBuffer::Borrowed(&task_id.to_le_bytes()),
194+
{
195+
let _span = tracing::trace_span!(
196+
"update task cache",
197+
items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
198+
)
199+
.entered();
200+
let result = task_cache_updates
201+
.into_par_iter()
202+
.with_max_len(1)
203+
.map(|updates| {
204+
let mut max_task_id = 0;
205+
206+
let mut task_type_bytes = Vec::new();
207+
for (task_type, task_id) in updates {
208+
let task_id: u32 = *task_id;
209+
serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;
210+
211+
batch
212+
.put(
213+
KeySpace::ForwardTaskCache,
214+
WriteBuffer::Borrowed(&task_type_bytes),
215+
WriteBuffer::Borrowed(&task_id.to_le_bytes()),
216+
)
217+
.with_context(|| {
218+
anyhow!(
219+
"Unable to write task cache {task_type:?} => {task_id}"
219220
)
220-
.with_context(|| {
221-
anyhow!(
222-
"Unable to write task cache {task_type:?} => \
223-
{task_id}"
224-
)
225-
})?;
226-
batch
227-
.put(
228-
KeySpace::ReverseTaskCache,
229-
WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
230-
WriteBuffer::Borrowed(&task_type_bytes),
221+
})?;
222+
batch
223+
.put(
224+
KeySpace::ReverseTaskCache,
225+
WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
226+
WriteBuffer::Borrowed(&task_type_bytes),
227+
)
228+
.with_context(|| {
229+
anyhow!(
230+
"Unable to write task cache {task_id} => {task_type:?}"
231231
)
232-
.with_context(|| {
233-
anyhow!(
234-
"Unable to write task cache {task_id} => \
235-
{task_type:?}"
236-
)
237-
})?;
238-
max_task_id = max_task_id.max(task_id + 1);
239-
}
240-
241-
Ok(max_task_id)
242-
})
243-
.reduce(
244-
|| Ok(0),
245-
|a, b| -> anyhow::Result<_> {
246-
let a_max = a?;
247-
let b_max = b?;
248-
Ok(max(a_max, b_max))
249-
},
250-
)?;
251-
next_task_id = next_task_id.max(result);
252-
}
253-
254-
save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
255-
&mut WriteBatchRef::concurrent(batch),
256-
next_task_id,
257-
session_id,
258-
operations,
259-
)?;
260-
anyhow::Ok(())
261-
})?;
232+
})?;
233+
max_task_id = max_task_id.max(task_id + 1);
234+
}
235+
236+
Ok(max_task_id)
237+
})
238+
.reduce(
239+
|| Ok(0),
240+
|a, b| -> anyhow::Result<_> {
241+
let a_max = a?;
242+
let b_max = b?;
243+
Ok(max(a_max, b_max))
244+
},
245+
)?;
246+
next_task_id = next_task_id.max(result);
247+
}
262248

263-
task_items_result?;
249+
save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
250+
&mut WriteBatchRef::concurrent(batch),
251+
next_task_id,
252+
session_id,
253+
operations,
254+
)?;
264255
}
265256
WriteBatch::Serial(batch) => {
257+
let mut task_items_result = Ok(Vec::new());
266258
turbo_tasks::scope(|s| {
267259
s.spawn(|_| {
268260
task_items_result =

0 commit comments

Comments
 (0)