Skip to content

Commit 782e2ff

Browse files
committed
feat: quota-aware account selection + fix premature model unavailability
- Move setModelUnavailable from per-account loop to all-accounts-exhausted path - Clear model unavailability on successful fallback - Add in-memory quota cache with background refresh (5min active, 20min exhausted) - Integrate quota cache in account selection to skip exhausted accounts - Mark accounts as exhausted from 429 when no cached quota data exists - Populate quota cache from dashboard usage endpoint
1 parent f3bbe41 commit 782e2ff

File tree

5 files changed

+310
-23
lines changed

5 files changed

+310
-23
lines changed

src/app/api/usage/[connectionId]/route.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { getUsageForProvider } from "@omniroute/open-sse/services/usage.ts";
44
import { getExecutor } from "@omniroute/open-sse/executors/index.ts";
55
import { syncToCloud } from "@/lib/cloudSync";
66
import { runWithProxyContext } from "@omniroute/open-sse/utils/proxyFetch.ts";
7+
import { setQuotaCache } from "@/domain/quotaCache";
78

89
/**
910
* Sync to cloud if enabled
@@ -147,6 +148,12 @@ export async function GET(request: Request, { params }: { params: Promise<{ conn
147148
const usage = await runWithProxyContext(proxyInfo?.proxy || null, () =>
148149
getUsageForProvider(connection)
149150
);
151+
152+
// Populate quota cache for quota-aware account selection
153+
if (usage?.quotas) {
154+
setQuotaCache(connectionId, connection.provider, usage.quotas);
155+
}
156+
150157
return Response.json(usage);
151158
} catch (error) {
152159
console.error("[Usage API] Error fetching usage:", error);

src/domain/quotaCache.ts

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/**
2+
* Quota Cache — Domain Layer
3+
*
4+
* In-memory cache of provider quota data per connectionId.
5+
* Populated by:
6+
* - Dashboard usage endpoint (GET /api/usage/[connectionId])
7+
* - 429 responses marking account as exhausted
8+
*
9+
* Background refresh runs every 1 minute:
10+
* - Active accounts (quota > 0%): refetch every 5 minutes
11+
* - Exhausted accounts: refetch every 20 minutes (or immediately after resetAt passes)
12+
*
13+
* @module domain/quotaCache
14+
*/
15+
16+
import { getUsageForProvider } from "@omniroute/open-sse/services/usage.ts";
17+
import { getProviderConnectionById, resolveProxyForConnection } from "@/lib/localDb";
18+
import { runWithProxyContext } from "@omniroute/open-sse/utils/proxyFetch.ts";
19+
20+
// ─── Types ──────────────────────────────────────────────────────────────────
21+
22+
interface QuotaInfo {
23+
remainingPercentage: number;
24+
resetAt: string | null;
25+
}
26+
27+
interface QuotaCacheEntry {
28+
connectionId: string;
29+
provider: string;
30+
quotas: Record<string, QuotaInfo>;
31+
fetchedAt: number;
32+
exhausted: boolean;
33+
nextResetAt: string | null;
34+
}
35+
36+
// ─── Constants ──────────────────────────────────────────────────────────────
37+
38+
const ACTIVE_TTL_MS = 5 * 60 * 1000; // 5 minutes for active accounts
39+
const EXHAUSTED_TTL_MS = 5 * 60 * 1000; // 5 minutes for 429-sourced entries (no resetAt)
40+
const EXHAUSTED_REFRESH_MS = 20 * 60 * 1000; // 20 minutes: recheck exhausted accounts
41+
const REFRESH_INTERVAL_MS = 60 * 1000; // Background tick every 1 minute
42+
43+
// ─── State ──────────────────────────────────────────────────────────────────
44+
45+
const cache = new Map<string, QuotaCacheEntry>();
46+
let refreshTimer: ReturnType<typeof setInterval> | null = null;
47+
48+
// ─── Helpers ────────────────────────────────────────────────────────────────
49+
50+
function isExhausted(quotas: Record<string, QuotaInfo>): boolean {
51+
const entries = Object.values(quotas);
52+
if (entries.length === 0) return false;
53+
return entries.every((q) => q.remainingPercentage <= 0);
54+
}
55+
56+
function earliestResetAt(quotas: Record<string, QuotaInfo>): string | null {
57+
let earliest: string | null = null;
58+
for (const q of Object.values(quotas)) {
59+
if (q.resetAt && (!earliest || new Date(q.resetAt) < new Date(earliest))) {
60+
earliest = q.resetAt;
61+
}
62+
}
63+
return earliest;
64+
}
65+
66+
function normalizeQuotas(rawQuotas: Record<string, any>): Record<string, QuotaInfo> {
67+
const result: Record<string, QuotaInfo> = {};
68+
for (const [key, q] of Object.entries(rawQuotas)) {
69+
if (q && typeof q === "object") {
70+
result[key] = {
71+
remainingPercentage:
72+
q.remainingPercentage ??
73+
(q.total ? Math.round(((q.total - (q.used || 0)) / q.total) * 100) : 100),
74+
resetAt: q.resetAt || null,
75+
};
76+
}
77+
}
78+
return result;
79+
}
80+
81+
// ─── Public API ─────────────────────────────────────────────────────────────
82+
83+
/**
84+
* Store quota data for a connection (called by usage endpoint and background refresh).
85+
*/
86+
export function setQuotaCache(
87+
connectionId: string,
88+
provider: string,
89+
rawQuotas: Record<string, any>
90+
) {
91+
const quotas = normalizeQuotas(rawQuotas);
92+
const exhausted = isExhausted(quotas);
93+
cache.set(connectionId, {
94+
connectionId,
95+
provider,
96+
quotas,
97+
fetchedAt: Date.now(),
98+
exhausted,
99+
nextResetAt: exhausted ? earliestResetAt(quotas) : null,
100+
});
101+
}
102+
103+
/**
104+
* Get cached quota entry (returns null if not cached).
105+
*/
106+
export function getQuotaCache(connectionId: string): QuotaCacheEntry | null {
107+
return cache.get(connectionId) || null;
108+
}
109+
110+
/**
111+
* Check if an account's quota is exhausted based on cached data.
112+
* Returns false if no cache entry exists (unknown = assume available).
113+
*/
114+
export function isAccountQuotaExhausted(connectionId: string): boolean {
115+
const entry = cache.get(connectionId);
116+
if (!entry) return false;
117+
118+
// If exhausted and we have a resetAt that has passed, consider it available
119+
if (entry.exhausted && entry.nextResetAt) {
120+
if (new Date(entry.nextResetAt).getTime() <= Date.now()) {
121+
return false; // Reset time passed, assume available until refresh confirms
122+
}
123+
}
124+
125+
// Check TTL based on state
126+
const age = Date.now() - entry.fetchedAt;
127+
if (entry.exhausted) {
128+
// Exhausted entries without resetAt use fixed TTL
129+
if (!entry.nextResetAt && age > EXHAUSTED_TTL_MS) return false;
130+
// Exhausted entries with resetAt stay valid until resetAt or refresh
131+
return true;
132+
}
133+
134+
// Active entries expire after ACTIVE_TTL
135+
if (age > ACTIVE_TTL_MS) return false;
136+
137+
return entry.exhausted;
138+
}
139+
140+
/**
141+
* Mark an account as quota-exhausted from a 429 response (no quota data available).
142+
* Uses 5-minute fixed TTL since we don't know the actual resetAt.
143+
*/
144+
export function markAccountExhaustedFrom429(connectionId: string, provider: string) {
145+
cache.set(connectionId, {
146+
connectionId,
147+
provider,
148+
quotas: {},
149+
fetchedAt: Date.now(),
150+
exhausted: true,
151+
nextResetAt: null,
152+
});
153+
}
154+
155+
// ─── Background Refresh ─────────────────────────────────────────────────────
156+
157+
async function refreshEntry(entry: QuotaCacheEntry) {
158+
try {
159+
const connection = await getProviderConnectionById(entry.connectionId);
160+
if (!connection || connection.authType !== "oauth" || !connection.isActive) {
161+
cache.delete(entry.connectionId);
162+
return;
163+
}
164+
165+
const proxyInfo = await resolveProxyForConnection(entry.connectionId);
166+
const usage = await runWithProxyContext(proxyInfo?.proxy || null, () =>
167+
getUsageForProvider(connection)
168+
);
169+
170+
if (usage?.quotas) {
171+
setQuotaCache(entry.connectionId, entry.provider, usage.quotas);
172+
}
173+
} catch {
174+
// Refresh failed silently — keep stale entry
175+
}
176+
}
177+
178+
async function backgroundRefreshTick() {
179+
const now = Date.now();
180+
181+
for (const entry of cache.values()) {
182+
const age = now - entry.fetchedAt;
183+
184+
if (entry.exhausted) {
185+
// If resetAt has passed, refetch immediately
186+
if (entry.nextResetAt && new Date(entry.nextResetAt).getTime() <= now) {
187+
refreshEntry(entry);
188+
continue;
189+
}
190+
// Recheck exhausted accounts every 20 minutes
191+
if (age >= EXHAUSTED_REFRESH_MS) {
192+
refreshEntry(entry);
193+
}
194+
} else {
195+
// Refresh active accounts every 5 minutes
196+
if (age >= ACTIVE_TTL_MS) {
197+
refreshEntry(entry);
198+
}
199+
}
200+
}
201+
}
202+
203+
/**
204+
* Start the background refresh timer.
205+
*/
206+
export function startBackgroundRefresh() {
207+
if (refreshTimer) return;
208+
refreshTimer = setInterval(backgroundRefreshTick, REFRESH_INTERVAL_MS);
209+
// Don't prevent process exit
210+
if (refreshTimer && typeof refreshTimer === "object" && "unref" in refreshTimer) {
211+
(refreshTimer as any).unref();
212+
}
213+
}
214+
215+
/**
216+
* Stop the background refresh timer.
217+
*/
218+
export function stopBackgroundRefresh() {
219+
if (refreshTimer) {
220+
clearInterval(refreshTimer);
221+
refreshTimer = null;
222+
}
223+
}
224+
225+
/**
226+
* Get cache stats (for debugging/dashboard).
227+
*/
228+
export function getQuotaCacheStats() {
229+
const entries: Array<{
230+
connectionId: string;
231+
provider: string;
232+
exhausted: boolean;
233+
nextResetAt: string | null;
234+
ageMs: number;
235+
}> = [];
236+
237+
for (const entry of cache.values()) {
238+
entries.push({
239+
connectionId: entry.connectionId.slice(0, 8) + "...",
240+
provider: entry.provider,
241+
exhausted: entry.exhausted,
242+
nextResetAt: entry.nextResetAt,
243+
ageMs: Date.now() - entry.fetchedAt,
244+
});
245+
}
246+
247+
return { total: cache.size, entries };
248+
}

