|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# Copyright (C) 2024 Intel Corporation |
| 3 | +# SPDX-License-Identifier: Apache-2.0 |
| 4 | + |
| 5 | +import asyncio |
| 6 | +import json |
| 7 | +import sys |
| 8 | + |
| 9 | +from mcp.client.session import ClientSession |
| 10 | +from mcp.client.sse import sse_client |
| 11 | + |
| 12 | + |
| 13 | +async def validate_chathistory_mcp(ip_address, service_port): |
| 14 | + """Validate ChatHistory service MCP functionality.""" |
| 15 | + endpoint = f"http://{ip_address}:{service_port}" |
| 16 | + |
| 17 | + # Ensure trailing slash for SSE endpoint to avoid redirect |
| 18 | + sse_endpoint = endpoint + "/sse/" |
| 19 | + print(f"Connecting to SSE endpoint: {sse_endpoint}") |
| 20 | + |
| 21 | + async with sse_client(sse_endpoint) as streams: |
| 22 | + async with ClientSession(*streams) as session: |
| 23 | + result = await session.initialize() |
| 24 | + |
| 25 | + # Test create operation |
| 26 | + chat_data = { |
| 27 | + "data": { |
| 28 | + "messages": [{"role": "user", "content": "Hello, this is a test message"}], |
| 29 | + "user": "test_user", |
| 30 | + }, |
| 31 | + "first_query": "Hello, this is a test message", |
| 32 | + } |
| 33 | + |
| 34 | + # Create chat conversation |
| 35 | + tool_result = await session.call_tool( |
| 36 | + "create_documents", |
| 37 | + {"document": chat_data}, |
| 38 | + ) |
| 39 | + create_result = tool_result.content |
| 40 | + |
| 41 | + # The response is just the ID string, not JSON |
| 42 | + conversation_id = create_result[0].text |
| 43 | + |
| 44 | + if not conversation_id: |
| 45 | + print(f"Create operation failed. Received was {create_result}") |
| 46 | + exit(1) |
| 47 | + |
| 48 | + # Test get operation |
| 49 | + get_data = {"user": "test_user", "id": conversation_id} |
| 50 | + |
| 51 | + tool_result = await session.call_tool( |
| 52 | + "get_documents", |
| 53 | + {"document": get_data}, |
| 54 | + ) |
| 55 | + get_result = tool_result.content |
| 56 | + # Try to parse the response as JSON |
| 57 | + try: |
| 58 | + retrieved_doc = json.loads(get_result[0].text) |
| 59 | + except json.JSONDecodeError: |
| 60 | + # If not JSON, assume it's an error or empty |
| 61 | + retrieved_doc = None |
| 62 | + |
| 63 | + if not retrieved_doc or retrieved_doc.get("user") != "test_user": |
| 64 | + print(f"Get operation failed. Received was {get_result}") |
| 65 | + exit(1) |
| 66 | + |
| 67 | + # Test delete operation |
| 68 | + delete_data = {"user": "test_user", "id": conversation_id} |
| 69 | + |
| 70 | + tool_result = await session.call_tool( |
| 71 | + "delete_documents", |
| 72 | + {"document": delete_data}, |
| 73 | + ) |
| 74 | + delete_result = tool_result.content |
| 75 | + |
| 76 | + if not delete_result: |
| 77 | + print(f"Delete operation failed. Received was {delete_result}") |
| 78 | + exit(1) |
| 79 | + |
| 80 | + print("Result correct.") |
| 81 | + |
| 82 | + |
| 83 | +if __name__ == "__main__": |
| 84 | + if len(sys.argv) != 3: |
| 85 | + print("Usage: python3 validate_mcp.py <ip_address> <service_port>") |
| 86 | + exit(1) |
| 87 | + ip_address = sys.argv[1] |
| 88 | + service_port = sys.argv[2] |
| 89 | + asyncio.run(validate_chathistory_mcp(ip_address, service_port)) |
0 commit comments