Skip to content

Latest commit

 

History

History
517 lines (396 loc) · 16.2 KB

File metadata and controls

517 lines (396 loc) · 16.2 KB

最后更新:2026-04-16 | 模块:缓存智能管理

架构决策见 ../adr/003-multi-layer-cache.md,接口定义见 ../CODE_WIKI.md

返回文档索引

TokenRouter 缓存智能管理

1. 架构总览

1.1 两层缓存体系

层级 名称 作用 存储位置
L1 厂商侧 KV Cache 通过前缀共享最大化缓存命中 厂商侧 (DeepSeek / OpenAI / Anthropic 等)
L2 请求去重 相同完整请求的并发复用 内存 map / Redis
L3 元数据缓存 API Key、模型配置、用户配额等 Redis (Hash/String)

2. 项目结构

internal/
├── chunker/
│   └── chunker.go              # 静态分块器(Phase 1)
├── arranger/
│   └── arranger.go             # 排列器:System 合并 / Tool 排序 / History 截断
├── canonicalizer/
│   └── canonicalizer.go        # 序列化规范器:生成字节级确定性输出
├── cacheinject/
│   ├── engine.go               # 缓存策略引擎入口
│   ├── injector.go             # Injector 接口
│   ├── anthropic.go            # Anthropic cache_control 注入器
│   ├── openai.go               # OpenAI 透传策略(自动缓存)
│   └── registry.go             # 注入器注册中心
├── hasher/
│   └── hasher.go               # PrefixHash / FullHash 计算
├── dedup/
│   └── dedup.go                # 非流式请求去重器
├── observer/
│   └── observer.go             # 流量观测(Phase 2 数据飞轮)
└── block/
    └── block.go                # Block 定义

3. 核心接口定义

3.1 CacheInjector(缓存注入器)

package cacheinject

type Engine interface {
    Inject(blocks []block.Block, vendor string) ([]block.Block, error)
}

type Injector interface {
    Name() string
    Supports(vendor string) bool
    Inject(blocks []block.Block) ([]block.Block, error)
}

3.2 Anthropic Injector(最复杂)

Anthropic 对 cache_control 的注入位置和 token 门槛有严格限制:

  • 只能注入在 system 消息和 tool 定义上
  • 标记类型为 {"type": "ephemeral"}
  • 最多 K=4 个标记位(通常 System + 前 3 个 Tool)

MVP v0.1 说明:当前版本只接入 DeepSeek V3.2(OpenAI 兼容协议),无需显式缓存标记。Anthropic 的 cache_control 注入器在接口层已预留,Phase 1.1 接入 Anthropic 时直接实现并注册即可。

package cacheinject

// AnthropicInjector is an MVP stub. It is wired into the registry for
// symmetry, but the MVP only routes deepseek* models. Inject() returns
// an error so any accidental reach surfaces loudly.
// See docs/KNOWN_ISSUES.md ISSUE-007.
type AnthropicInjector struct{}

func (i *AnthropicInjector) Name() string { return "anthropic" }

func (i *AnthropicInjector) Supports(vendor string) bool {
    return vendor == "anthropic"
}

func (i *AnthropicInjector) Inject(blocks []block.Block) ([]block.Block, error) {
    return nil, fmt.Errorf("cacheinject: anthropic injector not implemented yet")
}

Phase 1.1 实现方向:接入 Anthropic 时,将 Inject 改为在 System 消息和 Tool 定义上注入 {"type": "ephemeral"} 标记。

3.3 OpenAI Injector

OpenAI / DeepSeek 采用自动缓存(Automatic Caching),无需显式标记。Injector 直接透传即可。MVP v0.1 的 OpenAIInjector 同时覆盖 DeepSeek。

package cacheinject

type OpenAIInjector struct{}

func (i *OpenAIInjector) Name() string { return "openai" }

func (i *OpenAIInjector) Supports(vendor string) bool {
    return vendor == "openai" || vendor == "deepseek"
}

func (i *OpenAIInjector) Inject(blocks []block.Block) ([]block.Block, error) {
    // 无需显式注入,Canonicalizer 已保证结构一致性
    return blocks, nil
}

3.4 Hasher(哈希计算)

package hasher

