From 2460bbd4b9e4ea9e444fc1dd38f2b4ea2ce7064d Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 4 Jun 2026 21:23:04 +0800 Subject: [PATCH 1/6] fix(client): buffer search results before connection closes Co-Authored-By: Claude Sonnet 4 --- pyobvector/client/ob_vec_client.py | 61 +++++++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/pyobvector/client/ob_vec_client.py b/pyobvector/client/ob_vec_client.py index be1f192..ba94eab 100644 --- a/pyobvector/client/ob_vec_client.py +++ b/pyobvector/client/ob_vec_client.py @@ -29,6 +29,55 @@ logger.setLevel(logging.DEBUG) +class _MappingsResult: + def __init__(self, rows: list, keys: list[str]) -> None: + self._data = [dict(zip(keys, row)) for row in rows] + + def fetchall(self) -> list[dict]: + return list(self._data) + + def all(self) -> list[dict]: + return list(self._data) + + def __iter__(self): + return iter(self._data) + + +class _BufferedResult: + """Buffers all rows from a CursorResult before the connection closes. + + Returning conn.execute() directly from inside a ``with engine.connect()`` + block hands the caller a dead cursor once the with-block exits. This + wrapper fetches everything eagerly so the result stays valid. + """ + + def __init__(self, cursor_result) -> None: + self._rows: list = cursor_result.fetchall() + self._keys: list[str] = list(cursor_result.keys()) + + def fetchall(self) -> list: + rows, self._rows = self._rows, [] + return rows + + def fetchone(self): + if not self._rows: + return None + row, self._rows = self._rows[0], self._rows[1:] + return row + + def __iter__(self): + return iter(self._rows) + + def all(self) -> list: + return self.fetchall() + + def keys(self) -> list[str]: + return self._keys + + def mappings(self) -> "_MappingsResult": + return _MappingsResult(self._rows, self._keys) + + class ObVecClient(ObClient): """The OceanBase Vector Client""" @@ -408,11 +457,11 @@ def ann_search( ) if partition_names is None: - return conn.execute(text(stmt_str)) + return _BufferedResult(conn.execute(text(stmt_str))) stmt_str = self._insert_partition_hint_for_query_sql( stmt_str, f"PARTITION({', '.join(partition_names)})" ) - return conn.execute(text(stmt_str)) + return _BufferedResult(conn.execute(text(stmt_str))) def post_ann_search( self, @@ -487,7 +536,7 @@ def post_ann_search( ) ) ) - return conn.execute(stmt) + return _BufferedResult(conn.execute(stmt)) stmt_str = str( stmt.compile( dialect=self.engine.dialect, @@ -499,7 +548,7 @@ def post_ann_search( ) if str_list is not None: str_list.append(stmt_str) - return conn.execute(text(stmt_str)) + return _BufferedResult(conn.execute(text(stmt_str))) def precise_search( self, @@ -537,7 +586,7 @@ def precise_search( stmt = stmt.where(*where_clause) with self.engine.connect() as conn: with conn.begin(): - return conn.execute(stmt) + return _BufferedResult(conn.execute(stmt)) else: stmt = ( select(table) @@ -548,4 +597,4 @@ def precise_search( stmt = stmt.where(*where_clause) with self.engine.connect() as conn: with conn.begin(): - return conn.execute(stmt) + return _BufferedResult(conn.execute(stmt)) From f780bc53cb8d9b438de95c1b7dfba31ef265ef0f Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 4 Jun 2026 21:58:36 +0800 Subject: [PATCH 2/6] fix(seekdb): flush async HNSW index after insert and fix empty-SELECT cursor fix: - Auto-flush seekdb async HNSW index build after insert so ann_search returns results - Set cursor._description=[] (not None) for empty SELECT to prevent ResourceClosedError Co-Authored-By: Claude Sonnet 4 --- pyobvector/client/ob_client.py | 10 ++++++++++ pyobvector/client/ob_vec_client.py | 15 ++++++++------- pyobvector/client/seekdb_engine.py | 17 +++++++++++++++-- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/pyobvector/client/ob_client.py b/pyobvector/client/ob_client.py index 1254b04..bb42438 100644 --- a/pyobvector/client/ob_client.py +++ b/pyobvector/client/ob_client.py @@ -158,6 +158,15 @@ def _is_seekdb(self) -> bool: logger.warning(f"Failed to query version: {e}") return is_seekdb + def _flush_seekdb_index(self) -> None: + """Flush async HNSW index builds in embedded seekdb after insert. + + No-op when not using embedded seekdb or seekdb version < 1.3.0. + """ + server = getattr(self.engine, "_seekdb_server", None) + if server is not None and hasattr(server, "refresh_index"): + server.refresh_index() + def _insert_partition_hint_for_query_sql(self, sql: str, partition_hint: str): from_index = sql.find("FROM") assert from_index != -1 @@ -282,6 +291,7 @@ def insert( .with_hint(f"PARTITION({partition_name})") .values(data) ) + self._flush_seekdb_index() def upsert( self, diff --git a/pyobvector/client/ob_vec_client.py b/pyobvector/client/ob_vec_client.py index ba94eab..bf6caa4 100644 --- a/pyobvector/client/ob_vec_client.py +++ b/pyobvector/client/ob_vec_client.py @@ -1,6 +1,7 @@ """OceanBase Vector Store Client.""" import logging +from collections.abc import Iterator from typing import Any import numpy as np @@ -39,7 +40,7 @@ def fetchall(self) -> list[dict]: def all(self) -> list[dict]: return list(self._data) - def __iter__(self): + def __iter__(self) -> Iterator[dict]: return iter(self._data) @@ -51,24 +52,24 @@ class _BufferedResult: wrapper fetches everything eagerly so the result stays valid. """ - def __init__(self, cursor_result) -> None: - self._rows: list = cursor_result.fetchall() + def __init__(self, cursor_result: Any) -> None: + self._rows: list[tuple] = cursor_result.fetchall() self._keys: list[str] = list(cursor_result.keys()) - def fetchall(self) -> list: + def fetchall(self) -> list[tuple]: rows, self._rows = self._rows, [] return rows - def fetchone(self): + def fetchone(self) -> tuple | None: if not self._rows: return None row, self._rows = self._rows[0], self._rows[1:] return row - def __iter__(self): + def __iter__(self) -> Iterator[tuple]: return iter(self._rows) - def all(self) -> list: + def all(self) -> list[tuple]: return self.fetchall() def keys(self) -> list[str]: diff --git a/pyobvector/client/seekdb_engine.py b/pyobvector/client/seekdb_engine.py index d6188a1..3a31e68 100644 --- a/pyobvector/client/seekdb_engine.py +++ b/pyobvector/client/seekdb_engine.py @@ -12,6 +12,13 @@ from sqlalchemy import create_engine from sqlalchemy.pool import NullPool +_QUERY_SQL_PREFIXES = ("SELECT", "SHOW", "DESCRIBE", "DESC") + + +def _is_query_sql(sql: str) -> bool: + """Return True if sql is a row-returning statement (SELECT/SHOW/DESCRIBE/DESC).""" + return sql.strip().upper().startswith(_QUERY_SQL_PREFIXES) + def _pyformat_to_format(sql: str, params: Any) -> tuple[str, list[Any]]: """Convert SQLAlchemy pyformat (%(name)s) + dict params to %s + list for pyseekdb.""" @@ -50,7 +57,10 @@ def __init__(self, client: Any) -> None: def execute(self, operation: str, parameters: Sequence[Any] | None = None) -> None: result = _execute_via_pyseekdb(self._client, operation, parameters or ()) if not result: - self._description = None + # For SELECT/SHOW/DESCRIBE: use [] so SQLAlchemy treats this as a + # row-returning result with 0 rows, not a non-returning statement. + # _NoResultMetaData (from None) would cause ResourceClosedError on fetchall(). + self._description = [] if _is_query_sql(operation) else None self._rows = [] self.rowcount = 0 return @@ -135,12 +145,15 @@ def create_engine_from_client(pyseekdb_client: Any, **kwargs: Any): def creator() -> _SeekdbConnection: return _SeekdbConnection(server) - return create_engine( + engine = create_engine( "mysql+oceanbase://root:@127.0.0.1:2881/" + database, creator=creator, poolclass=NullPool, **kwargs, ) + # Attach server so callers can invoke server.refresh_index() for HNSW flush. + engine._seekdb_server = server + return engine def create_embedded_engine(path: str, database: str = "test", **kwargs: Any): From 9f8eebed028cd75d0af73f191831770072840e42 Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 4 Jun 2026 23:40:11 +0800 Subject: [PATCH 3/6] fix(tests): align PARSER_PROPERTIES assertion with actual DDL output Co-Authored-By: Claude Sonnet 4 --- tests/test_fts_index.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fts_index.py b/tests/test_fts_index.py index 10bed93..98245f4 100644 --- a/tests/test_fts_index.py +++ b/tests/test_fts_index.py @@ -559,7 +559,7 @@ def test_fts_analyzer_sql_with_properties(self): idx = self._build_index(props) sql = compile_create_fts_index(CreateFtsIndex(idx), _MockCompiler()) self.assertIn("WITH PARSER analyzer", sql) - self.assertIn(f"PARSER_PROPERTIES = ({props})", sql) + self.assertIn(f"PARSER_PROPERTIES=({props})", sql) def test_fts_analyzer_requires_parser_properties(self): param = FtsIndexParam( @@ -587,7 +587,7 @@ def test_fts_parser_properties_not_tied_to_analyzer_type(self): ) sql = compile_create_fts_index(CreateFtsIndex(idx), _MockCompiler()) self.assertIn("WITH PARSER ngram", sql) - self.assertIn("PARSER_PROPERTIES = (token_size = 2)", sql) + self.assertIn("PARSER_PROPERTIES=(token_size = 2)", sql) def test_make_analyzer_properties_default(self): result = make_analyzer_properties() From 223021927d7b99f75637c14f60a3822d72b0de89 Mon Sep 17 00:00:00 2001 From: He Wang Date: Fri, 5 Jun 2026 00:36:07 +0800 Subject: [PATCH 4/6] refactor(client): use public SQLAlchemy APIs and guard refresh errors - Replace engine._seekdb_server with engine.update_execution_options() and engine.get_execution_options() to avoid writing to private Engine attributes (addresses Copilot review comment) - Replace _BufferedResult/_MappingsResult with Result.freeze()() so callers receive a full SQLAlchemy Result with all standard APIs (.first(), .scalars(), .mappings(), etc.) - Wrap server.refresh_index() in try/except so a flush failure after insert is demoted to a warning instead of aborting the write Co-Authored-By: Claude Sonnet 4 --- pyobvector/client/ob_client.py | 7 +++- pyobvector/client/ob_vec_client.py | 62 +++--------------------------- pyobvector/client/seekdb_engine.py | 3 +- 3 files changed, 12 insertions(+), 60 deletions(-) diff --git a/pyobvector/client/ob_client.py b/pyobvector/client/ob_client.py index bb42438..0a296d5 100644 --- a/pyobvector/client/ob_client.py +++ b/pyobvector/client/ob_client.py @@ -163,9 +163,12 @@ def _flush_seekdb_index(self) -> None: No-op when not using embedded seekdb or seekdb version < 1.3.0. """ - server = getattr(self.engine, "_seekdb_server", None) + server = self.engine.get_execution_options().get("seekdb_server") if server is not None and hasattr(server, "refresh_index"): - server.refresh_index() + try: + server.refresh_index() + except Exception as e: + logger.warning("seekdb index refresh failed after insert: %s", e) def _insert_partition_hint_for_query_sql(self, sql: str, partition_hint: str): from_index = sql.find("FROM") diff --git a/pyobvector/client/ob_vec_client.py b/pyobvector/client/ob_vec_client.py index bf6caa4..8deece0 100644 --- a/pyobvector/client/ob_vec_client.py +++ b/pyobvector/client/ob_vec_client.py @@ -1,7 +1,6 @@ """OceanBase Vector Store Client.""" import logging -from collections.abc import Iterator from typing import Any import numpy as np @@ -30,55 +29,6 @@ logger.setLevel(logging.DEBUG) -class _MappingsResult: - def __init__(self, rows: list, keys: list[str]) -> None: - self._data = [dict(zip(keys, row)) for row in rows] - - def fetchall(self) -> list[dict]: - return list(self._data) - - def all(self) -> list[dict]: - return list(self._data) - - def __iter__(self) -> Iterator[dict]: - return iter(self._data) - - -class _BufferedResult: - """Buffers all rows from a CursorResult before the connection closes. - - Returning conn.execute() directly from inside a ``with engine.connect()`` - block hands the caller a dead cursor once the with-block exits. This - wrapper fetches everything eagerly so the result stays valid. - """ - - def __init__(self, cursor_result: Any) -> None: - self._rows: list[tuple] = cursor_result.fetchall() - self._keys: list[str] = list(cursor_result.keys()) - - def fetchall(self) -> list[tuple]: - rows, self._rows = self._rows, [] - return rows - - def fetchone(self) -> tuple | None: - if not self._rows: - return None - row, self._rows = self._rows[0], self._rows[1:] - return row - - def __iter__(self) -> Iterator[tuple]: - return iter(self._rows) - - def all(self) -> list[tuple]: - return self.fetchall() - - def keys(self) -> list[str]: - return self._keys - - def mappings(self) -> "_MappingsResult": - return _MappingsResult(self._rows, self._keys) - - class ObVecClient(ObClient): """The OceanBase Vector Client""" @@ -458,11 +408,11 @@ def ann_search( ) if partition_names is None: - return _BufferedResult(conn.execute(text(stmt_str))) + return conn.execute(text(stmt_str)).freeze()() stmt_str = self._insert_partition_hint_for_query_sql( stmt_str, f"PARTITION({', '.join(partition_names)})" ) - return _BufferedResult(conn.execute(text(stmt_str))) + return conn.execute(text(stmt_str)).freeze()() def post_ann_search( self, @@ -537,7 +487,7 @@ def post_ann_search( ) ) ) - return _BufferedResult(conn.execute(stmt)) + return conn.execute(stmt).freeze()() stmt_str = str( stmt.compile( dialect=self.engine.dialect, @@ -549,7 +499,7 @@ def post_ann_search( ) if str_list is not None: str_list.append(stmt_str) - return _BufferedResult(conn.execute(text(stmt_str))) + return conn.execute(text(stmt_str)).freeze()() def precise_search( self, @@ -587,7 +537,7 @@ def precise_search( stmt = stmt.where(*where_clause) with self.engine.connect() as conn: with conn.begin(): - return _BufferedResult(conn.execute(stmt)) + return conn.execute(stmt).freeze()() else: stmt = ( select(table) @@ -598,4 +548,4 @@ def precise_search( stmt = stmt.where(*where_clause) with self.engine.connect() as conn: with conn.begin(): - return _BufferedResult(conn.execute(stmt)) + return conn.execute(stmt).freeze()() diff --git a/pyobvector/client/seekdb_engine.py b/pyobvector/client/seekdb_engine.py index 3a31e68..a5baaaa 100644 --- a/pyobvector/client/seekdb_engine.py +++ b/pyobvector/client/seekdb_engine.py @@ -151,8 +151,7 @@ def creator() -> _SeekdbConnection: poolclass=NullPool, **kwargs, ) - # Attach server so callers can invoke server.refresh_index() for HNSW flush. - engine._seekdb_server = server + engine.update_execution_options(seekdb_server=server) return engine From ba4ddaa4f054d7d3a20e1932b0f2f86c60f0d135 Mon Sep 17 00:00:00 2001 From: He Wang Date: Fri, 5 Jun 2026 02:11:03 +0800 Subject: [PATCH 5/6] fix(seekdb): preserve column metadata for 0-row SELECT and fix flush docstring - Add _description_from_select() using sqlglot to extract column names when a SELECT returns 0 rows, so cursor.description carries real names instead of an empty list - Add module-level logger to seekdb_engine and log parse failures at DEBUG - Fix _flush_seekdb_index docstring to reflect hasattr gate rather than a version check Co-Authored-By: Claude Sonnet 4 --- pyobvector/client/ob_client.py | 3 ++- pyobvector/client/seekdb_engine.py | 31 +++++++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/pyobvector/client/ob_client.py b/pyobvector/client/ob_client.py index 0a296d5..8a0c010 100644 --- a/pyobvector/client/ob_client.py +++ b/pyobvector/client/ob_client.py @@ -161,7 +161,8 @@ def _is_seekdb(self) -> bool: def _flush_seekdb_index(self) -> None: """Flush async HNSW index builds in embedded seekdb after insert. - No-op when not using embedded seekdb or seekdb version < 1.3.0. + No-op when not using embedded seekdb or when the server does not expose + a ``refresh_index`` method. """ server = self.engine.get_execution_options().get("seekdb_server") if server is not None and hasattr(server, "refresh_index"): diff --git a/pyobvector/client/seekdb_engine.py b/pyobvector/client/seekdb_engine.py index a5baaaa..3b0a65d 100644 --- a/pyobvector/client/seekdb_engine.py +++ b/pyobvector/client/seekdb_engine.py @@ -5,6 +5,7 @@ Requires optional dependency: pip install pyobvector[pyseekdb] """ +import logging import re from collections.abc import Mapping, Sequence from typing import Any @@ -12,6 +13,8 @@ from sqlalchemy import create_engine from sqlalchemy.pool import NullPool +logger = logging.getLogger(__name__) + _QUERY_SQL_PREFIXES = ("SELECT", "SHOW", "DESCRIBE", "DESC") @@ -20,6 +23,28 @@ def _is_query_sql(sql: str) -> bool: return sql.strip().upper().startswith(_QUERY_SQL_PREFIXES) +def _description_from_select(sql: str) -> list[tuple]: + """Extract DBAPI description tuples from SELECT column list using sqlglot. + + Falls back to [] when sqlglot cannot parse the statement (e.g. SHOW/DESCRIBE). + """ + try: + import sqlglot + import sqlglot.expressions as exp + + parsed = sqlglot.parse_one(sql) + if parsed is None or not hasattr(parsed, "selects"): + return [] + cols = [] + for sel in parsed.selects: + name = sel.alias or (sel.name if isinstance(sel, exp.Column) else sel.sql()) + cols.append(name) + return [(c, None, None, None, None, None, None) for c in cols] + except Exception as e: + logger.debug("_description_from_select could not parse SQL: %s", e) + return [] + + def _pyformat_to_format(sql: str, params: Any) -> tuple[str, list[Any]]: """Convert SQLAlchemy pyformat (%(name)s) + dict params to %s + list for pyseekdb.""" if not isinstance(params, Mapping): @@ -57,10 +82,10 @@ def __init__(self, client: Any) -> None: def execute(self, operation: str, parameters: Sequence[Any] | None = None) -> None: result = _execute_via_pyseekdb(self._client, operation, parameters or ()) if not result: - # For SELECT/SHOW/DESCRIBE: use [] so SQLAlchemy treats this as a - # row-returning result with 0 rows, not a non-returning statement. + # For SELECT/SHOW/DESCRIBE: populate description so SQLAlchemy treats this + # as a row-returning result with 0 rows rather than a non-returning statement. # _NoResultMetaData (from None) would cause ResourceClosedError on fetchall(). - self._description = [] if _is_query_sql(operation) else None + self._description = _description_from_select(operation) if _is_query_sql(operation) else None self._rows = [] self.rowcount = 0 return From 91f2cac06d9c4dafcd87597483afd08fe9bb5de5 Mon Sep 17 00:00:00 2001 From: He Wang Date: Fri, 5 Jun 2026 02:14:09 +0800 Subject: [PATCH 6/6] style: wrap long ternary in seekdb_engine to pass ruff format Co-Authored-By: Claude Sonnet 4 --- pyobvector/client/seekdb_engine.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pyobvector/client/seekdb_engine.py b/pyobvector/client/seekdb_engine.py index 3b0a65d..aa82a92 100644 --- a/pyobvector/client/seekdb_engine.py +++ b/pyobvector/client/seekdb_engine.py @@ -85,7 +85,11 @@ def execute(self, operation: str, parameters: Sequence[Any] | None = None) -> No # For SELECT/SHOW/DESCRIBE: populate description so SQLAlchemy treats this # as a row-returning result with 0 rows rather than a non-returning statement. # _NoResultMetaData (from None) would cause ResourceClosedError on fetchall(). - self._description = _description_from_select(operation) if _is_query_sql(operation) else None + self._description = ( + _description_from_select(operation) + if _is_query_sql(operation) + else None + ) self._rows = [] self.rowcount = 0 return