Python: fix(mem0): isolate entity retrieval and correct app_id payload#6242
VedantSonani wants to merge 1 commit into
…xing app_id parameter (microsoft#6237)
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR updates the Mem0 context provider to retrieve memories by querying entity “partitions” (user/agent) independently and merging results, avoiding strict AND-filter limitations, and aligns application ID usage with an app_id parameter.
Changes:
- Run parallel Mem0 searches for user_id and agent_id, then merge/deduplicate results before injecting into the session context.
- Refactor filter construction into a per-entity search-kwargs builder that supports OSS vs Platform client differences.
- Update memory creation call to pass app_id instead of metadata.application_id.
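
The parallel-search-then-merge flow summarized above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `FakeMem0Client`, `search_all_partitions`, and the sample rows are hypothetical stand-ins, and the merge assumes every row carries an `id`.

```python
import asyncio
from typing import Any


class FakeMem0Client:
    """Hypothetical in-memory stand-in; the real Mem0 clients are not used here."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self._rows = rows

    async def search(self, query: str, **entity: str) -> list[dict[str, Any]]:
        # The query text is ignored; rows are matched on the entity tag only.
        await asyncio.sleep(0)  # yield control, as a network call would
        return [r for r in self._rows if all(r.get(k) == v for k, v in entity.items())]


async def search_all_partitions(
    client: FakeMem0Client, query: str, entities: dict[str, str]
) -> list[dict[str, Any]]:
    # One search per configured entity, awaited concurrently.
    responses = await asyncio.gather(
        *(client.search(query, **{key: value}) for key, value in entities.items())
    )
    # Assumes every row carries an "id"; the first occurrence of each id wins.
    unique: dict[Any, dict[str, Any]] = {}
    for response in responses:
        for mem in response:
            unique.setdefault(mem["id"], mem)
    return list(unique.values())


ROWS = [
    {"id": "m1", "user_id": "user_1", "memory": "prefers dark mode"},
    {"id": "m2", "agent_id": "agent_1", "memory": "answers in French"},
    {"id": "m3", "user_id": "user_1", "agent_id": "agent_1", "memory": "shared note"},
]
merged = asyncio.run(
    search_all_partitions(
        FakeMem0Client(ROWS), "preferences", {"user_id": "user_1", "agent_id": "agent_1"}
    )
)
print([m["id"] for m in merged])
```

The row tagged with both entities is returned by both searches but appears once in the merged list.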

    # 1. Query User partition independently
    if self.user_id:
        user_kwargs = self._build_filters(input_text, "user_id", self.user_id)
        search_tasks.append(self.mem0_client.search(**user_kwargs))

    # 2. Query Agent partition independently
    if self.agent_id:
        agent_kwargs = self._build_filters(input_text, "agent_id", self.agent_id)
        search_tasks.append(self.mem0_client.search(**agent_kwargs))

    if not search_tasks:
        return


    """Search Mem0 for relevant memories and add to the session context."""
    self._validate_filters()
    input_text = "\n".join(msg.text for msg in context.input_messages if msg and msg.text and msg.text.strip())
    if not input_text.strip():
        return


    results = await asyncio.gather(*search_tasks, return_exceptions=True)

    # Merge and deduplicate results
    memories = []
    seen_memory_ids = set()

    for search_response in results:
        if isinstance(search_response, Exception):
            continue


    def _build_filters(self, input_text: str, entity_key: str, entity_value: str) -> dict[str, Any]:
        filters: dict[str, Any] = {"query": input_text}

        if isinstance(self.mem0_client, AsyncMemory):
            # AsyncMemory (OSS) expects direct kwargs
            filters[entity_key] = entity_value
            if self.application_id:
                filters["app_id"] = self.application_id
        else:
            # AsyncMemoryClient (Platform) expects a filters dict
            filters["filters"] = {entity_key: entity_value}
            if self.application_id:
                filters["filters"]["app_id"] = self.application_id

        return filters

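
To make the two payload shapes in the hunk above concrete, here is a small standalone sketch. It is an assumption-laden illustration: `build_search_kwargs` is a hypothetical free function, and the `is_oss_client` flag stands in for the `isinstance(self.mem0_client, AsyncMemory)` check so the example runs without Mem0 installed. The shapes themselves follow the comments in the diff rather than any independently verified API contract.

```python
from typing import Any, Optional


def build_search_kwargs(
    query: str,
    entity_key: str,
    entity_value: str,
    application_id: Optional[str] = None,
    *,
    is_oss_client: bool,
) -> dict[str, Any]:
    """Hypothetical free-function variant of _build_filters; returns a fresh dict per call."""
    kwargs: dict[str, Any] = {"query": query}
    # OSS-style: entity keys sit next to "query". Platform-style: they go in a nested "filters" dict.
    target = kwargs if is_oss_client else kwargs.setdefault("filters", {})
    target[entity_key] = entity_value
    if application_id:
        target["app_id"] = application_id
    return kwargs


oss = build_search_kwargs("hi", "user_id", "user_1", "app_1", is_oss_client=True)
platform = build_search_kwargs("hi", "user_id", "user_1", "app_1", is_oss_client=False)
print(oss)
print(platform)
```

Because each call builds its own dictionaries, the user and agent searches cannot leak filter keys into one another.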
@microsoft-github-policy-service agree
moonbox3 left a comment
Please also have a look at the failing CI/CD items.

        agent_kwargs = self._build_filters(input_text, "agent_id", self.agent_id)
        search_tasks.append(self.mem0_client.search(**agent_kwargs))

    if not search_tasks:

What happens when only application_id is configured? _validate_filters allows that (app-only is a valid config), but search_tasks only gets populated for user_id/agent_id, so we hit this guard and return without ever searching. App-scoped setups now retrieve zero memories, silently, even though after_run keeps writing them. Regression vs the old single-search path that always included app_id.
One option might be an app-only fallback before the guard:

    # Fall back to an app-scoped search when only application_id is configured
    if not search_tasks and self.application_id:
        app_kwargs: dict[str, Any] = {"query": input_text}
        if isinstance(self.mem0_client, AsyncMemory):
            app_kwargs["app_id"] = self.application_id
        else:
            app_kwargs["filters"] = {"app_id": self.application_id}
        search_tasks.append(self.mem0_client.search(**app_kwargs))
    if not search_tasks:
        return


    seen_memory_ids = set()

    for search_response in results:
        if isinstance(search_response, Exception):

Should we be swallowing every search error here? gather(return_exceptions=True) + continue turns auth failures, bad config, rate limits, network/5xx all into an empty result with no log (module has no logger). A fully misconfigured provider becomes indistinguishable from one that legitimately found nothing, and the caller sees success. Could we at least log each exception arm, and maybe distinguish all-tasks-failed from genuinely-empty rather than returning silently?
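
One possible shape for this, sketched under assumptions: `gather_searches` and `AllSearchesFailedError` are hypothetical names, and raising when every search fails is only one way to separate "all failed" from "nothing found". The check uses `BaseException` because `asyncio.gather(..., return_exceptions=True)` can also hand back `CancelledError`.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)


class AllSearchesFailedError(RuntimeError):
    """Hypothetical error raised when every partition search fails."""


async def gather_searches(named_tasks: dict[str, Any]) -> list[Any]:
    """Run the searches concurrently, log each failure, and raise if none succeeded."""
    if not named_tasks:
        return []
    results = await asyncio.gather(*named_tasks.values(), return_exceptions=True)
    successes: list[Any] = []
    for name, result in zip(named_tasks, results):
        if isinstance(result, BaseException):
            logger.warning("Mem0 search for %s failed: %r", name, result)
            continue
        successes.append(result)
    if not successes:
        raise AllSearchesFailedError("every Mem0 search failed")
    return successes


async def ok() -> list[dict[str, str]]:
    return [{"id": "m1"}]


async def boom() -> list[dict[str, str]]:
    raise PermissionError("bad API key")


async def demo() -> tuple[list[Any], bool]:
    # One failure is logged and skipped; the surviving result is returned.
    partial = await gather_searches({"user_id": ok(), "agent_id": boom()})
    try:
        await gather_searches({"user_id": boom(), "agent_id": boom()})
    except AllSearchesFailedError:
        return partial, True
    return partial, False


partial, all_failed = asyncio.run(demo())
```

With this split, a partially failing provider still returns what it found, while a fully misconfigured one surfaces an error instead of an empty result.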

            current_memories = [search_response]

        for mem in current_memories:
            mem_id = mem.get("id")

Is gating inclusion on mem_id being truthy intended? Any memory with a missing or falsy id (None, "", 0) gets dropped entirely, not just deduped. Old code included every memory regardless of id. Dedup should skip repeats, not discard id-less entries. Could we keep id-less memories in?

    mem_id = mem.get("id")
    if mem_id is not None and mem_id in seen_memory_ids:
        continue
    if mem_id is not None:
        seen_memory_ids.add(mem_id)
    memories.append(mem)

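
A quick standalone check of the suggested behaviour, with hypothetical sample data and a hypothetical `merge_memories` wrapper around the same logic: repeated ids are skipped, while entries with a missing or falsy id are kept.

```python
from typing import Any


def merge_memories(responses: list[list[dict[str, Any]]]) -> list[dict[str, Any]]:
    """Flatten search responses, skipping repeated ids but keeping id-less entries."""
    merged: list[dict[str, Any]] = []
    seen_ids: set[Any] = set()
    for response in responses:
        for mem in response:
            mem_id = mem.get("id")
            if mem_id is not None:
                if mem_id in seen_ids:
                    continue  # already merged from another partition
                seen_ids.add(mem_id)
            merged.append(mem)
    return merged


responses = [
    [{"id": "m1", "memory": "a"}, {"memory": "no id"}],
    [{"id": "m1", "memory": "a"}, {"id": 0, "memory": "falsy id"}],
]
merged = merge_memories(responses)
print([m["memory"] for m in merged])
```

A truthiness check (`if mem_id:`) would have dropped both the id-less entry and the one whose id is `0`.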
Motivation and Context
This change is required because the current Mem0ContextProvider fails to retrieve any stored memories during the before_run phase. It solves two critical bugs in how the provider interacts with the Mem0 API:
- The provider was storing the application ID inside the metadata dictionary but searching for it using Mem0's native top-level app_id parameter, resulting in filtering failures.
- Memories are tagged with a single entity (for example user_1 OR agent_1). By passing both user_id and agent_id in a single bundled filters dictionary, the provider forced a strict logical AND intersection (user == X AND agent == Y). Since no single memory row contains both tags, the database always returned zero results.

Fixes #6237
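
The AND-intersection problem can be reproduced with a toy in-memory filter. This is only an illustration: `search_and` and the sample rows are hypothetical and do not model Mem0's actual storage or query engine.

```python
ROWS = [
    {"id": "m1", "user_id": "user_1", "memory": "prefers dark mode"},
    {"id": "m2", "agent_id": "agent_1", "memory": "answers in French"},
]


def search_and(rows, **filters):
    """Strict AND: a row must match every supplied filter key."""
    return [r for r in rows if all(r.get(k) == v for k, v in filters.items())]


# Bundling both entities into one filter matches nothing, since no row has both tags.
bundled = search_and(ROWS, user_id="user_1", agent_id="agent_1")

# Querying each entity on its own and concatenating recovers both memories.
per_entity = search_and(ROWS, user_id="user_1") + search_and(ROWS, agent_id="agent_1")

print(len(bundled), [r["id"] for r in per_entity])
```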
Description
Changes Implemented:
- after_run (Ingestion Fix): Modified the mem0_client.add payload to pass self.application_id to the native app_id parameter instead of trapping it inside the metadata dictionary. This aligns the insertion schema with the retrieval schema.
- before_run (Retrieval Fix): Completely removed the bundled _build_filters logic. Replaced it with a concurrent architecture using asyncio.gather to query the User partition and the Agent partition independently.
- Results from the two partitions are merged and deduplicated by id.
- Added a build_search_kwargs helper function inside before_run. This safely generates the query dictionaries without shallow-copy side effects, and cleanly handles the differing payload requirements between AsyncMemory (OSS) and AsyncMemoryClient (Platform).

Contribution Checklist