func PrefixHash(blocks []block.Block) (string, error) {
    var prefix []block.Block
    for _, b := range blocks {
        if b.Type == block.BlockSystem || b.Type == block.BlockTool {
            prefix = append(prefix, b)
        }
    }
    data, err := canonicalizer.CanonicalJSON(prefix)
    if err != nil {
        return "", fmt.Errorf("hasher: %w", err)
    }
    h := sha256.Sum256(data)
    return hex.EncodeToString(h[:]), nil
}

func FullHash(blocks []block.Block) (string, error) {
    data, err := canonicalizer.CanonicalJSON(blocks)
    if err != nil {
        return "", fmt.Errorf("hasher: %w", err)
    }
    h := sha256.Sum256(data)
    return hex.EncodeToString(h[:]), nil
}

3.5 Dedup(非流式去重)

package dedup

type Deduplicator struct {
    inflight map[string]*InFlightRequest
    mu       sync.RWMutex
    ttl      time.Duration
    enabled  bool
    stopCh   chan struct{}
    stopOnce sync.Once
}

type InFlightRequest struct {
    Done       chan struct{}
    StatusCode int
    Resp       []byte
    Err        error
    completeOnce sync.Once
}

func NewDeduplicator(ttl time.Duration, enabled bool) *Deduplicator

// CheckOrRegister atomically checks or registers an in-flight request.
// Returns (req, found): found=true means duplicate in progress, wait on req.Done.
// found=false means caller owns the request and must call Complete.
func (d *Deduplicator) CheckOrRegister(hash string) (*InFlightRequest, bool)

// Complete is idempotent; the first completion wins and releases waiters.
func (d *Deduplicator) Complete(hash string, statusCode int, resp []byte, err error)
func (d *Deduplicator) Stop()

4. Phase 1:静态缓存注入(MVP)

4.1 静态策略

MVP 阶段不引入动态算法,只做固定的结构标准化 + 缓存标记注入:

  1. Chunker:按 role 切分为 System / Tool / History / Query 四块
  2. Arranger:System 合并、Tool 按 Name 字母序排序、History 截断、Query 后置
  3. Canonicalizer:递归 JSON key 字母序、固定数字精度、紧凑输出
  4. CacheInjector:DeepSeek / OpenAI 透传。Anthropic 注入器在接口层预留
  5. Hasher:计算前缀哈希(System + Tool)和完整哈希
  6. Dedup:非流式相同完整请求挂起复用

4.2 降级条件

失败环节 降级行为
Chunker 失败 透传原始 messages,不做分块
Arranger 失败 透传原始 messages,不做排列
Canonicalizer 失败 用普通 JSON 序列化替代
CacheInjector 失败 透传排列后的 Block,不注入标记
Hasher 失败 继续处理,无共享优化

5. Phase 2 预留:智能缓存管理(Observer / Clusterer)

MVP 阶段虽然不做动态前缀优化,但已预留干净的扩展点。

5.1 扩展点清单

扩展点 Phase 1 行为 Phase 2 行为
Chunker 静态四分块 动态分块:根据 Observer 输出的高频模板调整前缀边界
CacheInjector 静态策略:System + 前 N 个 Tool 标记 动态策略:根据模板选择最优标记组合
Observer 只记录原始日志,不消费 实时统计 (system_signature, tool_set) 频率
Clusterer 不存在 挖掘高频共现子集,生成动态前缀模板池
Template Registry 不存在 管理模板生命周期(warm/stable/retire)

5.2 接口预留

// internal/chunker/chunker.go
type Chunker interface {
    Chunk(env *envelope.Envelope) ([]block.Block, error)
}
// Phase 1: StaticChunker
// Phase 2: DynamicChunker(内部调用 Observer/Clusterer 数据)

// internal/cacheinject/engine.go
type Engine interface {
    Inject(blocks []block.Block, vendor string) ([]block.Block, error)
}
// Phase 1: StaticEngine
// Phase 2: DynamicEngine(接入 Template Registry 和在线打分)

动态前缀算法的完整设计详见 ../research/dynamic-prefix-design.md


6. 缓存效果度量

6.1 关键指标

指标 计算公式 目标值
KV Cache 命中率 cache_read_tokens / total_input_tokens > 70% (MVP), > 85% (成熟期)
成本节省率 1 - (实际成本 / 无优化成本) > 50%
平均延迟降低 (无缓存延迟 - 有缓存延迟) / 无缓存延迟 > 30%
去重复用率 dedup_hits / total_requests > 5% (非流式)

