-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
167 lines (144 loc) · 5.73 KB
/
server.py
File metadata and controls
167 lines (144 loc) · 5.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import time
from threading import Lock
import logging
from typing import Annotated
from pydantic import Field
from decouple import config, Choices
import requests
from codeowners import CodeOwners
from fastmcp import FastMCP
# Settings read from the environment.
# The token may be supplied under either variable name; GITHUB_TOKEN is
# consulted first and the second name is only used when the first is unset/empty.
GITHUB_TOKEN = (
    config("GITHUB_TOKEN", default=None)
    or config("GITHUB_PERSONAL_ACCESS_TOKEN", default=None)
)
DEBUG = config("DEBUG", cast=bool, default=False)
# Seconds a cached CODEOWNERS entry is served before it is revalidated.
CACHE_TTL_SECS = config("CACHE_TTL_SECS", cast=int, default=300)
TRANSPORT = config(
    "TRANSPORT",
    cast=Choices(["stdio", "sse", "streamable-http"]),
    default="stdio",
)
# HOST and PORT are only passed to the server for non-stdio transports (see main()).
HOST = config("HOST", default="127.0.0.1")
PORT = config("PORT", cast=int, default=8000)

# Logging setup: DEBUG switches the root logger from INFO to DEBUG.
logging.basicConfig(level=logging.DEBUG if DEBUG else logging.INFO)
logger = logging.getLogger(__name__)
class CodeownersCache:
    """In-memory cache of parsed CODEOWNERS files with TTL and ETag revalidation.

    Entries are keyed by ``"{owner}/{repo}@{branch}"``. An entry younger than
    ``CACHE_TTL_SECS`` is returned without any network traffic. An older entry
    is revalidated with a conditional request (``If-None-Match``), so an
    unchanged file is neither downloaded nor parsed again.
    """

    # Must be GitHub's own API host: the access token is attached to every
    # request, so it must never be sent to a mirror or proxy domain.
    GITHUB_API_URL = "https://api.github.com"

    # Upper bound (seconds) for a single HTTP request. The request is made
    # while holding ``self.lock``; without a timeout one stalled connection
    # would block every other caller indefinitely.
    REQUEST_TIMEOUT_SECS = 10

    def __init__(self):
        self.cache = {}       # key -> parsed CodeOwners object
        self.etags = {}       # key -> ETag header of the last 200 response
        self.timestamps = {}  # key -> time.time() of the last fetch/revalidation
        self.lock = Lock()    # guards the three dicts above

    def _get_headers(self, etag=None):
        """Build the request headers for fetching the CODEOWNERS file.

        Args:
            etag: ETag of the cached copy, if any. When given, the request is
                made conditional via ``If-None-Match``.

        Returns:
            dict of HTTP headers. ``Authorization`` is only included when a
            token is configured.
        """
        headers = {
            # Raw media type: the response body is used directly as the file text.
            "Accept": "application/vnd.github.v3.raw",
        }
        if GITHUB_TOKEN:
            headers["Authorization"] = f"token {GITHUB_TOKEN}"
        if etag:
            headers["If-None-Match"] = etag
        return headers

    def get_codeowners(self, owner, repo, branch="main"):
        """Return the parsed ``.github/CODEOWNERS`` of a repository branch.

        Args:
            owner: Repository owner (user or organisation).
            repo: Repository name.
            branch: Branch (or other ref) to read the file from.

        Returns:
            The ``CodeOwners`` object built from the file content.

        Raises:
            Exception: if the API answers with a status other than 200 or 304.
            requests.exceptions.RequestException: on network failure or when
                the request exceeds ``REQUEST_TIMEOUT_SECS``.
        """
        key = f"{owner}/{repo}@{branch}"
        with self.lock:
            now = time.time()
            if (
                key in self.cache and
                (now - self.timestamps.get(key, 0) < CACHE_TTL_SECS)
            ):
                logger.debug(f"Cache hit for {key}")
                return self.cache[key]

            url = f"{self.GITHUB_API_URL}/repos/{owner}/{repo}/contents/.github/CODEOWNERS"
            headers = self._get_headers(etag=self.etags.get(key))
            logger.debug(f"Fetching CODEOWNERS from {url} (ref={branch})")
            response = requests.get(
                url,
                headers=headers,
                # Passed via ``params`` so branch names containing characters
                # such as '&', '#' or '+' are percent-encoded correctly.
                params={"ref": branch},
                timeout=self.REQUEST_TIMEOUT_SECS,
            )

            if response.status_code == 304:
                # Only reachable when an ETag was sent, i.e. a cached copy exists.
                logger.debug("CODEOWNERS not modified (304)")
                self.timestamps[key] = now
                return self.cache[key]

            if response.status_code == 200:
                content = response.text
                logger.debug("Fetched and cached new CODEOWNERS content")
                self.cache[key] = CodeOwners(content)
                self.etags[key] = response.headers.get("ETag")
                self.timestamps[key] = now
                return self.cache[key]

            error_msg = f"Failed to fetch CODEOWNERS: {response.status_code} {response.text}"
            logger.error(error_msg)
            raise Exception(error_msg)