src/instrumentation.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ export async function register() {
3434
const { initApiBridgeServer } = await import("@/lib/apiBridgeServer");
3535
initApiBridgeServer();
3636

37+
// Quota cache: start background refresh for quota-aware account selection
38+
const { startBackgroundRefresh } = await import("@/domain/quotaCache");
39+
startBackgroundRefresh();
40+
3741
// Compliance: Initialize audit_log table + cleanup expired logs
3842
try {
3943
const { initAuditLog, cleanupExpiredLogs } = await import("@/lib/compliance/index");

src/sse/handlers/chat.ts

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,12 @@ import { sanitizeRequest } from "../../shared/utils/inputSanitizer";
3131

3232
// Pipeline integration — wired modules
3333
import { getCircuitBreaker, CircuitBreakerOpenError } from "../../shared/utils/circuitBreaker";
34-
import { isModelAvailable, setModelUnavailable } from "../../domain/modelAvailability";
34+
import {
35+
isModelAvailable,
36+
setModelUnavailable,
37+
clearModelUnavailability,
38+
} from "../../domain/modelAvailability";
39+
import { getQuotaCache, markAccountExhaustedFrom429 } from "../../domain/quotaCache";
3540
import { RequestTelemetry, recordTelemetry } from "../../shared/utils/requestTelemetry";
3641
import { generateRequestId } from "../../shared/utils/requestId";
3742
import { recordCost } from "../../domain/costRules";
@@ -127,7 +132,10 @@ export async function handleChat(request: any, clientRawRequest: any = null) {
127132
telemetry.startPhase("policy");
128133
const policy = await enforceApiKeyPolicy(request, modelStr);
129134
if (policy.rejection) {
130-
log.warn("POLICY", `API key policy rejected: ${modelStr} (key=${policy.apiKeyInfo?.id || "unknown"})`);
135+
log.warn(
136+
"POLICY",
137+
`API key policy rejected: ${modelStr} (key=${policy.apiKeyInfo?.id || "unknown"})`
138+
);
131139
return policy.rejection;
132140
}
133141
const apiKeyInfo = policy.apiKeyInfo;
@@ -243,6 +251,13 @@ async function handleSingleModelChat(
243251
const credentials = await getProviderCredentials(provider, excludeConnectionId);
244252

245253
if (!credentials || credentials.allRateLimited) {
254+
if (lastStatus === 429 || lastStatus === 503) {
255+
setModelUnavailable(provider, model, 60000, `HTTP ${lastStatus}`);
256+
log.info(
257+
"AVAILABILITY",
258+
`${provider}/${model} marked unavailable — all accounts exhausted (HTTP ${lastStatus})`
259+
);
260+
}
246261
return handleNoCredentials(
247262
credentials,
248263
excludeConnectionId,
@@ -296,22 +311,21 @@ async function handleSingleModelChat(
296311
});
297312

298313
if (result.success) {
314+
if (excludeConnectionId) {
315+
clearModelUnavailability(provider, model);
316+
}
299317
recordCostIfNeeded(apiKeyInfo, result);
300318
if (telemetry) telemetry.startPhase("finalize");
301319
if (telemetry) telemetry.endPhase();
302320
return result.response;
303321
}
304322

305-
// Pipeline: Mark model unavailable on repeated failures
306-
if (result.status === 429 || result.status === 503) {
307-
setModelUnavailable(provider, model, 60000, `HTTP ${result.status}`);
308-
log.info(
309-
"AVAILABILITY",
310-
`${provider}/${model} marked unavailable for 60s (HTTP ${result.status})`
311-
);
323+
// 6. Mark quota-exhausted from 429 if no cached quota data
324+
if (result.status === 429 && !getQuotaCache(credentials.connectionId)) {
325+
markAccountExhaustedFrom429(credentials.connectionId, provider);
312326
}
313327

314-
// 6. Fallback to next account
328+
// 7. Fallback to next account
315329
const { shouldFallback } = await markAccountUnavailable(
316330
credentials.connectionId,
317331
result.status,

0 commit comments

Comments
 (0)