6.2 Prometheus 指标

缓存相关的 Prometheus 指标(定义在 internal/monitor/metrics.go):

var (
    CacheHitsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "tokenrouter_cache_hits_total",
        Help: "Total cache hits",
    }, []string{"model", "provider"})

    CacheMissesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "tokenrouter_cache_misses_total",
        Help: "Total cache misses",
    }, []string{"model", "provider"})

    DedupHitsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "tokenrouter_dedup_hits_total",
        Help: "Total dedup hits",
    }, []string{"model"})

    CostUSDTotal = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "tokenrouter_cost_usd_total",
        Help: "Total cost in USD",
    }, []string{"model"})
)

6.3 成本节省计算

TokenRouter 区分两种节省来源:

节省类型 指标名称 来源 说明
缓存节省 tokenrouter_cache_savings_usd_total 上游提供商 上游缓存读取价格优惠(如 DeepSeek cache_read 价格为正常的 1/10)
去重节省 tokenrouter_dedup_savings_usd_total TokenRouter 并发相同请求只发送一次到上游

6.3.1 缓存节省计算逻辑

位置internal/billing/price_engine.go:30-59

func (e *PriceEngine) CalculateSavings(modelName string, counts TokenCounts) (Savings, error) {
    pricing, err := e.table.Get(modelName)
    
    // 如果 cache_write_price 为 0,使用 prompt_price 作为 fallback
    effectiveCacheWritePrice := pricing.CacheWritePrice
    if effectiveCacheWritePrice == 0 {
        effectiveCacheWritePrice = pricing.PromptPrice
    }
    
    // 未优化成本(假设没有缓存,所有缓存 tokens 都按 cache_write_price 计算)
    unoptimizedCost := (counts.CacheReadTokens + counts.CacheWriteTokens) * effectiveCacheWritePrice
    
    // 实际成本(cache_read 按便宜的价格计算)
    actualCost := counts.CacheWriteTokens * effectiveCacheWritePrice + 
                  counts.CacheReadTokens * pricing.CacheReadPrice
    
    // 节省金额 = 未优化成本 - 实际成本
    savedUSD := unoptimizedCost - actualCost
    
    // 节省率
    savingsRate := savedUSD / unoptimizedCost * 100
    
    return Savings{
        SavedUSD:     savedUSD,
        SavingsRate:  savingsRate,
    }, nil
}

核心公式简化

节省金额 = CacheReadTokens × (CacheWritePrice - CacheReadPrice)

示例(deepseek-v4-flash):

价格类型 每 Token 价格 每百万 Token 价格
cache_write_price 0.00000028 USD 0.28 美元
cache_read_price 0.000000014 USD 0.014 美元 (便宜 10 倍)

假设一次请求:

  • cache_read_tokens: 1000
  • cache_write_tokens: 50
未优化成本 = (1000 + 50) × 0.00000028 = 0.000294 USD
实际成本   = 50 × 0.00000028 + 1000 × 0.000000014 = 0.000028 USD
节省金额   = 0.000294 - 0.000028 = 0.000266 USD
节省率     = 0.000266 / 0.000294 × 100% = 90.48%

重要说明:缓存节省是上游提供商给的优惠,TokenRouter 只是通过结构优化让更多请求能够命中缓存。

6.3.2 去重节省计算逻辑

位置internal/server/handler.go:73-95

// 去重命中时
if found {
    <-ifr.Done
    // ... 返回缓存的响应
    monitor.RecordDedupHit(env.Model)
    if ifr.CostUSD > 0 {
        monitor.RecordDedupSavings(env.Model, ifr.CostUSD)  // 节省金额 = 上游实际成本
    }
    return
}

流程

  1. 请求 1 发送到上游,成本 $0.5
  2. 请求 2 命中去重,等待
  3. 请求 1 完成,成本 $0.5 存入 ifr.CostUSD
  4. 请求 2 被唤醒,记录"去重节省 $0.5"

核心逻辑:去重节省 = 被去重请求的完整成本(因为如果没有去重,这个请求会全额支付给上游)

6.3.3 数据流向

上游响应 (DeepSeek API)
    ↓
