Skip to content

Commit 814c810

Browse files
committed
commit 1: save the state of the repository
Signed-off-by: raayandhar <[email protected]>
1 parent f156221 commit 814c810

File tree

12 files changed

+2411
-36
lines changed

12 files changed

+2411
-36
lines changed

cpp/include/tensorrt_llm/executor/executor.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,13 +1454,15 @@ class CacheTransceiverConfig
14541454
UCX = 2,
14551455
NIXL = 3
14561456
};
1457-
explicit CacheTransceiverConfig(
1458-
std::optional<BackendType> backendType = std::nullopt, std::optional<size_t> maxNumTokens = std::nullopt);
1457+
explicit CacheTransceiverConfig(std::optional<BackendType> backendType = std::nullopt,
1458+
std::optional<size_t> maxNumTokens = std::nullopt, std::optional<int> kvTransferTimeoutMs = std::nullopt);
14591459

14601460
bool operator==(CacheTransceiverConfig const& other) const;
14611461
void setBackendType(std::optional<BackendType> backendType);
14621462
void setMaxTokensInBuffer(std::optional<size_t> maxTokensInBuffer);
1463+
void setKvTransferTimeoutMs(std::optional<int> kvTransferTimeoutMs);
14631464

1465+
[[nodiscard]] std::optional<int> getKvTransferTimeoutMs() const;
14641466
[[nodiscard]] std::optional<size_t> getMaxTokensInBuffer() const;
14651467
[[nodiscard]] std::optional<BackendType> getBackendType() const;
14661468

@@ -1470,6 +1472,7 @@ class CacheTransceiverConfig
14701472
/// kvCache tokens to be transferred for a single request is greater than this value, the performance of the cache
14711473
/// transfer may be degraded.
14721474
std::optional<size_t> mMaxTokensInBuffer;
1475+
std::optional<int> mKvTransferTimeoutMs;
14731476
};
14741477

14751478
/// @brief Configuration class for the model executor

cpp/tensorrt_llm/batch_manager/cacheTransceiver.cpp

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,14 @@ void CacheTransceiver::setContextState(LlmRequest* llmRequest)
238238
void CacheTransceiver::respondAndSendAsync(LlmRequest* llmRequest)
239239
{
240240
TLLM_CHECK(llmRequest && llmRequest->isContextOnlyRequest());
241+
// TEST HOOK: Skip creating responder future for a specific request ID to validate HOL blocking theory
242+
if (llmRequest->mRequestId == 2049)
243+
{
244+
llmRequest->setState(LlmRequestState::kDISAGG_CONTEXT_TRANS_IN_PROGRESS);
245+
TLLM_LOG_WARNING("TEST: Skipping responder future for context request 2049");
246+
setContextState(llmRequest);
247+
return;
248+
}
241249
llmRequest->setState(LlmRequestState::kDISAGG_CONTEXT_TRANS_IN_PROGRESS);
242250
// If context phase params is already set, it means that the KV cache
243251
// transfer is already in progress.
@@ -252,6 +260,8 @@ void CacheTransceiver::respondAndSendAsync(LlmRequest* llmRequest)
252260
setContextState(llmRequest);
253261
auto future = mDataResponder->respondAndSendAsync(*llmRequest);
254262
mResponderFutures.emplace_back(llmRequest, std::move(future));
263+
TLLM_LOG_DEBUG("respondAndSendAsync: enqueued context request %ld, mResponderFutures.size()=%zu",
264+
llmRequest->mRequestId, mResponderFutures.size());
255265
}
256266

