[ENH]: Load HNSW index without disk intermediary #5159
@@ -207,20 +207,27 @@ impl HnswIndexProvider {

         let index_config = IndexConfig::new(dimensionality, distance_function);

-        let storage_path_str = match new_storage_path.to_str() {
-            Some(storage_path_str) => storage_path_str,
-            None => {
-                return Err(Box::new(HnswIndexProviderForkError::PathToStringError(
-                    new_storage_path,
-                )));
-            }
-        };
+        // let storage_path_str = match new_storage_path.to_str() {
+        //     Some(storage_path_str) => storage_path_str,
+        //     None => {
+        //         return Err(Box::new(HnswIndexProviderForkError::PathToStringError(
+        //             new_storage_path,
+        //         )));
+        //     }
+        // };

         // Check if the entry is in the cache, if it is, we assume
         // another thread has loaded the index and we return it.
         match self.get(&new_id, cache_key).await {
             Some(index) => Ok(index.clone()),
-            None => match HnswIndex::load(storage_path_str, &index_config, ef_search, new_id) {
+            None => match HnswIndex::load_from_hnsw_data(
+                self.fetch_hnsw_segment(&new_id, prefix_path)
+                    .await
+                    .map_err(|e| Box::new(HnswIndexProviderForkError::FileError(*e)))?,
+                &index_config,
+                ef_search,
+                new_id,
+            ) {
                 Ok(index) => {
                     let index = HnswIndexRef {
                         inner: Arc::new(RwLock::new(DistributedHnswInner {
@@ -277,10 +284,33 @@ impl HnswIndexProvider {
         prefix_path: &str,
     ) -> Result<(), Box<HnswIndexProviderFileError>> {
         // Fetch the files from storage and put them in the index storage path.
+        let hnsw_data = self.fetch_hnsw_segment(source_id, prefix_path).await?;
+        let getters = [
+            |hnsw_data: &hnswlib::HnswData| Arc::new(Vec::from(hnsw_data.header_buffer())),
+            |hnsw_data: &hnswlib::HnswData| Arc::new(Vec::from(hnsw_data.data_level0_buffer())),
+            |hnsw_data: &hnswlib::HnswData| Arc::new(Vec::from(hnsw_data.length_buffer())),
+            |hnsw_data: &hnswlib::HnswData| Arc::new(Vec::from(hnsw_data.link_list_buffer())),
+        ];
+
+        for (file, getter) in FILES.iter().zip(getters) {
+            let file_path = index_storage_path.join(file);
+            self.copy_bytes_to_local_file(&file_path, getter(&hnsw_data))
+                .await?;
+        }
+        Ok(())
+    }
Comment on lines +287 to +301:

[PerformanceOptimization] This function now fetches the entire HNSW segment into an in-memory `HnswData` and then copies each buffer back out to a local file on disk.
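If the extra copies flagged here matter, one option is sketched below: write each buffer slice to disk directly instead of cloning it into an `Arc<Vec<u8>>` first. This is a rough sketch only; it assumes the `*_buffer()` accessors return `&[u8]` (the current code wraps them in `Vec::from`) and that `tokio::fs` is available in this crate. `write_segment_to_disk` is a hypothetical helper name.

```rust
use std::path::Path;

// Sketch only: write each fetched buffer straight to disk, skipping the
// intermediate Arc<Vec<u8>> clones. FILES is the existing constant listing
// the four HNSW file names; error handling and directory creation are elided.
async fn write_segment_to_disk(
    hnsw_data: &hnswlib::HnswData,
    index_storage_path: &Path,
) -> std::io::Result<()> {
    let buffers: [&[u8]; 4] = [
        hnsw_data.header_buffer(),
        hnsw_data.data_level0_buffer(),
        hnsw_data.length_buffer(),
        hnsw_data.link_list_buffer(),
    ];
    for (file, buf) in FILES.iter().zip(buffers) {
        // tokio::fs::write accepts any AsRef<[u8]>, so no owned copy is needed.
        tokio::fs::write(index_storage_path.join(file), buf).await?;
    }
    Ok(())
}
```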
+    async fn fetch_hnsw_segment(
+        &self,
+        source_id: &IndexUuid,
+        prefix_path: &str,
+    ) -> Result<hnswlib::HnswData, Box<HnswIndexProviderFileError>> {
+        let mut buffers = Vec::new();
Review comment: Rather than this pattern of assuming the buffers are in order, can we expose a `HnswDataBuilder` that allows us to `.add_<named_buffer>()` and then have `.build()` return the `HnswData`? That would be less bug-prone under changes. A sketch of such a builder is below.
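A minimal sketch of what the suggested builder could look like on the `hnswlib` side. The method names, the `Vec<u8>` buffer type, and the exact `new_from_buffers` signature are assumptions made for illustration.

```rust
use hnswlib::HnswData;

// Hypothetical builder sketched to match the reviewer's suggestion; buffer
// types and the HnswData constructor signature are assumed, not confirmed.
#[derive(Default)]
pub struct HnswDataBuilder {
    header: Option<Vec<u8>>,
    data_level0: Option<Vec<u8>>,
    length: Option<Vec<u8>>,
    link_list: Option<Vec<u8>>,
}

impl HnswDataBuilder {
    pub fn add_header(mut self, buf: Vec<u8>) -> Self {
        self.header = Some(buf);
        self
    }
    pub fn add_data_level0(mut self, buf: Vec<u8>) -> Self {
        self.data_level0 = Some(buf);
        self
    }
    pub fn add_length(mut self, buf: Vec<u8>) -> Self {
        self.length = Some(buf);
        self
    }
    pub fn add_link_list(mut self, buf: Vec<u8>) -> Self {
        self.link_list = Some(buf);
        self
    }

    /// Errors if any buffer was not supplied, so callers can no longer
    /// silently rely on passing positional buffers in the right order.
    pub fn build(self) -> Result<HnswData, String> {
        HnswData::new_from_buffers(
            self.header.ok_or("missing header buffer")?,
            self.data_level0.ok_or("missing data_level0 buffer")?,
            self.length.ok_or("missing length buffer")?,
            self.link_list.ok_or("missing link_list buffer")?,
        )
        .map_err(|e| e.to_string())
    }
}
```

A call site would then read `HnswDataBuilder::default().add_header(h).add_data_level0(d).add_length(l).add_link_list(ll).build()`, which fails loudly if a buffer is missing instead of mis-ordering it.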
         for file in FILES.iter() {
Review comment: Should we refactor this to fetch the files in parallel? Probably a separate PR, but it seems worth doing while we are in here. A sketch of a concurrent fetch is below.
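A rough sketch of a concurrent fetch, assuming the per-file read is factored into a helper (`fetch_one_file` and `fetch_all_buffers` are hypothetical names) and that the `futures` crate is available:

```rust
use futures::future::try_join_all;

impl HnswIndexProvider {
    // Sketch only: fetch the four HNSW files concurrently instead of one at a
    // time. fetch_one_file is a hypothetical helper wrapping the existing
    // per-file read (span, key formatting, storage get); FILES is the existing
    // constant with the file names.
    async fn fetch_all_buffers(
        &self,
        source_id: &IndexUuid,
        prefix_path: &str,
    ) -> Result<Vec<Vec<u8>>, Box<HnswIndexProviderFileError>> {
        let fetches = FILES
            .iter()
            .map(|file| self.fetch_one_file(source_id, prefix_path, file));
        // try_join_all preserves input order, so the results still line up with FILES.
        try_join_all(fetches).await
    }
}
```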
             let s3_fetch_span =
                 tracing::trace_span!(parent: Span::current(), "Read bytes from s3", file = file);
-            let buf = s3_fetch_span
+            let _ = s3_fetch_span
                 .in_scope(|| async {
                     let key = Self::format_key(prefix_path, source_id, file);
                     tracing::info!("Loading hnsw index file: {} into directory", key);
@@ -304,13 +334,24 @@ impl HnswIndexProvider {
                         bytes_read,
                         key,
                     );
-                    Ok(buf)
+                    buffers.push(buf);
+                    Ok(())
                 })
                 .await?;
-            let file_path = index_storage_path.join(file);
-            self.copy_bytes_to_local_file(&file_path, buf).await?;
         }
-        Ok(())
+        match hnswlib::HnswData::new_from_buffers(
+            buffers[0].clone(),
+            buffers[1].clone(),
+            buffers[2].clone(),
+            buffers[3].clone(),
+        ) {
+            Ok(hnsw_data) => Ok(hnsw_data),
+            Err(e) => Err(Box::new(HnswIndexProviderFileError::StorageError(
+                chroma_storage::StorageError::Message {
+                    message: e.to_string(),
+                },
+            ))),
+        }
     }

     pub async fn open(
@@ -356,20 +397,27 @@ impl HnswIndexProvider {

         let index_config = IndexConfig::new(dimensionality, distance_function);

-        let index_storage_path_str = match index_storage_path.to_str() {
-            Some(index_storage_path_str) => index_storage_path_str,
-            None => {
-                return Err(Box::new(HnswIndexProviderOpenError::PathToStringError(
-                    index_storage_path,
-                )));
-            }
-        };
+        // let index_storage_path_str = match index_storage_path.to_str() {
+        //     Some(index_storage_path_str) => index_storage_path_str,
+        //     None => {
+        //         return Err(Box::new(HnswIndexProviderOpenError::PathToStringError(
+        //             index_storage_path,
+        //         )));
+        //     }
+        // };

         // Check if the entry is in the cache, if it is, we assume
         // another thread has loaded the index and we return it.
         let index = match self.get(id, cache_key).await {
             Some(index) => Ok(index.clone()),
-            None => match HnswIndex::load(index_storage_path_str, &index_config, ef_search, *id) {
+            None => match HnswIndex::load_from_hnsw_data(
+                self.fetch_hnsw_segment(id, prefix_path)
+                    .await
+                    .map_err(|e| Box::new(HnswIndexProviderOpenError::FileError(*e)))?,
+                &index_config,
+                ef_search,
+                *id,
+            ) {
Comment on lines +413 to +420:

[PerformanceOptimization] This change successfully loads the index from memory, which aligns with the PR's goal. However, the index files are still copied to local disk earlier in this path. To fully load without a disk intermediary and improve efficiency, you could remove those disk-copy calls and rely on the in-memory `HnswData` alone. A sketch is below.
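A minimal sketch of what the load in `open` could look like once any preceding disk copy is dropped. It is essentially the code from this diff with the local-file step removed; the cache wrapping and error mapping are assumed to stay as they are in the existing match arms.

```rust
// Sketch only: `open` constructs the index directly from the bytes fetched
// from storage; nothing is written to local disk on this path.
let hnsw_data = self
    .fetch_hnsw_segment(id, prefix_path)
    .await
    .map_err(|e| Box::new(HnswIndexProviderOpenError::FileError(*e)))?;
let loaded = HnswIndex::load_from_hnsw_data(hnsw_data, &index_config, ef_search, *id);
// `loaded` is then wrapped in an HnswIndexRef and cached, exactly as in the
// existing Ok(index) arm.
```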
                 Ok(index) => {
                     let index = HnswIndexRef {
                         inner: Arc::new(RwLock::new(DistributedHnswInner {
Review comment:

[CriticalError] The logic for loading the index within the `fork` method appears to be incorrect. It attempts to fetch the segment from remote storage using `new_id`, but the index for `new_id` doesn't exist in storage yet; the files for `source_id` have just been copied to a local directory. The previous implementation using `HnswIndex::load(storage_path_str, ...)` correctly loaded the index from this new local directory. Since `fork` is intended to create a mutable, file-backed copy of an index, it seems the original approach of loading from the local path should be restored. A sketch of that restoration is below.