Skip to content

Commit 0beaec0

Browse files
committed
connect to db on-demand
1 parent 281145a commit 0beaec0

File tree

1 file changed

+91
-86
lines changed

1 file changed

+91
-86
lines changed

server.py

Lines changed: 91 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import sqlite3
22
import os
33
import enum
4-
from contextlib import AsyncIterator
54
from contextlib import asynccontextmanager
65
from dataclasses import dataclass
76
from datetime import datetime, timedelta
87
import logging
9-
from typing import Dict, List, Optional
8+
from typing import AsyncIterator, Dict, List, Optional
109
from mcp.server.fastmcp import FastMCP, Context
10+
import asyncio
1111
from urllib.parse import urlparse
1212
from collections import Counter, defaultdict
1313
import re
14+
from functools import lru_cache
1415

1516
logging.basicConfig(level=logging.INFO)
1617
logger = logging.getLogger("browser-storage-mcp")
@@ -23,11 +24,6 @@
2324
# PATH_TO_CHROME_HISTORY = os.path.join(CHROME_HISTORY_DIR, "History")
2425

2526
@dataclass(frozen=True)
26-
class BrowserType(enum.Enum):
27-
FIREFOX = "firefox"
28-
#CHROME = "chrome"
29-
30-
@dataclass
3127
class HistoryEntry:
3228
"""Represents a single browser history entry"""
3329
url: str
@@ -45,40 +41,10 @@ def to_dict(self) -> Dict:
4541

4642
@dataclass
4743
class AppContext:
48-
firefox_db: sqlite3.Connection
49-
chrome_db: sqlite3.Connection
50-
51-
@asynccontextmanager
52-
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
53-
"""Manage application lifecycle with type-safe context"""
54-
firefox_db = None
55-
chrome_db = None
56-
57-
try:
58-
# Initialize Firefox connection
59-
if os.path.exists(PATH_TO_FIREFOX_HISTORY):
60-
firefox_db = sqlite3.connect(PATH_TO_FIREFOX_HISTORY)
61-
logger.info("Connected to Firefox history database")
62-
else:
63-
logger.warning(f"Firefox history not found at {PATH_TO_FIREFOX_HISTORY}")
64-
65-
# Initialize Chrome connection
66-
# if os.path.exists(PATH_TO_CHROME_HISTORY):
67-
# chrome_db = sqlite3.connect(PATH_TO_CHROME_HISTORY)
68-
# logger.info("Connected to Chrome history database")
69-
# else:
70-
# logger.warning(f"Chrome history not found at {PATH_TO_CHROME_HISTORY}")
71-
72-
yield AppContext(firefox_db=firefox_db, chrome_db=chrome_db)
73-
74-
finally:
75-
if firefox_db:
76-
firefox_db.close()
77-
if chrome_db:
78-
chrome_db.close()
44+
firefox_db: Optional[sqlite3.Connection]
45+
chrome_db: Optional[sqlite3.Connection]
7946

80-
# Create server with lifespan
81-
mcp = FastMCP(name="Browser History MCP", instructions="This server makes it possible to query a user's Firefox browser history, analyze it, and create a thoughtful report with an optional lense of productivity or learning.", lifespan=app_lifespan)
47+
mcp = FastMCP(name="Browser History MCP", instructions="This server makes it possible to query a user's Firefox browser history, analyze it, and create a thoughtful report with an optional lense of productivity or learning.")
8248