257267
void CacheTransceiver::respondAndSendLayerWise(
@@ -301,20 +311,34 @@ void CacheTransceiver::requestAndReceiveAsync(LlmRequest* llmRequest)
301311
std::vector<LlmRequest::RequestIdType> gatherRequestIds(
302312
mpi::MpiComm const& mpiComm, std::vector<LlmRequest::RequestIdType> const& requestIds)
303313
{
314+
TLLM_LOG_DEBUG("gatherRequestIds: Entry - rank %d, localSize=%zu, worldSize=%d", mpiComm.getRank(),
315+
requestIds.size(), mpiComm.getSize());
316+
304317
int localSize = static_cast<int>(requestIds.size());
305318
std::vector<int> sizes(mpiComm.getSize());
319+
320+
TLLM_LOG_DEBUG("gatherRequestIds: Starting allgather for sizes");
306321
mpiComm.allgather(&localSize, sizes.data(), 1, mpi::MpiType::kINT32);
322+
TLLM_LOG_DEBUG("gatherRequestIds: allgather for sizes completed");
323+
307324
// std::vector<LlmRequest::RequestIdType> all_data(total_size);
308325
std::vector<int> displs(mpiComm.getSize());
309326
int totalSize = 0;
310327
for (int i = 0; i < mpiComm.getSize(); i++)
311328
{
312329
displs[i] = totalSize;
313330
totalSize += sizes[i];
331+
TLLM_LOG_DEBUG("gatherRequestIds: Rank %d has %d request IDs", i, sizes[i]);
314332
}
333+
334+
TLLM_LOG_DEBUG("gatherRequestIds: Total size across all ranks: %d", totalSize);
315335
std::vector<LlmRequest::RequestIdType> retData(totalSize);
336+
337+
TLLM_LOG_DEBUG("gatherRequestIds: Starting allgatherv for request IDs");
316338
mpiComm.allgatherv(requestIds.data(), static_cast<int>(requestIds.size()), mpi::MpiType::kUINT64, retData.data(),
317339
sizes, displs, mpi::MpiType::kUINT64);
340+
TLLM_LOG_DEBUG("gatherRequestIds: allgatherv for request IDs completed, returning %zu total IDs", retData.size());
341+
318342
return retData;
319343
}
320344

@@ -370,72 +394,153 @@ void updateKVCacheTransferBW(mpi::MpiComm const& mpiComm, LlmRequest* request)
370394

371395
void CacheTransceiver::checkContextTransferStatus(std::optional<int> const& atLeastRequestNum)
372396
{
397+
TLLM_LOG_DEBUG("checkContextTransferStatus: Entry with atLeastRequestNum=%s, mResponderFutures.size()=%zu",
398+
atLeastRequestNum.has_value() ? std::to_string(atLeastRequestNum.value()).c_str() : "nullopt",
399+
mResponderFutures.size());
400+
401+
// Dump current responder future queue order for diagnostics
402+
{
403+
std::ostringstream oss;
404+
oss << "[";
405+
bool first = true;
406+
for (auto const& pair : mResponderFutures)
407+
{
408+
if (!first)
409+
{
410+
oss << ", ";
411+
}
412+
first = false;
413+
oss << pair.first->mRequestId;
414+
}
415+
oss << "]";
416+
TLLM_LOG_DEBUG("checkContextTransferStatus: mResponderFutures order: %s", oss.str().c_str());
417+
}
418+
373419
bool blockAll = !atLeastRequestNum.has_value();
374420
auto syncComm = mCacheState->getParallelConfig().mEnableAttentionDP ? mMpiGroupTPInDPComm : mMpiGroupTensorParaComm;
421+
422+
TLLM_LOG_DEBUG("checkContextTransferStatus: blockAll=%s, syncComm=%s, syncComm.size=%d",
423+
blockAll ? "true" : "false", syncComm ? "valid" : "null", syncComm ? syncComm->getSize() : 0);
424+
375425
std::vector<LlmRequest::RequestIdType> contextCompleteRequestIds;
426+
TLLM_LOG_DEBUG(
427+
"checkContextTransferStatus: Checking %zu responder futures for completion", mResponderFutures.size());
428+
376429
for (auto&& [request, future] : mResponderFutures)
377430
{
431+
TLLM_LOG_DEBUG("checkContextTransferStatus: Checking request %ld future status", request->mRequestId);
378432
if (future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready)
379433
{
434+
TLLM_LOG_DEBUG("checkContextTransferStatus: Request %ld is ready", request->mRequestId);
380435
contextCompleteRequestIds.push_back(request->mRequestId);
381436
}
437+
else
438+
{
439+
TLLM_LOG_DEBUG("checkContextTransferStatus: Request %ld is not ready", request->mRequestId);
440+
}
382441
}
383442

443+
TLLM_LOG_DEBUG("checkContextTransferStatus: Found %zu ready requests", contextCompleteRequestIds.size());
444+
384445
std::unordered_map<LlmRequest::RequestIdType, int> frequencyMap;
385446
if ((syncComm) && syncComm->getSize() > 1)
386447
{
448+
TLLM_LOG_DEBUG(
449+
"checkContextTransferStatus: Gathering request IDs across %d ranks via MPI", syncComm->getSize());
387450
auto gatherRequestIdVec = gatherRequestIds(*syncComm, contextCompleteRequestIds);
451+
TLLM_LOG_DEBUG("checkContextTransferStatus: MPI gather completed, received %zu total request IDs",
452+
gatherRequestIdVec.size());
453+
388454
for (auto&& requestId : gatherRequestIdVec)
389455
{
390456
frequencyMap[requestId]++;
391457
}
458+
TLLM_LOG_DEBUG(
459+
"checkContextTransferStatus: Built frequency map with %zu unique request IDs", frequencyMap.size());
392460
}
393461
else
394462
{
463+
TLLM_LOG_DEBUG("checkContextTransferStatus: Single rank mode, building frequency map locally");
395464
for (auto&& requestId : contextCompleteRequestIds)
396465
{
397466
frequencyMap[requestId]++;
398467
}
468+
TLLM_LOG_DEBUG(
469+
"checkContextTransferStatus: Local frequency map built with %zu unique request IDs", frequencyMap.size());
399470
}
400471
std::vector<std::pair<LlmRequest::RequestIdType, int>> freqVec(frequencyMap.begin(), frequencyMap.end());
401472

402473
std::sort(freqVec.begin(), freqVec.end(),
403474
[](std::pair<LlmRequest::RequestIdType, int> const& left,
404475
std::pair<LlmRequest::RequestIdType, int> const& right) { return left.second > right.second; });
476+
477+
TLLM_LOG_DEBUG("checkContextTransferStatus: Sorted frequency vector, processing %zu entries", freqVec.size());
478+
405479
std::unordered_set<LlmRequest::RequestIdType> toCompleteIdSet;
480+
int expectedFreq = (syncComm) ? syncComm->getSize() : 1;
481+
TLLM_LOG_DEBUG("checkContextTransferStatus: Expected frequency for completion: %d", expectedFreq);
482+
406483
for (auto&& [requestId, freq] : freqVec)
407484
{
408-
if (freq == ((syncComm) ? syncComm->getSize() : 1))
485+
TLLM_LOG_DEBUG(
486+
"checkContextTransferStatus: Request %ld has frequency %d (expected %d)", requestId, freq, expectedFreq);
487+
if (freq == expectedFreq)
409488
{
410489
toCompleteIdSet.insert(requestId);
490+
TLLM_LOG_DEBUG("checkContextTransferStatus: Request %ld added to completion set (freq match)", requestId);
411491
}
412492
}
413493

494+
TLLM_LOG_DEBUG("checkContextTransferStatus: toCompleteIdSet.size()=%zu, atLeastRequestNum=%d",
495+
toCompleteIdSet.size(), atLeastRequestNum.value_or(0));
496+
414497
// Make sure there are at least atLeastRequestNum requests in toCompleteIdSet.
415498
// This will preserve the order of insertion for KVCache transfer requests.
416499
for (auto it = mResponderFutures.begin();
417500
atLeastRequestNum.value_or(0) > static_cast<int>(toCompleteIdSet.size()) && it != mResponderFutures.end();
418501
++it)
419502
{
420503
auto& [request, future] = *it;
504+
TLLM_LOG_DEBUG(
505+
"checkContextTransferStatus: Adding request %ld to completion set (min requirement)", request->mRequestId);
421506
toCompleteIdSet.insert(request->mRequestId);
422507
}
423508

509+
TLLM_LOG_DEBUG("checkContextTransferStatus: Final toCompleteIdSet.size()=%zu", toCompleteIdSet.size());
510+
424511
// Complete all the requests in toCompleteIdSet
512+
TLLM_LOG_DEBUG("checkContextTransferStatus: Starting completion phase, blockAll=%s", blockAll ? "true" : "false");
513+
514+
size_t completedCount = 0;
425515
for (auto it = mResponderFutures.begin(); it != mResponderFutures.end();)
426516
{
427517
auto& [request, future] = *it;
428-
if (blockAll || (toCompleteIdSet.find(request->mRequestId) != toCompleteIdSet.end()))
518+
bool shouldComplete = blockAll || (toCompleteIdSet.find(request->mRequestId) != toCompleteIdSet.end());
519+
520+
TLLM_LOG_DEBUG("checkContextTransferStatus: Request %ld shouldComplete=%s", request->mRequestId,
521+
shouldComplete ? "true" : "false");
522+
523+
if (shouldComplete)
429524
{
525+
TLLM_LOG_DEBUG("checkContextTransferStatus: Blocking on future.get() for request %ld", request->mRequestId);
430526
future.get();
527+
TLLM_LOG_DEBUG("checkContextTransferStatus: future.get() completed for request %ld", request->mRequestId);
528+
431529
request->setState(LlmRequestState::kDISAGG_CONTEXT_COMPLETE);
432530
it = mResponderFutures.erase(it);
531+
completedCount++;
532+
533+
TLLM_LOG_DEBUG(
534+
"checkContextTransferStatus: Request %ld completed and removed from futures", request->mRequestId);
433535
}
434536
else
435537
{
436538
++it;
437539
}
438540
}
541+
542+
TLLM_LOG_DEBUG("checkContextTransferStatus: Exit - completed %zu requests, remaining futures: %zu", completedCount,
543+
mResponderFutures.size());
439544
}
440545

441546
void CacheTransceiver::checkGenTransferStatus(std::optional<int> const& atLeastRequestNum)

cpp/tensorrt_llm/executor/cacheTransceiverConfig.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@ namespace tensorrt_llm::executor
2222
{
2323

2424
CacheTransceiverConfig::CacheTransceiverConfig(
25-
std::optional<BackendType> backendType, std::optional<size_t> maxNumTokens)
25+
std::optional<BackendType> backendType, std::optional<size_t> maxNumTokens, std::optional<int> kvTransferTimeoutMs)
2626
: mBackendType(backendType)
2727
, mMaxTokensInBuffer(maxNumTokens)
28+
, mKvTransferTimeoutMs(kvTransferTimeoutMs)
2829
{
2930
}
3031

3132
bool CacheTransceiverConfig::operator==(CacheTransceiverConfig const& other) const
3233
{
33-
return mMaxTokensInBuffer == other.mMaxTokensInBuffer && mBackendType == other.mBackendType;
34+
return mMaxTokensInBuffer == other.mMaxTokensInBuffer && mBackendType == other.mBackendType
35+
&& mKvTransferTimeoutMs == other.mKvTransferTimeoutMs;
3436
}
3537

3638
void CacheTransceiverConfig::setBackendType(std::optional<BackendType> backendType)
@@ -53,4 +55,14 @@ std::optional<size_t> CacheTransceiverConfig::getMaxTokensInBuffer() const
5355
return mMaxTokensInBuffer;
5456
}
5557