outbound.Usage {
    PromptTokens: 100,
    CompletionTokens: 50,
    CacheReadTokens: 1000,
    CacheWriteTokens: 50,
}
    ↓
CalculateSavings() 计算缓存节省
    ↓
model.Request {
    SavedUSD: 0.000266,  // 缓存节省
}
    ↓
monitor.RecordCacheSavings() 记录到 Prometheus
    ↓
Grafana Dashboard 显示:sum(increase(tokenrouter_cache_savings_usd_total[1h]))

6.4 告警规则

告警 条件 级别
缓存命中率骤降 命中率 < 50% 持续 30 分钟 Warning
去重异常 等待超时 > 1% 持续 10 分钟 Warning
供应商不可用 连续 3 次调用失败 Critical

7. 数据存储设计

7.1 PostgreSQL

CREATE TABLE cache_stats (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id         UUID NOT NULL REFERENCES users(id),
    date            DATE NOT NULL,
    total_requests  INTEGER DEFAULT 0,
    cache_hits      INTEGER DEFAULT 0,
    hit_rate        DOUBLE PRECISION DEFAULT 0,
    tokens_saved    INTEGER DEFAULT 0,
    cost_saved      DOUBLE PRECISION DEFAULT 0,
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(user_id, date)
);

7.2 Redis 数据结构

Key 模式 类型 用途 TTL
cache:config:{model} Hash 模型缓存策略配置 30min
cache:stats:{user_id}:{date} Hash 用户当日缓存统计 24h
dedup:inflight:{hash} String 正在处理中的请求标记 2min

8. 与 v2 架构流水线的集成

CacheInjector 位于 Canonicalizer 之后、出站适配之前,操作 []block.Block

// 简化版流程(实际实现在 internal/server.ChatPipeline.Handle 中)
func handleRequest(ctx context.Context, w http.ResponseWriter, req *http.Request) error {
    env, _, _ := inboundRegistry.Parse(req)
    blocks, _ := staticChunker.Chunk(env)
    blocks, _ = defaultArranger.Arrange(blocks)

    // 缓存注入
    blocks, _ = cacheEngine.Inject(blocks, vendor)

    // 哈希计算
    prefixHash, _ := hasher.PrefixHash(blocks)
    fullHash, _ := hasher.FullHash(blocks)

    // 去重检查(原子操作)
    ifr, found := deduplicator.CheckOrRegister(fullHash)
    if found {
        <-ifr.Done
        w.WriteHeader(ifr.StatusCode)
        w.Write(ifr.Resp)
        return ifr.Err
    }

    // 出站转发
    body, _ := outboundAdapter.BuildRequest(blocks, env)
    resp, _ := proxy.Forward(ctx, proxy.ForwardConfig{...})

    // 完成去重并返回
    deduplicator.Complete(fullHash, resp.StatusCode, resp.Body, nil)
    w.WriteHeader(resp.StatusCode)
    w.Write(resp.Body)
    return nil
}

9. 配置参考

9.1 环境变量

CACHE_INJECT_ENABLED=true      # Anthropic cache_control 注入
DEDUP_ENABLED=true             # 非流式去重
RATE_LIMIT_ENABLED=true        # 速率限制

9.2 Config 结构

// pkg/config/config.go
type Config struct {
    // ...
    CacheInjectEnabled  bool          // env: CACHE_INJECT_ENABLED, default: true
    DedupEnabled        bool          // env: DEDUP_ENABLED, default: true
    RateLimitEnabled    bool          // env: RATE_LIMIT_ENABLED, default: true
    DedupTTL            time.Duration // env: DEDUP_TTL, default: 2m
}

10. 开发路线图

阶段 时间 内容 目标
Phase 1: MVP 0-3月 静态 Chunker + Arranger + Canonicalizer、Anthropic cache_control、前缀哈希共享、非流式去重、Prometheus 指标 KV Cache 命中率 > 70%,去重复用 > 5%
Phase 2: 完善 3-9月 Observer 数据埋点、Clusterer 模板挖掘、动态分块策略、OpenAI/Gemini 策略完善 命中率 > 80%,成本节省 > 50%
Phase 3: 优化 9-18月 Template Registry 热升级、缓存预热预测、A/B 测试框架、跨用户前缀共享优化 命中率 > 85%