Skip to content

Commit c83a057

Browse files
committed
refactor(watcher): make auth file events fully incremental
1 parent 1ae994b commit c83a057

File tree

5 files changed

+245
-176
lines changed

5 files changed

+245
-176
lines changed

internal/watcher/clients.go

Lines changed: 89 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
1818
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
1919
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/diff"
20+
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/synthesizer"
2021
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
2122
log "github.com/sirupsen/logrus"
2223
)
@@ -75,6 +76,7 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string
7576

7677
w.lastAuthHashes = make(map[string]string)
7778
w.lastAuthContents = make(map[string]*coreauth.Auth)
79+
w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth)
7880
if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir); errResolveAuthDir != nil {
7981
log.Errorf("failed to resolve auth directory for hash cache: %v", errResolveAuthDir)
8082
} else if resolvedAuthDir != "" {
@@ -92,6 +94,24 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string
9294
if errParse := json.Unmarshal(data, &auth); errParse == nil {
9395
w.lastAuthContents[normalizedPath] = &auth
9496
}
97+
ctx := &synthesizer.SynthesisContext{
98+
Config: cfg,
99+
AuthDir: resolvedAuthDir,
100+
Now: time.Now(),
101+
IDGenerator: synthesizer.NewStableIDGenerator(),
102+
}
103+
if generated := synthesizer.SynthesizeAuthFile(ctx, path, data); len(generated) > 0 {
104+
pathAuths := make(map[string]*coreauth.Auth, len(generated))
105+
for _, a := range generated {
106+
if a == nil || strings.TrimSpace(a.ID) == "" {
107+
continue
108+
}
109+
pathAuths[a.ID] = a.Clone()
110+
}
111+
if len(pathAuths) > 0 {
112+
w.fileAuthsByPath[normalizedPath] = pathAuths
113+
}
114+
}
95115
}
96116
}
97117
return nil
@@ -143,13 +163,14 @@ func (w *Watcher) addOrUpdateClient(path string) {
143163
}
144164

145165
w.clientsMutex.Lock()
146-
147-
cfg := w.config
148-
if cfg == nil {
166+
if w.config == nil {
149167
log.Error("config is nil, cannot add or update client")
150168
w.clientsMutex.Unlock()
151169
return
152170
}
171+
if w.fileAuthsByPath == nil {
172+
w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth)
173+
}
153174
if prev, ok := w.lastAuthHashes[normalized]; ok && prev == curHash {
154175
log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(path))
155176
w.clientsMutex.Unlock()
@@ -177,34 +198,85 @@ func (w *Watcher) addOrUpdateClient(path string) {
177198
}
178199
w.lastAuthContents[normalized] = &newAuth
179200

180-
w.clientsMutex.Unlock() // Unlock before the callback
181-
182-
w.refreshAuthState(false)
201+
oldByID := make(map[string]*coreauth.Auth)
202+
if existing := w.fileAuthsByPath[normalized]; len(existing) > 0 {
203+
for id, a := range existing {
204+
oldByID[id] = a
205+
}
206+
}
183207

184-
if w.reloadCallback != nil {
185-
log.Debugf("triggering server update callback after add/update")
186-
w.reloadCallback(cfg)
208+
// Build synthesized auth entries for this single file only.
209+
sctx := &synthesizer.SynthesisContext{
210+
Config: w.config,
211+
AuthDir: w.authDir,
212+
Now: time.Now(),
213+
IDGenerator: synthesizer.NewStableIDGenerator(),
214+
}
215+
generated := synthesizer.SynthesizeAuthFile(sctx, path, data)
216+
newByID := make(map[string]*coreauth.Auth)
217+
for _, a := range generated {
218+
if a == nil || strings.TrimSpace(a.ID) == "" {
219+
continue
220+
}
221+
newByID[a.ID] = a.Clone()
222+
}
223+
if len(newByID) > 0 {
224+
w.fileAuthsByPath[normalized] = newByID
225+
} else {
226+
delete(w.fileAuthsByPath, normalized)
187227
}
228+
updates := w.computePerPathUpdatesLocked(oldByID, newByID)
229+
w.clientsMutex.Unlock()
230+
188231
w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
232+
w.dispatchAuthUpdates(updates)
189233
}
190234

191235
func (w *Watcher) removeClient(path string) {
192236
normalized := w.normalizeAuthPath(path)
193237
w.clientsMutex.Lock()
194-
195-
cfg := w.config
238+
oldByID := make(map[string]*coreauth.Auth)
239+
if existing := w.fileAuthsByPath[normalized]; len(existing) > 0 {
240+
for id, a := range existing {
241+
oldByID[id] = a
242+
}
243+
}
196244
delete(w.lastAuthHashes, normalized)
197245
delete(w.lastAuthContents, normalized)
246+
delete(w.fileAuthsByPath, normalized)
198247

199-
w.clientsMutex.Unlock() // Release the lock before the callback
248+
updates := w.computePerPathUpdatesLocked(oldByID, map[string]*coreauth.Auth{})
249+
w.clientsMutex.Unlock()
200250

201-
w.refreshAuthState(false)
251+
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
252+
w.dispatchAuthUpdates(updates)
253+
}
202254