58+
void CacheTransceiverConfig::setKvTransferTimeoutMs(std::optional<int> kvTransferTimeoutMs)
59+
{
60+
mKvTransferTimeoutMs = kvTransferTimeoutMs;
61+
}
62+
63+
std::optional<int> CacheTransceiverConfig::getKvTransferTimeoutMs() const
64+
{
65+
return mKvTransferTimeoutMs;
66+
}
67+
5668
} // namespace tensorrt_llm::executor

cpp/tensorrt_llm/nanobind/executor/executorConfig.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -433,15 +433,15 @@ void initConfigBindings(nb::module_& m)
433433
.def("__setstate__", guidedDecodingConfigSetstate);
434434

435435
auto cacheTransceiverConfigGetstate = [](tle::CacheTransceiverConfig const& self)
436-
{ return nb::make_tuple(self.getBackendType(), self.getMaxTokensInBuffer()); };
436+
{ return nb::make_tuple(self.getBackendType(), self.getMaxTokensInBuffer(), self.getKvTransferTimeoutMs()); };
437437
auto cacheTransceiverConfigSetstate = [](tle::CacheTransceiverConfig& self, nb::tuple const& state)
438438
{
439-
if (state.size() != 2)
439+
if (state.size() != 3)
440440
{
441441
throw std::runtime_error("Invalid CacheTransceiverConfig state!");
442442
}
443-
new (&self) tle::CacheTransceiverConfig(
444-
nb::cast<tle::CacheTransceiverConfig::BackendType>(state[0]), nb::cast<std::optional<size_t>>(state[1]));
443+
new (&self) tle::CacheTransceiverConfig(nb::cast<tle::CacheTransceiverConfig::BackendType>(state[0]),
444+
nb::cast<std::optional<size_t>>(state[1]), nb::cast<std::optional<int>>(state[2]));
445445
};
446446

