Skip to content

Commit 79af6f2

Browse files
committed
feat(qwen): add rate limiting and quota error handling
- Add 60 requests/minute rate limiting per credential using sliding window - Detect insufficient_quota errors and set cooldown until next day (Beijing time) - Map quota errors (HTTP 403/429) to 429 with retryAfter for conductor integration - Cache Beijing timezone at package level to avoid repeated syscalls - Add redactAuthID function to protect credentials in logs - Extract wrapQwenError helper to consolidate error handling
1 parent 1a0e579 commit 79af6f2

File tree

1 file changed

+176
-9
lines changed

1 file changed

+176
-9
lines changed

internal/runtime/executor/qwen_executor.go

Lines changed: 176 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"net/http"
1010
"strings"
11+
"sync"
1112
"time"
1213

1314
qwenauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/qwen"
@@ -22,9 +23,151 @@ import (
2223
)
2324

2425
const (
25-
qwenUserAgent = "QwenCode/0.10.3 (darwin; arm64)"
26+
qwenUserAgent = "QwenCode/0.10.3 (darwin; arm64)"
27+
qwenRateLimitPerMin = 60 // 60 requests per minute per credential
28+
qwenRateLimitWindow = time.Minute // sliding window duration
2629
)
2730

31+
// qwenBeijingLoc caches the Beijing timezone to avoid repeated LoadLocation syscalls.
32+
var qwenBeijingLoc = func() *time.Location {
33+
loc, err := time.LoadLocation("Asia/Shanghai")
34+
if err != nil || loc == nil {
35+
log.Warnf("qwen: failed to load Asia/Shanghai timezone: %v, using fixed UTC+8", err)
36+
return time.FixedZone("CST", 8*3600)
37+
}
38+
return loc
39+
}()
40+
41+
// qwenQuotaCodes is a package-level set of error codes that indicate quota exhaustion.
42+
var qwenQuotaCodes = map[string]struct{}{
43+
"insufficient_quota": {},
44+
"quota_exceeded": {},
45+
}
46+
47+
// qwenRateLimiter tracks request timestamps per credential for rate limiting.
48+
// Qwen has a limit of 60 requests per minute per account.
49+
var qwenRateLimiter = struct {
50+
sync.Mutex
51+
requests map[string][]time.Time // authID -> request timestamps
52+
}{
53+
requests: make(map[string][]time.Time),
54+
}
55+
56+
// redactAuthID returns a redacted version of the auth ID for safe logging.
57+
// Keeps a small prefix/suffix to allow correlation across events.
58+
func redactAuthID(id string) string {
59+
if id == "" {
60+
return ""
61+
}
62+
if len(id) <= 8 {
63+
return id
64+
}
65+
return id[:4] + "..." + id[len(id)-4:]
66+
}
67+
68+
// checkQwenRateLimit checks if the credential has exceeded the rate limit.
69+
// Returns nil if allowed, or a statusErr with retryAfter if rate limited.
70+
func checkQwenRateLimit(authID string) error {
71+
if authID == "" {
72+
// Empty authID should not bypass rate limiting in production
73+
// Use debug level to avoid log spam for certain auth flows
74+
log.Debug("qwen rate limit check: empty authID, skipping rate limit")
75+
return nil
76+
}
77+
78+
now := time.Now()
79+
windowStart := now.Add(-qwenRateLimitWindow)
80+
81+
qwenRateLimiter.Lock()
82+
defer qwenRateLimiter.Unlock()
83+
84+
// Get and filter timestamps within the window
85+
timestamps := qwenRateLimiter.requests[authID]
86+
var validTimestamps []time.Time
87+
for _, ts := range timestamps {
88+
if ts.After(windowStart) {
89+
validTimestamps = append(validTimestamps, ts)
90+
}
91+
}
92+
93+
// Always prune expired entries to prevent memory leak
94+
// Delete empty entries, otherwise update with pruned slice
95+
if len(validTimestamps) == 0 {
96+
delete(qwenRateLimiter.requests, authID)
97+
}
98+
99+
// Check if rate limit exceeded
100+
if len(validTimestamps) >= qwenRateLimitPerMin {
101+
// Calculate when the oldest request will expire
102+
oldestInWindow := validTimestamps[0]
103+
retryAfter := oldestInWindow.Add(qwenRateLimitWindow).Sub(now)
104+
if retryAfter < time.Second {
105+
retryAfter = time.Second
106+
}
107+
retryAfterSec := int(retryAfter.Seconds())
108+
return statusErr{
109+
code: http.StatusTooManyRequests,
110+
msg: fmt.Sprintf(`{"error":{"code":"rate_limit_exceeded","message":"Qwen rate limit: %d requests/minute exceeded, retry after %ds","type":"rate_limit_exceeded"}}`, qwenRateLimitPerMin, retryAfterSec),
111+
retryAfter: &retryAfter,
112+
}
113+
}
114+
115+
// Record this request and update the map with pruned timestamps
116+
validTimestamps = append(validTimestamps, now)
117+
qwenRateLimiter.requests[authID] = validTimestamps
118+
119+
return nil
120+
}
121+
122+
// isQwenQuotaError checks if the error response indicates a quota exceeded error.
123+
// Qwen returns HTTP 403 with error.code="insufficient_quota" when daily quota is exhausted.
124+
func isQwenQuotaError(body []byte) bool {
125+
code := strings.ToLower(gjson.GetBytes(body, "error.code").String())
126+
errType := strings.ToLower(gjson.GetBytes(body, "error.type").String())
127+
128+
// Primary check: exact match on error.code or error.type (most reliable)
129+
if _, ok := qwenQuotaCodes[code]; ok {
130+
return true
131+
}
132+
if _, ok := qwenQuotaCodes[errType]; ok {
133+
return true
134+
}
135+
136+
// Fallback: check message only if code/type don't match (less reliable)
137+
msg := strings.ToLower(gjson.GetBytes(body, "error.message").String())
138+
if strings.Contains(msg, "insufficient_quota") || strings.Contains(msg, "quota exceeded") ||
139+
strings.Contains(msg, "free allocated quota exceeded") {
140+
return true
141+
}
142+
143+
return false
144+
}
145+
146+
// wrapQwenError wraps an HTTP error response, detecting quota errors and mapping them to 429.
147+
// Returns the appropriate status code and retryAfter duration for statusErr.
148+
// Only checks for quota errors when httpCode is 403 or 429 to avoid false positives.
149+
func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int, retryAfter *time.Duration) {
150+
errCode = httpCode
151+
// Only check quota errors for expected status codes to avoid false positives
152+
// Qwen returns 403 for quota errors, 429 for rate limits
153+
if (httpCode == http.StatusForbidden || httpCode == http.StatusTooManyRequests) && isQwenQuotaError(body) {
154+
errCode = http.StatusTooManyRequests // Map to 429 to trigger quota logic
155+
cooldown := timeUntilNextDay()
156+
retryAfter = &cooldown
157+
logWithRequestID(ctx).Warnf("qwen quota exceeded (http %d -> %d), cooling down until tomorrow (%v)", httpCode, errCode, cooldown)
158+
}
159+
return errCode, retryAfter
160+
}
161+
162+
// timeUntilNextDay returns duration until midnight Beijing time (UTC+8).
163+
// Qwen's daily quota resets at 00:00 Beijing time.
164+
func timeUntilNextDay() time.Duration {
165+
now := time.Now()
166+
nowLocal := now.In(qwenBeijingLoc)
167+
tomorrow := time.Date(nowLocal.Year(), nowLocal.Month(), nowLocal.Day()+1, 0, 0, 0, 0, qwenBeijingLoc)
168+
return tomorrow.Sub(now)
169+
}
170+
28171
// QwenExecutor is a stateless executor for Qwen Code using OpenAI-compatible chat completions.
29172
// If access token is unavailable, it falls back to legacy via ClientAdapter.
30173
type QwenExecutor struct {
@@ -67,6 +210,17 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
67210
if opts.Alt == "responses/compact" {
68211
return resp, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
69212
}
213+
214+
// Check rate limit before proceeding
215+
var authID string
216+
if auth != nil {
217+
authID = auth.ID
218+
}
219+
if err := checkQwenRateLimit(authID); err != nil {
220+
logWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
221+
return resp, err
222+
}
223+
70224
baseModel := thinking.ParseSuffix(req.Model).ModelName
71225

72226
token, baseURL := qwenCreds(auth)
@@ -102,9 +256,8 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
102256
return resp, err
103257
}
104258
applyQwenHeaders(httpReq, token, false)
105-
var authID, authLabel, authType, authValue string
259+
var authLabel, authType, authValue string
106260
if auth != nil {
107-
authID = auth.ID
108261
authLabel = auth.Label
109262
authType, authValue = auth.AccountInfo()
110263
}
@@ -135,8 +288,10 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
135288
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
136289
b, _ := io.ReadAll(httpResp.Body)
137290
appendAPIResponseChunk(ctx, e.cfg, b)
138-
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
139-
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
291+
292+
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
293+
logWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
294+
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
140295
return resp, err
141296
}
142297
data, err := io.ReadAll(httpResp.Body)
@@ -158,6 +313,17 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
158313
if opts.Alt == "responses/compact" {
159314
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
160315
}
316+
317+
// Check rate limit before proceeding
318+
var authID string
319+
if auth != nil {
320+
authID = auth.ID
321+
}
322+
if err := checkQwenRateLimit(authID); err != nil {
323+
logWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
324+
return nil, err
325+
}
326+
161327
baseModel := thinking.ParseSuffix(req.Model).ModelName
162328

163329
token, baseURL := qwenCreds(auth)
@@ -200,9 +366,8 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
200366
return nil, err
201367
}
202368
applyQwenHeaders(httpReq, token, true)
203-
var authID, authLabel, authType, authValue string
369+
var authLabel, authType, authValue string
204370
if auth != nil {
205-
authID = auth.ID
206371
authLabel = auth.Label
207372
authType, authValue = auth.AccountInfo()
208373
}
@@ -228,11 +393,13 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
228393
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
229394
b, _ := io.ReadAll(httpResp.Body)
230395
appendAPIResponseChunk(ctx, e.cfg, b)
231-
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
396+
397+
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
398+
logWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
232399
if errClose := httpResp.Body.Close(); errClose != nil {
233400
log.Errorf("qwen executor: close response body error: %v", errClose)
234401
}
235-
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
402+
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
236403
return nil, err
237404
}
238405
out := make(chan cliproxyexecutor.StreamChunk)

0 commit comments

Comments
 (0)