203-
if w.reloadCallback != nil {
204-
log.Debugf("triggering server update callback after removal")
205-
w.reloadCallback(cfg)
255+
func (w *Watcher) computePerPathUpdatesLocked(oldByID, newByID map[string]*coreauth.Auth) []AuthUpdate {
256+
if w.currentAuths == nil {
257+
w.currentAuths = make(map[string]*coreauth.Auth)
206258
}
207-
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
259+
updates := make([]AuthUpdate, 0, len(oldByID)+len(newByID))
260+
for id, newAuth := range newByID {
261+
existing, ok := w.currentAuths[id]
262+
if !ok {
263+
w.currentAuths[id] = newAuth.Clone()
264+
updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: newAuth.Clone()})
265+
continue
266+
}
267+
if !authEqual(existing, newAuth) {
268+
w.currentAuths[id] = newAuth.Clone()
269+
updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: newAuth.Clone()})
270+
}
271+
}
272+
for id := range oldByID {
273+
if _, stillExists := newByID[id]; stillExists {
274+
continue
275+
}
276+
delete(w.currentAuths, id)
277+
updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id})
278+
}
279+
return updates
208280
}
209281

210282
func (w *Watcher) loadFileClients(cfg *config.Config) int {

internal/watcher/dispatcher.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
1515
)
1616

