Skip to content

Commit 82ba539

Browse files
Refine Data Access using OpeaStore (#1916)
* Porting the data store layers of components to OpeaStore. * Decoupling user-defined data structure from data access layer of MongoDB. * Adding OpeaStore tools * Add search by KV API * Update READMEs --------- Signed-off-by: Yi Yao <yi.a.yao@intel.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 682086a commit 82ba539

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1055
-1010
lines changed

comps/chathistory/README.md

Lines changed: 6 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -6,78 +6,18 @@ It can be integrated into application by making HTTP requests to the provided AP
66

77
![Flow Chart](./assets/img/chathistory_flow.png)
88

9-
---
10-
119
## 🛠️ Features
1210

1311
- **Store Chat Conversations**: Save chat messages user information, and metadata associated with each conversation.
1412
- **Retrieve Chat Histories**: Fetch chat histories for a specific user or retrieve a particular conversation by its unique identifier.
1513
- **Update Chat Conversations**: Modify existing chat conversations by adding new messages or updating existing ones.
1614
- **Delete Chat Conversations**: Remove chat conversations record from database.
1715

18-
---
19-
20-
## 🤖 MCP (Model Context Protocol) Support
21-
22-
The Chat History microservice supports MCP integration, allowing AI agents to discover and utilize chat history management capabilities as tools.
23-
24-
### MCP Configuration
25-
26-
#### Environment Variables
27-
28-
- `ENABLE_MCP`: Set to `true`, `1`, or `yes` to enable MCP support (default: `false`)
29-
30-
#### Docker Compose
31-
32-
```yaml
33-
services:
34-
chathistory-mongo:
35-
environment:
36-
ENABLE_MCP: true
37-
```
38-
39-
#### Kubernetes
40-
41-
```yaml
42-
chathistory:
43-
ENABLE_MCP: true
44-
```
45-
46-
### MCP Tools Available
47-
48-
When MCP is enabled, the following tools are available for AI agents:
49-
50-
1. **create_documents** - Create or update chat conversation history
51-
2. **get_documents** - Retrieve chat conversation history
52-
3. **delete_documents** - Delete chat conversation history
53-
54-
### Usage with AI Agents
55-
56-
```python
57-
from comps.cores.mcp import OpeaMCPToolsManager
58-
59-
# Initialize MCP tools manager
60-
tools_manager = OpeaMCPToolsManager()
61-
62-
# Add chathistory service
63-
tools_manager.add_service("http://chathistory-service:6012")
64-
65-
# AI agents can now discover and use chathistory tools
66-
tools = await tools_manager.get_available_tools()
67-
```
68-
69-
### MCP Endpoint
70-
71-
When MCP is enabled, the service exposes an additional SSE endpoint:
72-
73-
- `/sse` - Server-Sent Events endpoint for MCP communication
74-
75-
---
76-
77-
## ⚙️ Implementation
78-
79-
The Chat History microservice able to support various database backends for storing the chat conversations.
16+
## ⚙️ Deployment Options
8017

81-
### Chat History with MongoDB
18+
To get detailed, step-by-step instructions on deploying the `chathistory` microservice, you should consult the deployment guide. This guide will walk you through all the necessary steps, from building the Docker images to configuring your environment and running the service.
8219

83-
For more detail, please refer to this [README](src/README.md)
20+
| Platform | Deployment Method | Database | Link |
21+
| -------- | ----------------- | -------- | --------------------------------------------------------- |
22+
| CPU | Docker | MongoDB | [Deployment Guide](./deployment/docker_compose/README.md) |
23+
| CPU | Docker Compose | MongoDB | [Deployment Guide](./deployment/docker_compose/README.md) |

comps/chathistory/src/README.md renamed to comps/chathistory/deployment/docker_compose/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ This README provides setup guides and all the necessary information about the Ch
99
```bash
1010
export http_proxy=${your_http_proxy}
1111
export https_proxy=${your_http_proxy}
12+
export OPEA_STORE_NAME="mongodb"
1213
export MONGO_HOST=${MONGO_HOST}
1314
export MONGO_PORT=27017
1415
export DB_NAME=${DB_NAME}

comps/chathistory/deployment/docker_compose/compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ services:
2424
http_proxy: ${http_proxy}
2525
no_proxy: ${no_proxy}
2626
https_proxy: ${https_proxy}
27+
OPEA_STORE_NAME: ${OPEA_STORE_NAME:-mongodb}
2728
MONGO_HOST: ${MONGO_HOST}
2829
MONGO_PORT: ${MONGO_PORT}
2930
COLLECTION_NAME: ${COLLECTION_NAME}

comps/chathistory/src/document_store.py

Lines changed: 0 additions & 159 deletions
This file was deleted.
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# Copyright (C) 2025 Intel Corporation
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from typing import Optional
5+
6+
from fastapi import HTTPException
7+
from pydantic import BaseModel
8+
9+
from comps.cores.proto.api_protocol import ChatCompletionRequest
10+
from comps.cores.storages.models import ChatId, ChatMessage
11+
from comps.cores.storages.stores import column_to_id, get_store, id_to_column
12+
13+
14+
class ChatMessageDto(BaseModel):
15+
data: ChatCompletionRequest
16+
first_query: Optional[str] = None
17+
doc_id: Optional[str] = None
18+
user: Optional[str] = None
19+
20+
21+
def _prepersist(document: ChatMessage) -> dict:
22+
"""Converts a ChatMessage object to a dictionary suitable for persistence.
23+
24+
Args:
25+
document (ChatMessage): The ChatMessage object to be converted.
26+
27+
Returns:
28+
dict: A dictionary representation of the ChatMessage, ready for persistence.
29+
"""
30+
data_dict = document.model_dump(by_alias=True, mode="json")
31+
data_dict = column_to_id("id", data_dict)
32+
return data_dict
33+
34+
35+
def _post_getby_id(rs: dict) -> dict:
36+
"""Post-processes a document retrieved by ID from the store.
37+
38+
Args:
39+
rs (dict): The raw document dictionary from the store.
40+
41+
Returns:
42+
dict: The processed document data, or None if the document doesn't exist.
43+
"""
44+
rs = id_to_column("id", rs)
45+
return rs.get("data") if rs else None
46+
47+
48+
def _post_getby_user(rss: list) -> list:
49+
"""Post-processes a list of documents retrieved by user from the store.
50+
51+
Args:
52+
rss (list): A list of raw document dictionaries from the store.
53+
54+
Returns:
55+
list: A list of processed documents with the 'data' field removed.
56+
"""
57+
for rs in rss:
58+
rs = id_to_column("id", rs)
59+
rs.pop("data")
60+
return rss
61+
62+
63+
def _check_user_info(document: ChatMessage | ChatId):
64+
"""Checks if the user information is provided in the document.
65+
66+
Args:
67+
document (ChatMessage|ChatId): The document to be checked.
68+
Raises:
69+
HTTPException: If the user information is missing.
70+
"""
71+
user = document.data.user if isinstance(document, ChatMessage) else document.user
72+
if user is None:
73+
raise HTTPException(status_code=400, detail="Please provide the user information")
74+
75+
76+
async def save_or_update(document: ChatMessage):
77+
"""Saves a new chat message or updates an existing one in the data store.
78+
79+
Args:
80+
document (ChatMessage): The ChatMessage object to be saved or updated.
81+
If the document has an ID, it will be updated;
82+
otherwise, a new document will be created.
83+
84+
Returns:
85+
The result of the save or update operation from the store.
86+
"""
87+
_check_user_info(document)
88+
store = get_store(document.data.user)
89+
if document.id:
90+
return await store.aupdate_document(_prepersist(document))
91+
else:
92+
return await store.asave_document(_prepersist(document))
93+
94+
95+
async def get(document: ChatId):
96+
"""Retrieves chat messages from the data store.
97+
98+
Args:
99+
document (ChatId): The ChatId object containing user information and
100+
optionally a document ID. If document.id is None,
101+
retrieves all documents for the user; otherwise,
102+
retrieves the specific document by ID.
103+
104+
Returns:
105+
Either a list of all documents for the user (if document.id is None) or
106+
a specific document (if document.id is provided).
107+
"""
108+
_check_user_info(document)
109+
store = get_store(document.user)
110+
if document.id is None:
111+
rss = await store.asearch(key="data.user", value=document.user)
112+
return _post_getby_user(rss)
113+
else:
114+
rs = await store.aget_document_by_id(document.id)
115+
return _post_getby_id(rs)
116+
117+
118+
async def delete(document: ChatId):
119+
"""Deletes a specific chat message from the data store.
120+
121+
Args:
122+
document (ChatId): The ChatId object containing user information and document ID.
123+
The document ID must be provided for deletion.
124+
125+
Returns:
126+
The result of the delete operation from the store.
127+
128+
Raises:
129+
Exception: If the document ID is not provided.
130+
"""
131+
_check_user_info(document)
132+
store = get_store(document.user)
133+
if document.id is None:
134+
raise Exception("Document id is required.")
135+
else:
136+
return await store.adelete_document(document.id)

0 commit comments

Comments
 (0)