@@ -37,6 +37,16 @@ def __init__(self, only_create_meta_data: bool, init_shm_data: bool):
3737 self .attach_shm_handle = self ._attach_shm_cpu_kv_cache ()
3838 return
3939
@staticmethod
def _encode_offload_head(page_index: int) -> int:
    """Encode a page index as the marker value for the first page of an offload group.

    The index ``i`` is mapped to ``-1 - i``, so every non-negative index
    (including 0) becomes a strictly negative value.  ``_decode_offload_value``
    is the inverse of this mapping.
    """
    encoded_head = -1 - page_index
    return encoded_head
43+
@staticmethod
def _decode_offload_value(value: int) -> Tuple[int, bool]:
    """Split a stored offload value into ``(page_index, is_group_head)``.

    A negative value is treated as a group-head marker and is mapped back to
    the page index ``-1 - value``; a non-negative value is returned unchanged
    with the flag set to ``False``.
    """
    is_group_head = value < 0
    page_index = -1 - value if is_group_head else value
    return page_index, is_group_head
49+
4050 def get_one_empty_page (self , hash_key : int , disk_offload_enable : bool ) -> Optional [int ]:
4151 assert self .page_hash_dict .get (hash_key ) is None
4252 head = self .page_items .head
@@ -63,15 +73,12 @@ def allocate_one_page(self, hash_key: int, disk_offload_enable: bool) -> Tuple[O
6373 page_index = self .page_hash_dict .get (hash_key )
6474 if page_index is not None :
6575 page_item : _CpuPageStatus = self .page_items .get_item_by_index (page_index )
76+ page_item .ref_count += 1
77+ page_item .del_self_from_list ()
78+ self .page_items .add_item_to_tail (index = page_index )
6679 if page_item .is_data_ready ():
67- page_item .ref_count += 1
68- page_item .del_self_from_list ()
69- self .page_items .add_item_to_tail (index = page_index )
7080 return page_index , True
7181 else :
72- page_item .ref_count += 1
73- page_item .del_self_from_list ()
74- self .page_items .add_item_to_tail (index = page_index )
7582 return page_index , False
7683 else :
7784 page_index = self .get_one_empty_page (hash_key = hash_key , disk_offload_enable = disk_offload_enable )
@@ -101,34 +108,54 @@ def allocate_pages(self, hash_keys: List[int], disk_offload_enable: bool) -> Tup
101108 ready_list .extend ([False for _ in range (left_num )])
102109 return page_list , ready_list
103110
def update_pages_status_to_ready(
    self,
    page_list: List[int],
    deref: bool = True,
    disk_offload_enable: bool = False,
):
    """Promote the given pages to READY and optionally queue them for offload.

    Args:
        page_list: Page indexes to process; entries equal to ``-1`` are skipped.
        deref: When true, each processed page's ``ref_count`` is decremented
            (it must be positive beforehand).
        disk_offload_enable: When true, pages whose status was raised to READY
            by this call are appended to ``self.offload_page_indexes``.

    The pages queued by one call form one group: the first queued value is
    passed through ``self._encode_offload_head`` and the remaining values are
    stored as plain page indexes.
    """
    newly_ready: List[int] = []
    for page_index in page_list:
        if page_index == -1:
            continue
        page: _CpuPageStatus = self.page_items.get_item_by_index(page_index)
        if page.status < page.READY:
            page.status = page.READY
            if disk_offload_enable:
                newly_ready.append(page.self_index)
        if deref:
            assert page.ref_count > 0
            page.ref_count -= 1

    # Nothing to queue unless offloading is on and some page became READY.
    if not (disk_offload_enable and newly_ready):
        return

    # Only the first entry carries the group-head marker.
    head_index, *followers = newly_ready
    self.offload_page_indexes.add_item(value=self._encode_offload_head(head_index))
    for follower_index in followers:
        self.offload_page_indexes.add_item(value=follower_index)
    return
137+
def mark_pages_recyclable(self, page_list: List[int]):
    """Set every listed page whose status is at least READY to READY_RECYCLE.

    Entries equal to ``-1`` are skipped, and pages whose status is below
    READY are left untouched.  Reference counts are not modified.
    """
    real_indexes = (index for index in page_list if index != -1)
    for page_index in real_indexes:
        page: _CpuPageStatus = self.page_items.get_item_by_index(page_index)
        reached_ready = page.status >= page.READY
        if reached_ready:
            page.status = page.READY_RECYCLE
    return
116146
def query_one_page(self, hash_key: int) -> Tuple[Optional[int], bool]:
    """Look up the page registered for ``hash_key`` and take a reference on it.

    Returns:
        ``(None, False)`` when ``self.page_hash_dict`` has no entry for the key.
        Otherwise ``(page_index, ready)`` where ``ready`` reflects
        ``is_data_ready()``.  In that case the page's ``ref_count`` has been
        incremented and the page has been moved to the tail of
        ``self.page_items``, whether or not its data is ready.
    """
    page_index = self.page_hash_dict.get(hash_key)
    if page_index is None:
        return None, False

    page: _CpuPageStatus = self.page_items.get_item_by_index(page_index)
    page.ref_count += 1
    # LRU update: unlink the page and re-append it at the tail.
    page.del_self_from_list()
    self.page_items.add_item_to_tail(index=page_index)

    data_ready = True if page.is_data_ready() else False
    return page_index, data_ready
134161
@@ -138,6 +165,7 @@ def check_allpages_ready(self, page_list: List[int]) -> bool:
138165 continue
139166 page_item : _CpuPageStatus = self .page_items .get_item_by_index (page_index )
140167 if not page_item .is_data_ready ():
168+ logger .info ("cpu cache page %d not ready, status %d" , page_index , page_item .status )
141169 return False
142170 return True
143171
@@ -156,17 +184,30 @@ def deref_one_page(self, page_index: int):
156184 page_item .ref_count -= 1
157185 return
158186
def get_pages_to_offloading(self) -> List[List[int]]:
    """Drain the offload queue and return the page indexes grouped for offloading.

    Every value popped from ``self.offload_page_indexes`` is decoded with
    ``self._decode_offload_value``; a value flagged as a group head closes the
    group being built and starts a new one.  Only pages for which
    ``is_ready()`` is true are kept: their ``ref_count`` is incremented and
    their status is set to OFFLOADING.  Groups that end up empty are omitted.

    Returns:
        A list of groups, each a non-empty list of page indexes; an empty
        list when the queue yields ``None`` or no page qualifies.
    """
    queued_values = self.offload_page_indexes.pop_all_item()
    if queued_values is None:
        return []

    groups: List[List[int]] = []
    pending: List[int] = []
    for raw_value in queued_values:
        page_index, starts_group = self._decode_offload_value(raw_value)
        if starts_group and pending:
            # A new head marker terminates the group collected so far.
            groups.append(pending)
            pending = []
        page: _CpuPageStatus = self.page_items.get_item_by_index(index=page_index)
        if not page.is_ready():
            continue
        page.ref_count += 1
        page.status = page.OFFLOADING
        pending.append(page_index)

    if pending:
        groups.append(pending)
    return groups
170211
171212 def update_pages_status_to_ready_recycle (self , page_list : List [int ], deref : bool = True ):
172213 for page_index in page_list :
@@ -179,6 +220,22 @@ def update_pages_status_to_ready_recycle(self, page_list: List[int], deref: bool
179220 cur_page .ref_count -= 1
180221 return
181222
def recycle_pages(self, page_list: List[int]):
    """Reset the given pages to EMPTY and re-append them to ``self.page_items``.

    For every index other than ``-1`` the page is unlinked from the list, its
    ``hash_key`` is cleared to 0, its status is set to EMPTY, its
    ``ref_count`` is forced to 0, and it is appended at the tail again.

    If the page was non-empty with a non-zero ``hash_key``, the matching entry
    in ``self.page_hash_dict`` is removed, but only when that entry still maps
    to this page.  An entry that maps the same key to a different page index
    is left alone so that page stays reachable through the dict.
    """
    for page_index in page_list:
        if page_index == -1:
            continue
        cur_page: _CpuPageStatus = self.page_items.get_item_by_index(page_index)
        cur_page.del_self_from_list()
        if not cur_page.is_empty() and cur_page.hash_key != 0:
            mapped_index = self.page_hash_dict.get(cur_page.hash_key)
            # Drop the mapping only if this page owns it; a missing entry or
            # an entry owned by another page must not be touched.
            if mapped_index is not None and mapped_index == cur_page.self_index:
                self.page_hash_dict.remove(cur_page.hash_key)
        cur_page.hash_key = 0
        cur_page.status = cur_page.EMPTY
        cur_page.ref_count = 0
        self.page_items.add_item_to_tail(cur_page.self_index)
    return
238+
182239 def _create_cpu_status_list (self , init_shm_data : bool ):
183240 self .page_items = ShmLinkedList (
184241 name = f"{ get_unique_server_name ()} _cpu_kv_cache_page_items" ,
0 commit comments