447447
nb::enum_<tle::CacheTransceiverConfig::BackendType>(m, "CacheTransceiverBackendType")
@@ -464,12 +464,16 @@ void initConfigBindings(nb::module_& m)
464464
});
465465

466466
nb::class_<tle::CacheTransceiverConfig>(m, "CacheTransceiverConfig")
467-
.def(nb::init<std::optional<tle::CacheTransceiverConfig::BackendType>, std::optional<size_t>>(),
468-
nb::arg("backend") = std::nullopt, nb::arg("max_tokens_in_buffer") = std::nullopt)
467+
.def(nb::init<std::optional<tle::CacheTransceiverConfig::BackendType>, std::optional<size_t>,
468+
std::optional<int>>(),
469+
nb::arg("backend") = std::nullopt, nb::arg("max_tokens_in_buffer") = std::nullopt,
470+
nb::arg("kv_transfer_timeout_ms") = std::nullopt)
469471
.def_prop_rw(
470472
"backend", &tle::CacheTransceiverConfig::getBackendType, &tle::CacheTransceiverConfig::setBackendType)
471473
.def_prop_rw("max_tokens_in_buffer", &tle::CacheTransceiverConfig::getMaxTokensInBuffer,
472474
&tle::CacheTransceiverConfig::setMaxTokensInBuffer)
475+
.def_prop_rw("kv_transfer_timeout_ms", &tle::CacheTransceiverConfig::getKvTransferTimeoutMs,
476+
&tle::CacheTransceiverConfig::setKvTransferTimeoutMs)
473477
.def("__getstate__", cacheTransceiverConfigGetstate)
474478
.def("__setstate__", cacheTransceiverConfigSetstate);
475479

