@@ -23,21 +23,28 @@ class _PagePayload:
 class DiskCacheWorker:
     """Background worker that offloads CPU KV pages to disk using kvcache."""
 
-    def __init__(self, disk_cache_storage_size: float, cpu_cache_client):
+    def __init__(
+        self,
+        disk_cache_storage_size: float,
+        cpu_cache_client,
+        disk_cache_dir: Optional[str] = None,
+    ):
         self.cpu_cache_client = cpu_cache_client
         self._pages_all_idle = False
 
         assert disk_cache_storage_size > 0
         storage_size = int(disk_cache_storage_size * (1024 ** 3))
-        num_shard = 32
-        num_worker = 32
+        num_shard = 64
+        num_worker = 48
+        max_concurrent_write_tasks = 16
 
-        cache_dir = os.getenv("LIGHTLLM_DISK_CACHE_DIR")
+        cache_dir = disk_cache_dir
         if not cache_dir:
             cache_dir = os.path.join(tempfile.gettempdir(), f"lightllm_disk_cache_{get_unique_server_name()}")
         os.makedirs(cache_dir, exist_ok=True)
         cache_file = os.path.join(cache_dir, "cache_file")
 
+        self.max_concurrent_write_tasks = max_concurrent_write_tasks
         self._page_major_tensor = self._prepare_tensor(cpu_cache_client.cpu_kv_cache_tensor)
 
         self.service = PyLocalCacheService(
@@ -49,7 +56,7 @@ def __init__(self, disk_cache_storage_size: float, cpu_cache_client):
         )
 
         logger.info(
-            "blueswhen disk cache worker initialized: dir=%s size_bytes=%d shards=%d workers=%d pages_per_block=%d",
+            "disk cache worker initialized: dir=%s size_bytes=%d shards=%d workers=%d pages_per_block=%d",
             cache_dir,
             storage_size,
             num_shard,
@@ -63,35 +70,15 @@ def _prepare_tensor(self, tensor: torch.Tensor) -> torch.Tensor:
 
     def run(self) -> None:
         while True:
-            time.sleep(0.01)
+            time.sleep(0.1)
             payload_groups = self._gather_offload_payloads()
-            # self._log_idle_once()
             if not payload_groups:
                 continue
             for payloads in payload_groups:
                 if not payloads:
                     continue
                 self._persist_pages_to_disk(payloads)
 
-    def _log_idle_once(self) -> int:
-        locked_pages = 0
-        self.cpu_cache_client.lock.acquire_sleep1ms()
-        try:
-            for page_idx in range(self.cpu_cache_client.page_num):
-                page_item = self.cpu_cache_client.page_items.get_item_by_index(page_idx)
-                if not page_item.is_ready_recycle() or page_item.ref_count != 0:
-                    locked_pages += 1
-        finally:
-            self.cpu_cache_client.lock.release()
-
-        if locked_pages == 0:
-            if not self._pages_all_idle:
-                logger.info("blueswhen all cpu cache pages are idle and ready to reuse")
-            self._pages_all_idle = True
-        else:
-            self._pages_all_idle = False
-        return locked_pages
-
     def _gather_offload_payloads(self) -> List[List[_PagePayload]]:
         self.cpu_cache_client.lock.acquire_sleep1ms()
         try:
@@ -120,16 +107,21 @@ def _persist_pages_to_disk(self, payloads: List[_PagePayload]) -> None:
         kv_indexer = torch.tensor(page_indexes, dtype=torch.int32, device="cpu")
         query_result = self.service.query(tokens)
         if not all(query_result):
+            # Cap write concurrency so read operations keep some worker threads available.
+            while (
+                self.service.active_threads("r") and self.service.active_threads("w") >= self.max_concurrent_write_tasks
+            ):
+                time.sleep(0.001)
+
             task = self.service.create(tokens=tokens, kv_page_indexer=kv_indexer, mode="w")
-            while not task.ready():
+            # Stop waiting once the data is safe; the write does not need to be fully finished.
+            while not task.data_safe():
                 time.sleep(0.001)
 
             self.cpu_cache_client.lock.acquire_sleep1ms()
             self.cpu_cache_client.update_pages_status_to_ready_recycle(page_list=page_indexes, deref=True)
             self.cpu_cache_client.lock.release()
 
-        # self._log_idle_once()
-
     def blocks_exist(self, tokens: List[int], start_pos: int = 0) -> bool:
         if not tokens or start_pos < 0 or start_pos >= len(tokens):
             return False
@@ -147,6 +139,11 @@ def load_pages(self, tokens: List[int], page_indexes: List[int], start_pos: int
         if start_pos < 0 or start_pos >= len(tokens):
             return False
 
+        # Check whether a write is currently in progress and, if so, skip this load request (disabled for now).
+        # if self.service.active_threads("w") > 0:
+        #     logger.warning("disk cache worker is busy writing, skip load_pages")
+        #     return False
+
         kv_indexer = torch.tensor(page_indexes, dtype=torch.int32, device="cpu")
         task = self.service.create(tokens=tokens, kv_page_indexer=kv_indexer, mode="r", start_pos=start_pos)
         while not task.ready():
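
With this change the cache directory is passed in explicitly instead of being read from the LIGHTLLM_DISK_CACHE_DIR environment variable. A minimal construction sketch (not part of this commit); the storage size, directory path, and the cpu_cache_client object are placeholders for whatever the server creates elsewhere:

# Hypothetical usage sketch, assuming an existing cpu_cache_client instance.
worker = DiskCacheWorker(
    disk_cache_storage_size=10.0,                # disk budget in GiB (example value)
    cpu_cache_client=cpu_cache_client,           # real CPU KV cache client from the server
    disk_cache_dir="/data/lightllm_disk_cache",  # falls back to a tempdir-based path when None
)
worker.run()  # blocking offload loop; presumably launched in a dedicated process or thread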