Skip to content

Commit f0745b5

Browse files
JoeJRWwangzihao.wzh
andauthored
[Feature] Add Alibaba Cloud RDS MySQL client (#636)
* Alibaba Cloud RDS MySQL introduced vector support in version 20251031 Co-authored-by: wangzihao.wzh <[email protected]>
1 parent 23dd9cf commit f0745b5

File tree

8 files changed

+483
-1
lines changed

8 files changed

+483
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ All the database client supported
6060
| oceanbase | `pip install vectordb-bench[oceanbase]` |
6161
| hologres | `pip install vectordb-bench[hologres]` |
6262
| tencent_es | `pip install vectordb-bench[tencent_es]` |
63+
| alisql | `pip install 'vectordb-bench[alisql]'` |
6364

6465
### Run
6566

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ clickhouse = [ "clickhouse-connect" ]
9999
vespa = [ "pyvespa" ]
100100
lancedb = [ "lancedb" ]
101101
oceanbase = [ "mysql-connector-python" ]
102+
alisql = [ "mysql-connector-python" ]
102103

103104
[project.urls]
104105
"repository" = "https://github.com/zilliztech/VectorDBBench"

vectordb_bench/backend/clients/__init__.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class DB(Enum):
5252
S3Vectors = "S3Vectors"
5353
Hologres = "Alibaba Cloud Hologres"
5454
TencentElasticsearch = "TencentElasticsearch"
55+
AliSQL = "AlibabaCloudRDSMySQL"
5556

5657
@property
5758
def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
@@ -206,6 +207,11 @@ def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
206207

207208
return TencentElasticsearch
208209

210+
if self == DB.AliSQL:
211+
from .alisql.alisql import AliSQL
212+
213+
return AliSQL
214+
209215
msg = f"Unknown DB: {self.name}"
210216
raise ValueError(msg)
211217

@@ -362,10 +368,15 @@ def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912, C901, PLR0915
362368

363369
return TencentElasticsearchConfig
364370

371+
if self == DB.AliSQL:
372+
from .alisql.config import AliSQLConfig
373+
374+
return AliSQLConfig
375+
365376
msg = f"Unknown DB: {self.name}"
366377
raise ValueError(msg)
367378

368-
def case_config_cls( # noqa: C901, PLR0911, PLR0912
379+
def case_config_cls( # noqa: C901, PLR0911, PLR0912, PLR0915
369380
self,
370381
index_type: IndexType | None = None,
371382
) -> type[DBCaseConfig]:
@@ -493,6 +504,11 @@ def case_config_cls( # noqa: C901, PLR0911, PLR0912
493504

494505
return TencentElasticsearchIndexConfig
495506

507+
if self == DB.AliSQL:
508+
from .alisql.alisql import AliSQLIndexConfig
509+
510+
return AliSQLIndexConfig
511+
496512
# DB.Pinecone, DB.Chroma, DB.Redis
497513
return EmptyDBCaseConfig
498514

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
import logging
2+
from contextlib import contextmanager
3+
4+
import mysql.connector as mysql
5+
import numpy as np
6+
7+
from ..api import VectorDB
8+
from .config import AliSQLConfigDict, AliSQLIndexConfig
9+
10+
log = logging.getLogger(__name__)
11+
12+
13+
class AliSQL(VectorDB):
14+
def __init__(
15+
self,
16+
dim: int,
17+
db_config: AliSQLConfigDict,
18+
db_case_config: AliSQLIndexConfig,
19+
collection_name: str = "vec_collection",
20+
drop_old: bool = False,
21+
**kwargs,
22+
):
23+
self.name = "AliSQL"
24+
self.db_config = db_config
25+
self.case_config = db_case_config
26+
self.db_name = "vectordbbench"
27+
self.table_name = collection_name
28+
self.dim = dim
29+
30+
# construct basic units
31+
self.conn, self.cursor = self._create_connection()
32+
33+
if drop_old:
34+
self._drop_db()
35+
self._create_db_table(dim)
36+
37+
self.cursor.close()
38+
self.conn.close()
39+
self.cursor = None
40+
self.conn = None
41+
42+
def _create_connection(self):
43+
conn = mysql.connect(
44+
host=self.db_config["host"],
45+
user=self.db_config["user"],
46+
port=self.db_config["port"],
47+
password=self.db_config["password"],
48+
buffered=True,
49+
)
50+
cursor = conn.cursor()
51+
52+
assert conn is not None, "Connection is not initialized"
53+
assert cursor is not None, "Cursor is not initialized"
54+
55+
return conn, cursor
56+
57+
def _drop_db(self):
58+
assert self.conn is not None, "Connection is not initialized"
59+
assert self.cursor is not None, "Cursor is not initialized"
60+
log.info(f"{self.name} client drop db : {self.db_name}")
61+
62+
# flush tables before dropping database to avoid some locking issue
63+
self.cursor.execute("FLUSH TABLES")
64+
self.cursor.execute(f"DROP DATABASE IF EXISTS {self.db_name}")
65+
self.cursor.execute("COMMIT")
66+
self.cursor.execute("FLUSH TABLES")
67+
68+
def _create_db_table(self, dim: int):
69+
assert self.conn is not None, "Connection is not initialized"
70+
assert self.cursor is not None, "Cursor is not initialized"
71+
72+
try:
73+
log.info(f"{self.name} client create database : {self.db_name}")
74+
self.cursor.execute(f"CREATE DATABASE {self.db_name}")
75+
76+
log.info(f"{self.name} client create table : {self.table_name}")
77+
self.cursor.execute(f"USE {self.db_name}")
78+
79+
self.cursor.execute(
80+
f"""
81+
CREATE TABLE {self.table_name} (
82+
id INT PRIMARY KEY,
83+
v VECTOR({self.dim}) NOT NULL
84+
)
85+
"""
86+
)
87+
self.cursor.execute("COMMIT")
88+
89+
except Exception as e:
90+
log.warning(f"Failed to create table: {self.table_name} error: {e}")
91+
raise e from None
92+
93+
@contextmanager
94+
def init(self):
95+
"""create and destory connections to database.
96+
97+
Examples:
98+
>>> with self.init():
99+
>>> self.insert_embeddings()
100+
"""
101+
self.conn, self.cursor = self._create_connection()
102+
103+
index_param = self.case_config.index_param()
104+
search_param = self.case_config.search_param()
105+
106+
# maximize allowed package size
107+
self.cursor.execute("SET GLOBAL max_allowed_packet = 1073741824")
108+
109+
if index_param["index_type"] == "HNSW":
110+
if index_param["cache_size"] is not None:
111+
self.cursor.execute(f"SET GLOBAL vidx_hnsw_cache_size = {index_param['cache_size']}")
112+
if search_param["ef_search"] is not None:
113+
self.cursor.execute(f"SET GLOBAL vidx_hnsw_ef_search = {search_param['ef_search']}")
114+
self.cursor.execute("COMMIT")
115+
116+
self.insert_sql = f"INSERT INTO {self.db_name}.{self.table_name} (id, v) VALUES (%s, %s)" # noqa: S608
117+
self.select_sql = (
118+
f"SELECT id FROM {self.db_name}.{self.table_name} " # noqa: S608
119+
f"ORDER by vec_distance_{search_param['metric_type']}(v, %s) LIMIT %s"
120+
)
121+
self.select_sql_with_filter = (
122+
f"SELECT id FROM {self.db_name}.{self.table_name} WHERE id >= %s " # noqa: S608
123+
f"ORDER by vec_distance_{search_param['metric_type']}(v, %s) LIMIT %s"
124+
)
125+
126+
try:
127+
yield
128+
finally:
129+
self.cursor.close()
130+
self.conn.close()
131+
self.cursor = None
132+
self.conn = None
133+
134+
def ready_to_load(self) -> bool:
135+
pass
136+
137+
def optimize(self, data_size: int) -> None:
138+
assert self.conn is not None, "Connection is not initialized"
139+
assert self.cursor is not None, "Cursor is not initialized"
140+
141+
index_param = self.case_config.index_param()
142+
143+
try:
144+
index_options = f"DISTANCE={index_param['metric_type']}"
145+
if index_param["index_type"] == "HNSW" and index_param["M"] is not None:
146+
index_options += f" M={index_param['M']}"
147+
148+
self.cursor.execute(
149+
f"""
150+
ALTER TABLE {self.db_name}.{self.table_name}
151+
ADD VECTOR KEY v(v) {index_options}
152+
"""
153+
)
154+
self.cursor.execute("COMMIT")
155+
156+
except Exception as e:
157+
log.warning(f"Failed to create index: {self.table_name} error: {e}")
158+
raise e from None
159+
160+
@staticmethod
161+
def vector_to_hex(v): # noqa: ANN001
162+
return np.array(v, "float32").tobytes()
163+
164+
def insert_embeddings(
165+
self,
166+
embeddings: list[list[float]],
167+
metadata: list[int],
168+
**kwargs,
169+
) -> tuple[int, Exception]:
170+
"""Insert embeddings into the database.
171+
Should call self.init() first.
172+
"""
173+
assert self.conn is not None, "Connection is not initialized"
174+
assert self.cursor is not None, "Cursor is not initialized"
175+
176+
try:
177+
metadata_arr = np.array(metadata)
178+
embeddings_arr = np.array(embeddings)
179+
180+
batch_data = []
181+
for i, row in enumerate(metadata_arr):
182+
batch_data.append((int(row), self.vector_to_hex(embeddings_arr[i])))
183+
184+
self.cursor.executemany(self.insert_sql, batch_data)
185+
self.cursor.execute("COMMIT")
186+
self.cursor.execute("FLUSH TABLES")
187+
188+
return len(metadata), None
189+
except Exception as e:
190+
log.warning(f"Failed to insert data into Vector table ({self.table_name}), error: {e}")
191+
return 0, e
192+
193+
def search_embedding(
194+
self,
195+
query: list[float],
196+
k: int = 100,
197+
filters: dict | None = None,
198+
timeout: int | None = None,
199+
**kwargs,
200+
) -> list[int]:
201+
assert self.conn is not None, "Connection is not initialized"
202+
assert self.cursor is not None, "Cursor is not initialized"
203+
204+
search_param = self.case_config.search_param() # noqa: F841
205+
206+
try:
207+
if filters:
208+
self.cursor.execute(self.select_sql_with_filter, (filters.get("id"), self.vector_to_hex(query), k))
209+
else:
210+
self.cursor.execute(self.select_sql, (self.vector_to_hex(query), k))
211+
return [row[0] for row in self.cursor.fetchall()]
212+
213+
except mysql.Error:
214+
log.exception("Failed to execute search query")
215+
raise
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from typing import Annotated, Unpack
2+
3+
import click
4+
from pydantic import SecretStr
5+
6+
from vectordb_bench.backend.clients import DB
7+
8+
from ....cli.cli import (
9+
CommonTypedDict,
10+
cli,
11+
click_parameter_decorators_from_typed_dict,
12+
run,
13+
)
14+
15+
16+
class AliSQLTypedDict(CommonTypedDict):
17+
user_name: Annotated[
18+
str,
19+
click.option(
20+
"--username",
21+
type=str,
22+
help="Username",
23+
required=True,
24+
),
25+
]
26+
password: Annotated[
27+
str,
28+
click.option(
29+
"--password",
30+
type=str,
31+
help="Password",
32+
required=True,
33+
),
34+
]
35+
36+
host: Annotated[
37+
str,
38+
click.option(
39+
"--host",
40+
type=str,
41+
help="Db host",
42+
default="127.0.0.1",
43+
),
44+
]
45+
46+
port: Annotated[
47+
int,
48+
click.option(
49+
"--port",
50+
type=int,
51+
default=3306,
52+
help="DB Port",
53+
),
54+
]
55+
56+
57+
class AliSQLHNSWTypedDict(AliSQLTypedDict):
58+
m: Annotated[
59+
int | None,
60+
click.option(
61+
"--m",
62+
type=int,
63+
help="M parameter in HNSW vector indexing",
64+
required=False,
65+
),
66+
]
67+
68+
ef_search: Annotated[
69+
int | None,
70+
click.option(
71+
"--ef-search",
72+
type=int,
73+
help="AliSQL system variable vidx_hnsw_ef_search",
74+
required=False,
75+
),
76+
]
77+
78+
cache_size: Annotated[
79+
int | None,
80+
click.option(
81+
"--cache-size",
82+
type=int,
83+
help="AliSQL system variable vidx_hnsw_cache_size",
84+
required=False,
85+
),
86+
]
87+
88+
89+
@cli.command()
90+
@click_parameter_decorators_from_typed_dict(AliSQLHNSWTypedDict)
91+
def AliSQLHNSW(
92+
**parameters: Unpack[AliSQLHNSWTypedDict],
93+
):
94+
from .config import AliSQLConfig, AliSQLHNSWConfig
95+
96+
run(
97+
db=DB.AliSQL,
98+
db_config=AliSQLConfig(
99+
db_label=parameters["db_label"],
100+
user_name=parameters["username"],
101+
password=SecretStr(parameters["password"]),
102+
host=parameters["host"],
103+
port=parameters["port"],
104+
),
105+
db_case_config=AliSQLHNSWConfig(
106+
M=parameters["m"],
107+
ef_search=parameters["ef_search"],
108+
cache_size=parameters["cache_size"],
109+
),
110+
**parameters,
111+
)

0 commit comments

Comments
 (0)