17+
var snapshotCoreAuthsFunc = snapshotCoreAuths
18+
1719
func (w *Watcher) setAuthUpdateQueue(queue chan<- AuthUpdate) {
1820
w.clientsMutex.Lock()
1921
defer w.clientsMutex.Unlock()
@@ -76,7 +78,11 @@ func (w *Watcher) dispatchRuntimeAuthUpdate(update AuthUpdate) bool {
7678
}
7779

7880
func (w *Watcher) refreshAuthState(force bool) {
79-
auths := w.SnapshotCoreAuths()
81+
w.clientsMutex.RLock()
82+
cfg := w.config
83+
authDir := w.authDir
84+
w.clientsMutex.RUnlock()
85+
auths := snapshotCoreAuthsFunc(cfg, authDir)
8086
w.clientsMutex.Lock()
8187
if len(w.runtimeAuths) > 0 {
8288
for _, a := range w.runtimeAuths {

internal/watcher/synthesizer/file.go

Lines changed: 94 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ func (s *FileSynthesizer) Synthesize(ctx *SynthesisContext) ([]*coreauth.Auth, e
3535
return out, nil
3636
}
3737

38-
now := ctx.Now
39-
cfg := ctx.Config
40-
4138
for _, e := range entries {
4239
if e.IsDir() {
4340
continue
@@ -51,93 +48,115 @@ func (s *FileSynthesizer) Synthesize(ctx *SynthesisContext) ([]*coreauth.Auth, e
5148
if errRead != nil || len(data) == 0 {
5249
continue
5350
}
54-
var metadata map[string]any
55-
if errUnmarshal := json.Unmarshal(data, &metadata); errUnmarshal != nil {
56-
continue
57-
}
58-
t, _ := metadata["type"].(string)
59-
if t == "" {
51+
auths := synthesizeFileAuths(ctx, full, data)
52+
if len(auths) == 0 {
6053
continue
6154
}
62-
provider := strings.ToLower(t)
63-
if provider == "gemini" {
64-
provider = "gemini-cli"
65-
}
66-
label := provider
67-
if email, _ := metadata["email"].(string); email != "" {
68-
label = email
69-
}
70-
// Use relative path under authDir as ID to stay consistent with the file-based token store
71-
id := full
72-
if rel, errRel := filepath.Rel(ctx.AuthDir, full); errRel == nil && rel != "" {
55+
out = append(out, auths...)
56+
}
57+
return out, nil
58+
}
59+
60+
// SynthesizeAuthFile generates Auth entries for one auth JSON file payload.
61+
// It shares exactly the same mapping behavior as FileSynthesizer.Synthesize.
62+
func SynthesizeAuthFile(ctx *SynthesisContext, fullPath string, data []byte) []*coreauth.Auth {
63+
return synthesizeFileAuths(ctx, fullPath, data)
64+
}
65+
66+
func synthesizeFileAuths(ctx *SynthesisContext, fullPath string, data []byte) []*coreauth.Auth {
67+
if ctx == nil || len(data) == 0 {
68+
return nil
69+
}
70+
now := ctx.Now
71+
cfg := ctx.Config
72+
var metadata map[string]any
73+
if errUnmarshal := json.Unmarshal(data, &metadata); errUnmarshal != nil {
74+
return nil
75+
}
76+
t, _ := metadata["type"].(string)
77+
if t == "" {
78+
return nil
79+
}
80+
provider := strings.ToLower(t)
81+
if provider == "gemini" {
82+
provider = "gemini-cli"
83+
}
84+
label := provider
85+
if email, _ := metadata["email"].(string); email != "" {
86+
label = email
87+
}
88+
// Use relative path under authDir as ID to stay consistent with the file-based token store.
89+
id := fullPath
90+
if strings.TrimSpace(ctx.AuthDir) != "" {
91+
if rel, errRel := filepath.Rel(ctx.AuthDir, fullPath); errRel == nil && rel != "" {
7392
id = rel
7493
}
94+
}
7595

76-
proxyURL := ""
77-
if p, ok := metadata["proxy_url"].(string); ok {
78-
proxyURL = p
79-
}
96+
proxyURL := ""
97+
if p, ok := metadata["proxy_url"].(string); ok {
98+
proxyURL = p
99+
}
80100

81-
prefix := ""
82-
if rawPrefix, ok := metadata["prefix"].(string); ok {
83-
trimmed := strings.TrimSpace(rawPrefix)
84-
trimmed = strings.Trim(trimmed, "/")
85-
if trimmed != "" && !strings.Contains(trimmed, "/") {
86-
prefix = trimmed
87-
}
101+
prefix := ""
102+
if rawPrefix, ok := metadata["prefix"].(string); ok {
103+
trimmed := strings.TrimSpace(rawPrefix)
104+
trimmed = strings.Trim(trimmed, "/")
105+
if trimmed != "" && !strings.Contains(trimmed, "/") {
106+
prefix = trimmed
88107
}
108+
}
89109

90-
disabled, _ := metadata["disabled"].(bool)
91-
status := coreauth.StatusActive
92-
if disabled {
93-
status = coreauth.StatusDisabled
94-
}
110+
disabled, _ := metadata["disabled"].(bool)
111+
status := coreauth.StatusActive
112+
if disabled {
113+
status = coreauth.StatusDisabled
114+
}
95115

96-
// Read per-account excluded models from the OAuth JSON file
97-
perAccountExcluded := extractExcludedModelsFromMetadata(metadata)
116+
// Read per-account excluded models from the OAuth JSON file.
117+
perAccountExcluded := extractExcludedModelsFromMetadata(metadata)
98118

99-
a := &coreauth.Auth{
100-
ID: id,
101-
Provider: provider,
102-
Label: label,
103-
Prefix: prefix,
104-
Status: status,
105-
Disabled: disabled,
106-
Attributes: map[string]string{
107-
"source": full,
108-
"path": full,
109-
},
110-
ProxyURL: proxyURL,
111-
Metadata: metadata,
112-
CreatedAt: now,
113-
UpdatedAt: now,
114-
}
115-
// Read priority from auth file
116-
if rawPriority, ok := metadata["priority"]; ok {
117-
switch v := rawPriority.(type) {
118-
case float64:
119-
a.Attributes["priority"] = strconv.Itoa(int(v))
120-
case string:
121-
priority := strings.TrimSpace(v)
122-
if _, errAtoi := strconv.Atoi(priority); errAtoi == nil {
123-
a.Attributes["priority"] = priority
124-
}
119+
a := &coreauth.Auth{
120+
ID: id,
121+
Provider: provider,
122+
Label: label,
123+
Prefix: prefix,
124+
Status: status,
125+
Disabled: disabled,
126+
Attributes: map[string]string{
127+
"source": fullPath,
128+
"path": fullPath,
129+
},
130+
ProxyURL: proxyURL,
131+
Metadata: metadata,
132+
CreatedAt: now,
133+
UpdatedAt: now,
134+
}
135+
// Read priority from auth file.
136+
if rawPriority, ok := metadata["priority"]; ok {
137+
switch v := rawPriority.(type) {
138+
case float64:
139+
a.Attributes["priority"] = strconv.Itoa(int(v))
140+
case string:
141+
priority := strings.TrimSpace(v)
142+
if _, errAtoi := strconv.Atoi(priority); errAtoi == nil {
143+
a.Attributes["priority"] = priority
125144
}
126145
}
127-
ApplyAuthExcludedModelsMeta(a, cfg, perAccountExcluded, "oauth")
128-
if provider == "gemini-cli" {
129-
if virtuals := SynthesizeGeminiVirtualAuths(a, metadata, now); len(virtuals) > 0 {
130-
for _, v := range virtuals {
131-
ApplyAuthExcludedModelsMeta(v, cfg, perAccountExcluded, "oauth")
132-
}
133-
out = append(out, a)
134-
out = append(out, virtuals...)
135-
continue
146+
}
147+
ApplyAuthExcludedModelsMeta(a, cfg, perAccountExcluded, "oauth")
148+
if provider == "gemini-cli" {
149+
if virtuals := SynthesizeGeminiVirtualAuths(a, metadata, now); len(virtuals) > 0 {
150+
for _, v := range virtuals {
151+
ApplyAuthExcludedModelsMeta(v, cfg, perAccountExcluded, "oauth")
136152
}
153+
out := make([]*coreauth.Auth, 0, 1+len(virtuals))
154+
out = append(out, a)
155+
out = append(out, virtuals...)
156+
return out
137157
}
138-
out = append(out, a)
139158
}
140-
return out, nil
159+
return []*coreauth.Auth{a}
141160
}
142161

143162
// SynthesizeGeminiVirtualAuths creates virtual Auth entries for multi-project Gemini credentials.

0 commit comments

Comments
 (0)