diff --git a/.jules/bolt.md b/.jules/bolt.md new file mode 100644 index 0000000..56b824b --- /dev/null +++ b/.jules/bolt.md @@ -0,0 +1,3 @@ +## 2026-04-05 - Concurrent multi-repo search +**Learning:** Sequential async operations in loops (e.g., `for repo in repos: await search(repo)`) create significant linear latency bottlenecks in retrieval pipelines. +**Action:** Use `asyncio.gather` combined with list flattening to parallelize these calls, reducing multi-repo search latency to the duration of the single slowest namespace query. diff --git a/src/pipelines/code_retrieval.py b/src/pipelines/code_retrieval.py index 69e4abc..08e4af2 100644 --- a/src/pipelines/code_retrieval.py +++ b/src/pipelines/code_retrieval.py @@ -25,6 +25,7 @@ from __future__ import annotations +import asyncio import logging from typing import Any, Callable, Dict, List, Optional @@ -37,7 +38,6 @@ from src.scanner.code_store import CodeStore from src.schemas.code import ( annotations_namespace, - directories_namespace, files_namespace, snippets_namespace, symbols_namespace, @@ -589,14 +589,19 @@ async def _search_symbols( ) -> List[SourceRecord]: if not repo: logger.warning("search_symbols called without repo — searching all repos") - results = [] - for r in self.repos: - results.extend(await self._search_namespace( + tasks = [ + self._search_namespace( namespace=symbols_namespace(self.org_id, r), query=query, domain="symbol", top_k=top_k, - )) + ) + for r in self.repos + ] + + # Execute concurrently and flatten results + results_list = await asyncio.gather(*tasks) + results = [item for sublist in results_list for item in sublist] return results[:top_k] return await self._search_namespace( @@ -612,14 +617,19 @@ async def _search_files( self, query: str, repo: str, top_k: int = 10, ) -> List[SourceRecord]: if not repo: - results = [] - for r in self.repos: - results.extend(await self._search_namespace( + tasks = [ + self._search_namespace( namespace=files_namespace(self.org_id, r), query=query, 
domain="file", top_k=top_k, - )) + ) + for r in self.repos + ] + + # Execute concurrently and flatten results + results_list = await asyncio.gather(*tasks) + results = [item for sublist in results_list for item in sublist] return results[:top_k] return await self._search_namespace(