8349
@mcp.prompt()
8450
def productivity_analysis() -> str:
@@ -190,38 +156,51 @@ def research_topic_extraction() -> str:
190156
Format as a research notebook with topics, key findings, and open questions.
191157
"""
192158

193-
def _get_firefox_history(db: sqlite3.Connection, days: int) -> List[HistoryEntry]:
159+
def _get_firefox_history(days: int) -> List[HistoryEntry]:
194160
"""Get Firefox history from the last N days"""
195-
cursor = db.cursor()
196-
197-
# Firefox stores timestamps as microseconds since Unix epoch
198-
cutoff_time = (datetime.now() - timedelta(days=days)).timestamp() * 1_000_000
199-
200-
query = """
201-
SELECT DISTINCT h.url, h.title, h.visit_count, h.last_visit_date
202-
FROM moz_places h
203-
WHERE h.last_visit_date > ?
204-
AND h.hidden = 0
205-
AND h.url NOT LIKE 'moz-extension://%'
206-
ORDER BY h.last_visit_date DESC
207-
"""
208-
209-
cursor.execute(query, (cutoff_time,))
210-
results = cursor.fetchall()
161+
# Check if database exists
162+
if not os.path.exists(PATH_TO_FIREFOX_HISTORY):
163+
raise RuntimeError(f"Firefox history not found at {PATH_TO_FIREFOX_HISTORY}")
211164

212-
entries = []
213-
for url, title, visit_count, last_visit_date in results:
214-
# Convert Firefox timestamp (microseconds) to datetime
215-
visit_time = datetime.fromtimestamp(last_visit_date / 1_000_000)
165+
# Connect to the database
166+
conn = sqlite3.connect(f"file:{PATH_TO_FIREFOX_HISTORY}?mode=ro", uri=True)
167+
try:
168+
cursor = conn.cursor()
169+
170+
# Firefox stores timestamps as microseconds since Unix epoch
171+
cutoff_time = (datetime.now() - timedelta(days=days)).timestamp() * 1_000_000
172+
173+
query = """
174+
SELECT DISTINCT h.url, h.title, h.visit_count, h.last_visit_date
175+
FROM moz_places h
176+
WHERE h.last_visit_date > ?
177+
AND h.hidden = 0
178+
AND h.url NOT LIKE 'moz-extension://%'
179+
ORDER BY h.last_visit_date DESC
180+
"""
181+
182+
cursor.execute(query, (cutoff_time,))
183+
results = cursor.fetchall()
216184

217-
entries.append(HistoryEntry(
218-
url=url or "",
219-
title=title,
220-
visit_count=visit_count or 0,
221-
last_visit_time=visit_time
222-
))
223-
224-
return entries
185+
entries = []
186+
for url, title, visit_count, last_visit_date in results:
187+
# Convert Firefox timestamp (microseconds) to datetime
188+
visit_time = datetime.fromtimestamp(last_visit_date / 1_000_000)
189+
190+
entries.append(HistoryEntry(
191+
url=url or "",
192+
title=title,
193+
visit_count=visit_count or 0,
194+
last_visit_time=visit_time
195+
))
196+
197+
return entries
198+
except Exception as e:
199+
logger.error(f"Error querying Firefox history: {e}")
200+
raise RuntimeError(f"Failed to query Firefox history: {e}")
201+
finally:
202+
if conn:
203+
conn.close()
225204

226205
# def _get_chrome_history(db: sqlite3.Connection, days: int) -> List[HistoryEntry]:
227206
# """Get Chrome history from the last N days"""
@@ -261,18 +240,20 @@ def _get_firefox_history(db: sqlite3.Connection, days: int) -> List[HistoryEntry
261240
# return entries
262241

263242
@mcp.tool()
264-
async def get_browser_history(context: AppContext, time_period_in_days: int, browser_type: BrowserType) -> List[Dict]:
265-
"""Get browser history from Firefox for the specified time period in days"""
243+
async def get_browser_history(time_period_in_days: int, browser_type: str = "firefox") -> List[Dict]:
244+
"""Get browser history from Firefox for the specified time period in days.
245+
246+
Args:
247+
time_period_in_days: Number of days of history to retrieve
248+
browser_type: Browser type (currently only 'firefox' is supported)
249+
"""
266250

267251
if time_period_in_days <= 0:
268252
raise ValueError("time_period_in_days must be a positive integer")
269253

270-
if browser_type == BrowserType.FIREFOX:
271-
if not context.firefox_db:
272-
raise RuntimeError("Firefox database not available")
273-
254+
if browser_type == "firefox":
274255
try:
275-
entries = _get_firefox_history(context.firefox_db, time_period_in_days)
256+
entries = _get_firefox_history(time_period_in_days)
276257
logger.info(f"Retrieved {len(entries)} Firefox history entries from last {time_period_in_days} days")
277258
return [entry.to_dict() for entry in entries]
278259
except sqlite3.Error as e:
@@ -295,8 +276,13 @@ async def get_browser_history(context: AppContext, time_period_in_days: int, bro
295276
# raise ValueError(f"Unsupported browser type: {browser_type}")
296277

297278
@mcp.tool()
298-
async def group_browsing_history_into_sessions(context: AppContext, history_data: List[Dict], max_gap_hours: float = 2.0) -> List[Dict]:
299-
"""Group browser history into sessions based on time gaps"""
279+
async def group_browsing_history_into_sessions(history_data: List[Dict], max_gap_hours: float = 2.0) -> List[Dict]:
280+
"""Group browser history into sessions based on time gaps.
281+
282+
Args:
283+
history_data: List of history entries from get_browser_history
284+
max_gap_hours: Maximum hours between visits to consider same session
285+
"""
300286

301287
if not history_data:
302288
return []
@@ -350,8 +336,13 @@ async def group_browsing_history_into_sessions(context: AppContext, history_data
350336

351337

352338
@mcp.tool()
353-
async def categorize_browsing_history(context: AppContext, history_data: List[Dict]) -> Dict[str, List[Dict]]:
354-
"""Categorize URLs into meaningful groups"""
339+
@lru_cache(maxsize=1000)
340+
async def categorize_browsing_history(history_data: List[Dict]) -> Dict[str, List[Dict]]:
341+
"""Categorize URLs into meaningful groups.
342+
343+
Args:
344+
history_data: List of history entries from get_browser_history
345+
"""
355346

356347
categories = {
357348
'social_media': ['facebook.com', 'twitter.com', 'instagram.com', 'reddit.com', 'linkedin.com'],
@@ -395,8 +386,14 @@ async def categorize_browsing_history(context: AppContext, history_data: List[Di
395386
return result
396387

397388
@mcp.tool()
398-
async def analyze_domain_frequency(context: AppContext, history_data: List[Dict], top_n: int = 20) -> List[Dict]:
399-
"""Analyze most frequently visited domains"""
389+
@lru_cache(maxsize=1000)
390+
async def analyze_domain_frequency(history_data: List[Dict], top_n: int = 20) -> List[Dict]:
391+
"""Analyze most frequently visited domains.
392+
393+
Args:
394+
history_data: List of history entries from get_browser_history
395+
top_n: Number of top domains to return
396+
"""
400397

401398
domain_stats = defaultdict(lambda: {'count': 0, 'total_visits': 0, 'titles': set()})
402399

@@ -424,8 +421,12 @@ async def analyze_domain_frequency(context: AppContext, history_data: List[Dict]
424421
return domain_list[:top_n]
425422

426423
@mcp.tool()
427-
async def find_learning_paths(context: AppContext, history_data: List[Dict]) -> List[Dict]:
428-
"""Identify learning progressions in browsing history"""
424+
async def find_learning_paths(history_data: List[Dict]) -> List[Dict]:
425+
"""Identify learning progressions in browsing history.
426+
427+
Args:
428+
history_data: List of history entries from get_browser_history
429+
"""
429430

430431
# Common learning indicators in URLs
431432
learning_patterns = {
@@ -490,8 +491,12 @@ async def find_learning_paths(context: AppContext, history_data: List[Dict]) ->
490491
return learning_sessions
491492

492493
@mcp.tool()
493-
async def calculate_productivity_metrics(context: AppContext, categorized_data: Dict[str, Dict]) -> Dict:
494-
"""Calculate productivity metrics from categorized browsing data"""
494+
async def calculate_productivity_metrics(categorized_data: Dict[str, Dict]) -> Dict:
495+
"""Calculate productivity metrics from categorized browsing data.
496+
497+
Args:
498+
categorized_data: Categorized browsing data from categorize_browsing_history
499+
"""
495500

496501
productive_categories = {'development', 'learning', 'productivity'}
497502
unproductive_categories = {'social_media', 'entertainment', 'shopping'}

0 commit comments

Comments
 (0)