Skip to content

Commit cd53c25

Browse files
norrishuang authored and Akhil-Pathivada committed
feat(aws_opensearch): Add some optimizations when loading data into AWS OpenSearch (zilliztech#663)
* 1. Optimized the log output when loading data. 2. Modified the create-index DSL for on-disk mode.
* Refine format
1 parent 1f0e299 commit cd53c25

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py

Lines changed: 14 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -117,6 +117,7 @@ def _build_vector_field_config(self) -> dict:
117117
"data_type": "float",
118118
"mode": "on_disk",
119119
"compression_level": "32x",
120+
"method": method_config,
120121
}
121122
log.info("Using on-disk vector configuration with compression_level: 32x")
122123
else:
@@ -283,13 +284,19 @@ def insert_chunk(client_idx: int, chunk_idx: int):
283284
other_data[self.label_col_name] = chunk_labels_data[i]
284285
insert_data.append(other_data)
285286

286-
try:
287-
resp = client.bulk(body=insert_data)
288-
log.info(f"Client {client_idx} added {len(resp['items'])} documents")
289-
return len(chunk_embeddings), None
290-
except Exception as e:
291-
log.warning(f"Client {client_idx} failed to insert data: {e!s}")
292-
return 0, e
287+
max_retries = 10
288+
for attempt in range(max_retries):
289+
try:
290+
client.bulk(body=insert_data)
291+
return len(chunk_embeddings), None
292+
except Exception as e:
293+
if "429" in str(e) and attempt < max_retries - 1:
294+
log.warning(f"Client {client_idx} got 429 error, retry {attempt + 1}/{max_retries} after 10s")
295+
time.sleep(10)
296+
else:
297+
log.warning(f"Client {client_idx} failed to insert data: {e!s}")
298+
return 0, e
299+
return 0, Exception("Max retries exceeded")
293300

294301
results = []
295302
with ThreadPoolExecutor(max_workers=len(clients)) as executor:

vectordb_bench/backend/clients/aws_opensearch/config.py

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -129,10 +129,6 @@ def index_param(self) -> dict:
129129
if self.engine == AWSOS_Engine.s3vector:
130130
return {"engine": "s3vector"}
131131

132-
# For on-disk mode, return empty dict as no method config is needed
133-
if self.on_disk:
134-
return {}
135-
136132
parameters = {"ef_construction": self.efConstruction, "m": self.M}
137133

138134
# Add encoder configuration based on quantization type

0 commit comments

Comments
 (0)