# Global CODEOWNERS cache
# Single module-level instance; get_file_owners() reads CODEOWNERS through it.
codeowners_cache = CodeownersCache()
# MCP server object. Tools are registered on it with the @mcp.tool() decorator
# and it is started from main() via mcp.run().
mcp = FastMCP(
    name="github-codeowners",
    instructions="""
This MCP server expose ownership information for files contained in Github repositories.
"""
)
def get_file_exists(
    owner: Annotated[str, Field(description="Repository owner")],
    repo: Annotated[str, Field(description="Repository name")],
    path: Annotated[str, Field(description="File path")],
    branch: Annotated[str, Field(description="Branch name")] = "main"
) -> bool:
    """
    Return whether ``path`` exists in the repository on the given branch.

    Queries the repository "contents" endpoint and maps the HTTP status to a
    boolean: 200 means the path exists, 404 means it does not.

    Raises:
        Exception: if the API answers with any status other than 200 or 404.
        requests.exceptions.RequestException: on network failure or timeout.
    """
    # Function-scope import so the module's top-level import block is untouched.
    from urllib.parse import quote

    # The host must be GitHub's own API: the access token is attached below
    # and must never be sent to a mirror or proxy domain.
    # ``quote`` keeps '/' but escapes characters such as '#', '?' and '%' that
    # would otherwise truncate or corrupt the URL.
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{quote(path)}"
    headers = {"Accept": "application/vnd.github.v3+json"}
    if GITHUB_TOKEN:
        headers["Authorization"] = f"token {GITHUB_TOKEN}"
    logger.debug(f"Checking if file exists: {url} (ref={branch})")
    response = requests.get(
        url,
        headers=headers,
        # Passed via ``params`` so the branch name is percent-encoded.
        params={"ref": branch},
        # Bound the wait so a stalled connection cannot hang the tool call.
        timeout=10,
    )
    if response.status_code == 200:
        return True
    if response.status_code == 404:
        return False
    error_msg = f"Unexpected error when checking file existence: {response.status_code} {response.text}"
    raise Exception(error_msg)
def get_file_owners(
    owner: Annotated[str, Field(description="Repository owner")],
    repo: Annotated[str, Field(description="Repository name")],
    path: Annotated[str, Field(description="File path")],
    branch: Annotated[str, Field(description="Branch name")] = "main"
) -> list[str]:
    """
    Look up who owns ``path`` according to the repository's CODEOWNERS file.

    Returns the owner identifiers matched for the path. When nothing matches,
    the file's existence is checked: an existing file yields an empty list,
    a missing one raises ``FileNotFoundError``.

    Any exception raised along the way is logged and then re-raised unchanged.
    """
    try:
        ruleset = codeowners_cache.get_codeowners(owner, repo, branch)
        matches = ruleset.of(path)
        logger.debug(f"Owners for {path}: {matches}")

        if not matches:
            # No rule matched: distinguish "unowned file" from "no such file".
            if get_file_exists(owner, repo, path, branch):
                return []
            raise FileNotFoundError(f"File '{path}' not found in repo '{owner}/{repo}' on branch '{branch}'.")

        # Each match is a (kind, identifier) pair; only the identifier is returned.
        return [identifier for _kind, identifier in matches]
    except Exception:
        logger.exception("Failed to get file owner")
        raise
@mcp.tool()
def get_files_owners(
    owner: Annotated[str, "Repository owner"],
    repo: Annotated[str, "Repository name"],
    paths: Annotated[list[str], "List of file paths"],
    branch: Annotated[str, "Branch name"] = "main",
) -> dict:
    """
    Resolve the CODEOWNERS owners for several files of a GitHub repository.

    The result maps every requested path to either ``{"owners": [...]}`` or,
    when the file does not exist, ``{"error": "<message>"}``. Only
    ``FileNotFoundError`` is converted into an error entry; any other
    exception propagates to the caller.
    """
    def _entry_for(file_path):
        # One result entry per path: owners on success, message when missing.
        try:
            return {"owners": get_file_owners(owner, repo, file_path, branch)}
        except FileNotFoundError as exc:
            return {"error": str(exc)}

    return {file_path: _entry_for(file_path) for file_path in paths}
def main():
    """Start the MCP server on the configured transport.

    ``host`` and ``port`` are only passed along for non-stdio transports.
    """
    run_kwargs = {"transport": TRANSPORT}
    if TRANSPORT != 'stdio':
        run_kwargs.update(host=HOST, port=PORT)
    mcp.run(**run_kwargs)
# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()