-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathcommon.py
More file actions
305 lines (243 loc) · 9.78 KB
/
common.py
File metadata and controls
305 lines (243 loc) · 9.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
"""Common utilities shared between claude.py and codex.py"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Iterable, Mapping, Optional
import browser_cookie3
DEFAULT_BROWSERS = ("chrome", "chromium", "brave", "edge", "firefox", "helium")
LOGIN_URLS = {
"claude.ai": "https://claude.ai",
"chatgpt.com": "https://chatgpt.com",
"opencode.ai": "https://opencode.ai/zen",
}
# Cache configuration
CACHE_DIR = Path.home() / ".cache" / "waybar-ai-usage"
LOGIN_OPEN_COOLDOWN = 600 # Don't re-open the same login page within 10 minutes
def open_login_url(url: str) -> bool:
"""Try to open URL in default browser with cooldown to avoid repeated opens."""
import hashlib
import subprocess
CACHE_DIR.mkdir(parents=True, exist_ok=True)
marker = CACHE_DIR / f"login_{hashlib.md5(url.encode()).hexdigest()}"
if marker.exists() and (time.time() - marker.stat().st_mtime) < LOGIN_OPEN_COOLDOWN:
return False
try:
subprocess.Popen(
["xdg-open", url],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
marker.touch()
return True
except FileNotFoundError:
return False
CACHE_TTL = 60 # Cache valid for 60 seconds
def get_cached_or_fetch(
cache_name: str,
fetch_func: Callable[[], dict],
ttl: int = CACHE_TTL
) -> dict:
"""
Get data from cache if fresh, otherwise fetch and cache.
This prevents multiple Waybar instances (one per monitor) from making
concurrent API requests that might be rate-limited.
Args:
cache_name: Name of cache file (e.g., "claude", "codex")
fetch_func: Function to call to fetch fresh data
ttl: Cache time-to-live in seconds
Returns:
Cached or freshly fetched data
"""
CACHE_DIR.mkdir(parents=True, exist_ok=True)
cache_file = CACHE_DIR / f"{cache_name}.json"
updating_file = CACHE_DIR / f"{cache_name}.updating"
# Check if cache is fresh
if cache_file.exists():
cache_age = time.time() - cache_file.stat().st_mtime
if cache_age < ttl:
# Cache is fresh, use it
try:
with open(cache_file, 'r') as f:
return json.load(f)
except Exception:
# Cache file corrupted, proceed to fetch
pass
# Check if another process is already updating
if updating_file.exists():
update_age = time.time() - updating_file.stat().st_mtime
# If update marker is older than 5 seconds, assume stale and proceed
if update_age < 5:
# Wait briefly for the other process to finish
for _ in range(6): # Wait up to 3 seconds (6 * 0.5s)
time.sleep(0.5)
if cache_file.exists():
cache_age = time.time() - cache_file.stat().st_mtime
if cache_age < ttl + 10: # Accept slightly older cache when waiting
try:
with open(cache_file, 'r') as f:
return json.load(f)
except Exception:
pass
# Need to fetch fresh data
# Create updating marker
try:
updating_file.touch()
except Exception:
pass
try:
# Fetch fresh data
data = fetch_func()
# Save to cache
try:
with open(cache_file, 'w') as f:
json.dump(data, f)
except Exception:
# Failed to save cache, but we have the data
pass
return data
finally:
# Always remove updating marker
try:
updating_file.unlink(missing_ok=True)
except Exception:
pass
def helium(cookie_file=None, domain_name="", key_file=None):
"""Returns a cookiejar of the cookies used by Helium browser.
Helium is a Chromium-based browser, so we use the chromium loader
with Helium's cookie file path.
"""
import os
if cookie_file is None:
cookie_file = os.path.expanduser("~/.config/net.imput.helium/Default/Cookies")
return browser_cookie3.chromium(cookie_file=cookie_file, domain_name=domain_name, key_file=key_file)
def load_cookies(domain: str, browsers: Iterable[str] | None = None) -> tuple[dict, str]:
"""Load cookies for a domain from the first available browser in order."""
browsers = list(browsers or DEFAULT_BROWSERS)
errors: list[str] = []
for name in browsers:
# First check if we have a local implementation (e.g., helium)
loader = globals().get(name)
if loader is None:
# Fall back to browser_cookie3
loader = getattr(browser_cookie3, name, None)
if loader is None:
errors.append(f"{name}: unsupported by browser_cookie3")
continue
try:
cj = loader(domain_name=domain)
cookies = {c.name: c.value for c in cj}
except Exception as exc:
errors.append(f"{name}: {exc}")
continue
if cookies:
return cookies, name
errors.append(f"{name}: no cookies found")
detail = "; ".join(errors) if errors else "no browsers provided"
raise RuntimeError(f"Failed to read cookies for {domain}: {detail}")
@dataclass
class WindowUsage:
"""Usage information for a time window."""
utilization: float
resets_at: Optional[str | int]
def parse_window_percent(raw: Mapping[str, object] | None, key: str = "utilization") -> WindowUsage:
"""Parse window where Claude returns utilization as 0–100% (may be float)."""
raw = raw or {}
util = raw.get(key) or 0
resets = raw.get("resets_at")
try:
util_f = float(util)
except Exception:
util_f = 0.0
return WindowUsage(utilization=util_f, resets_at=resets) # type: ignore[arg-type]
def parse_window_direct(raw: Mapping[str, object] | None) -> WindowUsage:
"""Parse window where used_percent is already 0-100 - used by ChatGPT."""
raw = raw or {}
used = raw.get("used_percent") or 0
reset_at = raw.get("reset_at")
try:
used_f = float(used)
except Exception:
used_f = 0.0
return WindowUsage(utilization=used_f, resets_at=reset_at) # type: ignore[arg-type]
def format_eta(reset_at: str | int | None) -> str:
"""Format ETA from ISO string or Unix timestamp -> '4h19′' or '19′30″'."""
if not reset_at:
return "0′00″"
try:
# Handle both ISO string and Unix timestamp
if isinstance(reset_at, str):
if reset_at.endswith('Z'):
reset_at = reset_at[:-1] + '+00:00'
reset_dt = datetime.fromisoformat(reset_at)
else:
reset_dt = datetime.fromtimestamp(reset_at, tz=timezone.utc)
now = datetime.now(timezone.utc)
delta = reset_dt - now
except Exception:
return "??′??″"
secs = int(delta.total_seconds())
if secs <= 0:
return "0m00s"
# Show days+hours if > 24 hours
if secs >= 86400: # 24 * 3600
days = secs // 86400
hours = (secs % 86400) // 3600
return f"{days}d{hours:02}h"
# Show hours+minutes if > 1 hour
if secs >= 3600:
hours = secs // 3600
mins = (secs % 3600) // 60
return f"{hours}h{mins:02}m"
# Show minutes+seconds
mins = secs // 60
secs_rem = secs % 60
return f"{mins}m{secs_rem:02}s"
def format_output(format_string: str, data: dict) -> str:
"""
Format output using a template string with placeholders.
Available placeholders:
- {5h_pct} - 5-hour utilization percentage (no decimals)
- {7d_pct} - 7-day utilization percentage (no decimals)
- {5h_reset} - 5-hour reset time (formatted)
- {7d_reset} - 7-day reset time (formatted)
- {icon} - service icon
- {time_icon} - time icon
- {status} - status text (Ready, Pause, or empty)
- {pct} - active window percentage
- {reset} - active window reset time
Conditional sections:
- {?5h_reset}...{/5h_reset} - show content only if 5h_reset is not "Not started"
- {?7d_reset}...{/7d_reset} - show content only if 7d_reset is not "Not started"
- {?5h_reset&7d_reset}...{/} - show content only if both are not "Not started"
Example:
format_output("{icon} {5h_pct}% {time_icon} {5h_reset}", data)
format_output("{?5h_reset}{5h_pct}/{5h_reset}{/5h_reset}{?5h_reset&7d_reset} - {/}{?7d_reset}{7d_pct}/{7d_reset}{/7d_reset}", data)
"""
import re
# Process conditional blocks with multiple variables: {?var1&var2&...}content{/}
def replace_multi_conditional(match):
var_names = match.group(1).split('&')
content = match.group(2)
# Check if all variables exist and are not "Not started"
all_valid = all(data.get(v.strip(), "") and data.get(v.strip(), "") != "Not started" for v in var_names)
if all_valid:
return content.format(**data)
return ""
# Replace multi-variable conditional blocks first: {?var1&var2}content{/}
result = re.sub(r'\{\?([^}]+&[^}]+)\}(.*?)\{/\}', replace_multi_conditional, format_string)
# Process single variable conditional blocks: {?var}content{/var}
def replace_conditional(match):
var_name = match.group(1)
content = match.group(2)
value = data.get(var_name, "")
# Show content only if value exists and is not "Not started"
if value and value != "Not started":
return content.format(**data)
return ""
# Replace single-variable conditional blocks: {?var}content{/var}
result = re.sub(r'\{\?(\w+)\}(.*?)\{/\1\}', replace_conditional, result)
# Replace remaining placeholders
return result.format(**data)