From 5e12ac5336df58d12419fb3570fab19c6fc71050 Mon Sep 17 00:00:00 2001 From: ishaanxgupta <124028055+ishaanxgupta@users.noreply.github.com> Date: Tue, 7 Apr 2026 16:40:58 +0000 Subject: [PATCH 1/2] perf(weaver): execute batched vector ops concurrently - Refactored `Weaver._execute_batched_vector` to offload synchronous embedding generation (`self.embed_fn`) and vector store I/O (`store.add`, `store.delete`) to an executor thread pool (`loop.run_in_executor`). - Replaced sequential processing in `flush_add_batch` with `asyncio.gather(*tasks, return_exceptions=True)` to execute all vector operations for a given batch concurrently. - Re-architected error handling around metadata extraction and embedding execution to ensure robust failures are localized to single operations instead of failing the entire batch or skipping subsequently enqueued `DELETE` operations. --- src/pipelines/weaver.py | 73 +++++++++++++++++++++++++++++------------ verify_weaver_batch.py | 62 ++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 21 deletions(-) create mode 100644 verify_weaver_batch.py diff --git a/src/pipelines/weaver.py b/src/pipelines/weaver.py index 8138436..55d55cb 100644 --- a/src/pipelines/weaver.py +++ b/src/pipelines/weaver.py @@ -12,6 +12,8 @@ from __future__ import annotations +import asyncio +from functools import partial import logging from typing import Any, Callable, Dict, List, Optional @@ -126,7 +128,6 @@ async def flush_add_batch(): # Prepare data for batch add valid_ops = [] texts = [] - embeddings = [] metadatas = [] for op in add_batch_ops: @@ -139,41 +140,70 @@ async def flush_add_batch(): continue try: - emb = self.embed_fn(op.content) meta = {"user_id": user_id, "domain": domain.value} meta.update(_extract_structured_metadata(op.content)) valid_ops.append(op) texts.append(op.content) - embeddings.append(emb) metadatas.append(meta) except Exception as exc: - logger.error("Embedding generation failed for ADD: %s", exc) + logger.error("Metadata extraction failed for ADD: %s", exc) executed_ops.append(ExecutedOp( type=op.type, status=OpStatus.FAILED, content=op.content, error=str(exc) )) if valid_ops: - try: - ids = self.vector_store.add( - texts=texts, - embeddings=embeddings, - metadata=metadatas, - ) - # Map IDs back to ops - for op, new_id in zip(valid_ops, ids): - executed_ops.append(ExecutedOp( - type=op.type, status=OpStatus.SUCCESS, - content=op.content, new_id=new_id, - )) - except Exception as exc: - logger.error("Vector batch ADD failed: %s", exc) - for op in valid_ops: + loop = asyncio.get_running_loop() + + async def _embed(text: str) -> List[float]: + return await loop.run_in_executor(None, self.embed_fn, text) + + tasks = [_embed(text) for text in texts] + results = await asyncio.gather(*tasks, return_exceptions=True) + + successful_ops = [] + successful_texts = [] + successful_embeddings = [] + successful_metadatas = [] + + for op, text, meta, res in zip(valid_ops, texts, metadatas, results): + if isinstance(res, Exception): + logger.error("Embedding generation failed for ADD: %s", res) executed_ops.append(ExecutedOp( type=op.type, status=OpStatus.FAILED, - content=op.content, error=str(exc) + content=op.content, error=str(res) )) + else: + successful_ops.append(op) + successful_texts.append(text) + successful_embeddings.append(res) + successful_metadatas.append(meta) + + if successful_ops: + try: + ids = await loop.run_in_executor( + None, + partial( + self.vector_store.add, + texts=successful_texts, + embeddings=successful_embeddings, + metadata=successful_metadatas, + ) + ) + # Map IDs back to ops + for op, new_id in zip(successful_ops, ids): + executed_ops.append(ExecutedOp( + type=op.type, status=OpStatus.SUCCESS, + content=op.content, new_id=new_id, + )) + except Exception as exc: + logger.error("Vector batch ADD failed: %s", exc) + for op in successful_ops: + executed_ops.append(ExecutedOp( + type=op.type, status=OpStatus.FAILED, + content=op.content, error=str(exc) + )) add_batch_ops.clear() @@ -197,8 +227,9 @@ async def flush_delete_batch(): ids_to_delete.append(op.embedding_id) if valid_ops: + loop = asyncio.get_running_loop() try: - success = self.vector_store.delete(ids=ids_to_delete) + success = await loop.run_in_executor(None, partial(self.vector_store.delete, ids=ids_to_delete)) status = OpStatus.SUCCESS if success else OpStatus.FAILED for op in valid_ops: executed_ops.append(ExecutedOp( diff --git a/verify_weaver_batch.py b/verify_weaver_batch.py new file mode 100644 index 0000000..628a63b --- /dev/null +++ b/verify_weaver_batch.py @@ -0,0 +1,62 @@ +import asyncio +import time +from unittest.mock import MagicMock +from src.pipelines.weaver import Weaver, _extract_structured_metadata +from src.schemas.judge import JudgeResult, JudgeDomain, Operation, OperationType + +class DummyVectorStore: + def add(self, texts, embeddings, metadata): + time.sleep(0.5) # Simulate network IO + return [f"id_{i}" for i in range(len(texts))] + + def delete(self, ids): + time.sleep(0.5) # Simulate network IO + return True + +def slow_embed(text: str): + time.sleep(0.2) # Simulate API call delay + if "FAIL" in text: + raise Exception("Simulated embedding failure") + return [0.1, 0.2, 0.3] + + +async def main(): + store = DummyVectorStore() + weaver = Weaver(vector_store=store, embed_fn=slow_embed) + + # 5 ADD operations (1 fails) and 2 DELETE operations + operations = [ + Operation(type=OperationType.ADD, content="Valid doc 1"), + Operation(type=OperationType.ADD, content="Valid doc 2"), + Operation(type=OperationType.ADD, content="FAIL doc"), + Operation(type=OperationType.ADD, content="Valid doc 3"), + Operation(type=OperationType.ADD, content="Valid doc 4"), + Operation(type=OperationType.DELETE, embedding_id="id_to_delete_1"), + Operation(type=OperationType.DELETE, embedding_id="id_to_delete_2"), + ] + + judge_result = JudgeResult( + confidence=0.9, + operations=operations + ) + + start_time = time.time() + res = await weaver.execute(judge_result, JudgeDomain.PROFILE, "user_123") + end_time = time.time() + + print(f"Total time taken: {end_time - start_time:.2f} seconds") + print(f"Total operations expected: 7") + print(f"Operations executed: {res.total}") + print(f"Succeeded: {res.succeeded}") + print(f"Failed: {res.failed}") + + # We expect 5 adds + 2 deletes + # 1 fail doc should fail. + # With concurrency: + # 5 embeds run in parallel (max ~0.2s) + # 1 store add runs (~0.5s) + # 1 store delete runs (~0.5s) + # Total time should be roughly ~1.2s instead of > 2s (sequential: 5*0.2 + 0.5 + 0.5 = 2.0) + +if __name__ == "__main__": + asyncio.run(main()) From 4e822403cf03a0dfa973465f4cbacb7c4405a99d Mon Sep 17 00:00:00 2001 From: Ishaan Gupta Date: Mon, 13 Apr 2026 09:59:17 +0530 Subject: [PATCH 2/2] Delete verify_weaver_batch.py --- verify_weaver_batch.py | 62 ------------------------------------------ 1 file changed, 62 deletions(-) delete mode 100644 verify_weaver_batch.py diff --git a/verify_weaver_batch.py b/verify_weaver_batch.py deleted file mode 100644 index 628a63b..0000000 --- a/verify_weaver_batch.py +++ /dev/null @@ -1,62 +0,0 @@ -import asyncio -import time -from unittest.mock import MagicMock -from src.pipelines.weaver import Weaver, _extract_structured_metadata -from src.schemas.judge import JudgeResult, JudgeDomain, Operation, OperationType - -class DummyVectorStore: - def add(self, texts, embeddings, metadata): - time.sleep(0.5) # Simulate network IO - return [f"id_{i}" for i in range(len(texts))] - - def delete(self, ids): - time.sleep(0.5) # Simulate network IO - return True - -def slow_embed(text: str): - time.sleep(0.2) # Simulate API call delay - if "FAIL" in text: - raise Exception("Simulated embedding failure") - return [0.1, 0.2, 0.3] - - -async def main(): - store = DummyVectorStore() - weaver = Weaver(vector_store=store, embed_fn=slow_embed) - - # 5 ADD operations (1 fails) and 2 DELETE operations - operations = [ - Operation(type=OperationType.ADD, content="Valid doc 1"), - Operation(type=OperationType.ADD, content="Valid doc 2"), - Operation(type=OperationType.ADD, content="FAIL doc"), - Operation(type=OperationType.ADD, content="Valid doc 3"), - Operation(type=OperationType.ADD, content="Valid doc 4"), - Operation(type=OperationType.DELETE, embedding_id="id_to_delete_1"), - Operation(type=OperationType.DELETE, embedding_id="id_to_delete_2"), - ] - - judge_result = JudgeResult( - confidence=0.9, - operations=operations - ) - - start_time = time.time() - res = await weaver.execute(judge_result, JudgeDomain.PROFILE, "user_123") - end_time = time.time() - - print(f"Total time taken: {end_time - start_time:.2f} seconds") - print(f"Total operations expected: 7") - print(f"Operations executed: {res.total}") - print(f"Succeeded: {res.succeeded}") - print(f"Failed: {res.failed}") - - # We expect 5 adds + 2 deletes - # 1 fail doc should fail. - # With concurrency: - # 5 embeds run in parallel (max ~0.2s) - # 1 store add runs (~0.5s) - # 1 store delete runs (~0.5s) - # Total time should be roughly ~1.2s instead of > 2s (sequential: 5*0.2 + 0.5 + 0.5 = 2.0) - -if __name__ == "__main__": - asyncio.run(main())