cpp/tensorrt_llm/pybind/executor/executorConfig.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -415,15 +415,15 @@ void initConfigBindings(pybind11::module_& m)
415415
.def(py::pickle(guidedDecodingConfigGetstate, guidedDecodingConfigSetstate));
416416

417417
auto cacheTransceiverConfigGetstate = [](tle::CacheTransceiverConfig const& self)
418-
{ return py::make_tuple(self.getBackendType(), self.getMaxTokensInBuffer()); };
418+
{ return py::make_tuple(self.getBackendType(), self.getMaxTokensInBuffer(), self.getKvTransferTimeoutMs()); };
419419
auto cacheTransceiverConfigSetstate = [](py::tuple const& state)
420420
{
421-
if (state.size() != 2)
421+
if (state.size() != 3)
422422
{
423423
throw std::runtime_error("Invalid CacheTransceiverConfig state!");
424424
}
425-
return tle::CacheTransceiverConfig(
426-
state[0].cast<tle::CacheTransceiverConfig::BackendType>(), state[1].cast<std::optional<size_t>>());
425+
return tle::CacheTransceiverConfig(state[0].cast<tle::CacheTransceiverConfig::BackendType>(),
426+
state[1].cast<std::optional<size_t>>(), state[2].cast<std::optional<int>>());
427427
};
428428

429429
py::enum_<tle::CacheTransceiverConfig::BackendType>(m, "CacheTransceiverBackendType")
@@ -446,12 +446,16 @@ void initConfigBindings(pybind11::module_& m)
446446
});
447447

448448
py::class_<tle::CacheTransceiverConfig>(m, "CacheTransceiverConfig")
449-
.def(py::init<std::optional<tle::CacheTransceiverConfig::BackendType>, std::optional<size_t>>(),
450-
py::arg("backend") = std::nullopt, py::arg("max_tokens_in_buffer") = std::nullopt)
449+
.def(py::init<std::optional<tle::CacheTransceiverConfig::BackendType>, std::optional<size_t>,
450+
std::optional<int>>(),
451+
py::arg("backend") = std::nullopt, py::arg("max_tokens_in_buffer") = std::nullopt,
452+
py::arg("kv_transfer_timeout_ms") = std::nullopt)
451453
.def_property(
452454
"backend", &tle::CacheTransceiverConfig::getBackendType, &tle::CacheTransceiverConfig::setBackendType)
453455
.def_property("max_tokens_in_buffer", &tle::CacheTransceiverConfig::getMaxTokensInBuffer,
454456
&tle::CacheTransceiverConfig::setMaxTokensInBuffer)
457+
.def_property("kv_transfer_timeout_ms", &tle::CacheTransceiverConfig::getKvTransferTimeoutMs,
458+
&tle::CacheTransceiverConfig::setKvTransferTimeoutMs)
455459
.def(py::pickle(cacheTransceiverConfigGetstate, cacheTransceiverConfigSetstate));
456460

457461
auto executorConfigGetState = [](py::object const& self)

examples/disaggregated/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ cache_transceiver_config:
1616
backend: <str>
1717
# KV cache buffer size. Set it ≥ the maximum ISL (Input Sequence Length) for best performance.
1818
max_tokens_in_buffer: <int>
19+
# KV cache transfer timeout in milliseconds
20+
# If a request does not finish sending/receiving its KV cache within this timeout, it is cancelled and cleaned up.
21+
kv_transfer_timeout_ms: <int>
1922
```
2023
2124
The following is an example, consisting of the `ctx_extra-llm-api-config.yaml` and `gen_extra-llm-api-config.yaml` files needed in the sections below.

0 commit comments

Comments
 (0)