Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .claude/board/STATUS_BOARD.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ afterwards is a JIT kernel, not a rebuild. Plan path:

| D-id | Title | Status | PR / Evidence |
|---|---|---|---|
| D3.1 | Server-side sweep handler + Lance fragment append | **Queued** | target ~200 LOC |
| D3.1 | Server-side sweep handler + Lance fragment append | **In PR** | branch — `sweep_handler` batch mode: enumerates `WireSweepGrid::enumerate()`, validates each via TryFrom(CodecParams) at ingress, returns `WireSweepResponse { results: [WireSweepResult { kernel_hash, stub:true }], cardinality, elapsed_ms }`. SSE streaming + real calibrate/token-agreement per point deferred to D3.1b. Route: `POST /v1/shader/sweep`. |
| D3.2 | Client-side driver + config files | **Queued** | target ~20 LOC + YAML configs |

### Phase 4 — Frontier analysis — Queued
Expand Down
8 changes: 6 additions & 2 deletions crates/cognitive-shader-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ tonic-build = { version = "0.12", optional = true }
default = []
with-engine = ["dep:thinking-engine"]
with-planner = ["dep:lance-graph-planner"]
serve = ["dep:serde", "dep:serde_json", "dep:axum", "dep:tokio", "dep:base64", "dep:bytemuck"]
grpc = ["dep:prost", "dep:tonic", "dep:tonic-build", "dep:tokio"]
# Shared LAB DTOs — `wire.rs` + `auto_detect.rs` + codec kernel scaffolds
# + token_agreement use these regardless of whether the transport is REST
# (serve) or gRPC (grpc). Both features pull this set.
_lab-dtos = ["dep:serde", "dep:serde_json", "dep:base64", "dep:bytemuck"]
serve = ["_lab-dtos", "dep:axum", "dep:tokio"]
grpc = ["_lab-dtos", "dep:prost", "dep:tonic", "dep:tonic-build", "dep:tokio"]

# `lab` — umbrella switch for the single shader-lab binary. Enables every
# endpoint (REST + gRPC), the planner bridge, the thinking-engine bridge,
Expand Down
3 changes: 2 additions & 1 deletion crates/cognitive-shader-driver/src/auto_style.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! If nothing dominates (`max < threshold`), fall back to Deliberate.

use lance_graph_contract::cognitive_shader::StyleSelector;
use crate::bindspace::QUALIA_DIMS;


/// Mapping from qualia shape to a style ordinal (0..11 matches
/// `thinking_engine::cognitive_stack::ThinkingStyle::all()`).
Expand Down Expand Up @@ -104,6 +104,7 @@ pub fn resolve(sel: StyleSelector, qualia_row: &[f32]) -> u8 {
#[cfg(test)]
mod tests {
use super::*;
use crate::bindspace::QUALIA_DIMS;

fn q(vals: &[(usize, f32)]) -> [f32; QUALIA_DIMS] {
let mut out = [0.0f32; QUALIA_DIMS];
Expand Down
6 changes: 3 additions & 3 deletions crates/cognitive-shader-driver/src/codec_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl OrchestrationBridge for CodecResearchBridge {
let req: WireTensorsRequest = serde_json::from_str(args)
.map_err(|e| OrchestrationError::ExecutionFailed(e.to_string()))?;
let r = codec_research::list_tensors(&req)
.map_err(|e| OrchestrationError::ExecutionFailed(e))?;
.map_err(OrchestrationError::ExecutionFailed)?;
step.status = StepStatus::Completed;
step.reasoning = Some(format!(
"tensors total={} cam_pq={} passthrough={} skip={}",
Expand All @@ -58,7 +58,7 @@ impl OrchestrationBridge for CodecResearchBridge {
let req: WireCalibrateRequest = serde_json::from_str(args)
.map_err(|e| OrchestrationError::ExecutionFailed(e.to_string()))?;
let r = codec_research::calibrate_tensor(&req)
.map_err(|e| OrchestrationError::ExecutionFailed(e))?;
.map_err(OrchestrationError::ExecutionFailed)?;
step.status = StepStatus::Completed;
step.confidence = Some(r.icc_3_1 as f64);
step.reasoning = Some(format!(
Expand All @@ -71,7 +71,7 @@ impl OrchestrationBridge for CodecResearchBridge {
let req: WireProbeRequest = serde_json::from_str(args)
.map_err(|e| OrchestrationError::ExecutionFailed(e.to_string()))?;
let r = codec_research::row_count_probe(&req)
.map_err(|e| OrchestrationError::ExecutionFailed(e))?;
.map_err(OrchestrationError::ExecutionFailed)?;
step.status = StepStatus::Completed;
step.reasoning = Some(format!(
"probe tensor={} n_rows={} entries={}",
Expand Down
1 change: 1 addition & 0 deletions crates/cognitive-shader-driver/src/codec_research.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ fn route_str(r: CodecRoute) -> &'static str {
}
}

#[allow(clippy::type_complexity)]
fn load_tensor_rows(
model_path: &str,
tensor_pattern: &str,
Expand Down
4 changes: 2 additions & 2 deletions crates/cognitive-shader-driver/src/decode_kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
//! round-trip, no quantization loss, matches Rule F "serialise once at
//! edge" — the decode/encode IS the edge).
//! - [`ResidualComposer`] — composes two decoders with subtract/add:
//! `decode(enc) = base.decode(enc[..k]) + residual.decode(enc[k..])`
//! `encode(v) = [base.encode(v); residual.encode(v - base.decode(base.encode(v)))]`
//! `decode(enc) = base.decode(enc[..k]) + residual.decode(enc[k..])`
//! `encode(v) = [base.encode(v); residual.encode(v - base.decode(base.encode(v)))]`
//! Depth `d > 1` recurses: the residual field itself is a `ResidualComposer`.

use std::collections::hash_map::DefaultHasher;
Expand Down
6 changes: 3 additions & 3 deletions crates/cognitive-shader-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ fn style_ord_to_inference(ord: u8) -> InferenceType {
// intuitive/deliberate → Revision
// metacognitive → Synthesis
match ord {
1 | 2 | 3 => InferenceType::Deduction,
4 | 5 | 6 => InferenceType::Induction,
7 | 8 | 9 => InferenceType::Abduction,
1..=3 => InferenceType::Deduction,
4..=6 => InferenceType::Induction,
7..=9 => InferenceType::Abduction,
0 | 10 => InferenceType::Revision,
_ => InferenceType::Synthesis,
}
Expand Down
11 changes: 8 additions & 3 deletions crates/cognitive-shader-driver/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl CognitiveShaderService for ShaderGrpcService {
route_filter: if req.route_filter.is_empty() { None } else { Some(req.route_filter) },
};
let r = crate::codec_research::list_tensors(&wire_req)
.map_err(|e| Status::invalid_argument(e))?;
.map_err(Status::invalid_argument)?;
Ok(Response::new(pb::TensorsResponse {
total: r.total as u32,
shown: r.shown as u32,
Expand All @@ -210,14 +210,19 @@ impl CognitiveShaderService for ShaderGrpcService {
let wire_req = crate::wire::WireCalibrateRequest {
model_path: req.model_path,
tensor_name: req.tensor_name,
// D0.1 extension fields — gRPC path uses legacy num_*
// fields only; the richer CodecParams + TensorView path is
// REST-only until the proto schema catches up (D0.3b).
params: None,
tensor_view: None,
num_subspaces: if req.num_subspaces == 0 { 6 } else { req.num_subspaces as usize },
num_centroids: if req.num_centroids == 0 { 256 } else { req.num_centroids as usize },
kmeans_iterations: if req.kmeans_iterations == 0 { 20 } else { req.kmeans_iterations as usize },
max_rows: if req.max_rows == 0 { None } else { Some(req.max_rows as usize) },
icc_samples: if req.icc_samples == 0 { 512 } else { req.icc_samples as usize },
};
let r = crate::codec_research::calibrate_tensor(&wire_req)
.map_err(|e| Status::invalid_argument(e))?;
.map_err(Status::invalid_argument)?;
Ok(Response::new(pb::CalibrateResponse {
tensor_name: r.tensor_name,
dims: r.dims,
Expand Down Expand Up @@ -248,7 +253,7 @@ impl CognitiveShaderService for ShaderGrpcService {
icc_samples: if req.icc_samples == 0 { 512 } else { req.icc_samples as usize },
};
let r = crate::codec_research::row_count_probe(&wire_req)
.map_err(|e| Status::invalid_argument(e))?;
.map_err(Status::invalid_argument)?;
Ok(Response::new(pb::ProbeResponse {
tensor_name: r.tensor_name,
n_rows: r.n_rows as u32,
Expand Down
15 changes: 9 additions & 6 deletions crates/cognitive-shader-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,37 +112,40 @@ pub mod sigma_rosetta;
// reachable through the canonical bridge.

// Per-op Wire DTOs (REST + protobuf). LAB-ONLY.
#[cfg(feature = "serve")]
// Gated on `any(serve, grpc)` because both transports share the same
// DTOs; gRPC consumers (grpc.rs) and REST consumers (serve.rs) both
// convert to/from the `Wire*` types in wire.rs.
#[cfg(any(feature = "serve", feature = "grpc"))]
pub mod wire;

// D0.5 — model architecture auto-detection from config.json.
// CODING_PRACTICES.md gap 1 remediation. LAB-ONLY.
#[cfg(feature = "serve")]
#[cfg(any(feature = "serve", feature = "grpc"))]
pub mod auto_detect;

// D1.1 — JIT kernel cache keyed by CodecParams::kernel_signature().
// Structural layer; actual Cranelift IR emission defers to D1.1b. LAB-ONLY.
#[cfg(feature = "serve")]
#[cfg(any(feature = "serve", feature = "grpc"))]
pub mod codec_kernel_cache;

// D1.2 — rotation primitives (Identity / Hadamard / OPQ-stub). LAB-ONLY.
// Hadamard is real (in-place butterfly); OPQ is stub pending D1.1b's
// ndarray::hpc::jitson_cranelift::JitEngine adapter + matrix-blob loader.
#[cfg(feature = "serve")]
#[cfg(any(feature = "serve", feature = "grpc"))]
pub mod rotation_kernel;

// D1.3 — decode-kernel trait + residual composition.
// Hydration/calibration path (NOT cascade inference — that uses
// p64_bridge::CognitiveShader per cognitive-shader-architecture.md
// line 582). LAB-ONLY.
#[cfg(feature = "serve")]
#[cfg(any(feature = "serve", feature = "grpc"))]
pub mod decode_kernel;

// D2.1 — token-agreement harness scaffold (I11 cert gate infra).
// Reference model loader stub + top-k comparator + stub result with
// machine-checkable `stub:true` flag. D2.2 adds real safetensors decode.
// LAB-ONLY.
#[cfg(feature = "serve")]
#[cfg(any(feature = "serve", feature = "grpc"))]
pub mod token_agreement;

// Axum REST server. LAB-ONLY.
Expand Down
82 changes: 80 additions & 2 deletions crates/cognitive-shader-driver/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ use crate::wire::{
WireCalibrateRequest, WireCalibrateResponse, WireCrystal, WireDispatch, WireHealth,
WireIngest, WirePlanRequest, WirePlanResponse, WireProbeRequest, WireProbeResponse,
WireQualia, WireRunbookRequest, WireRunbookResponse, WireRunbookStep,
WireRunbookStepResult, WireStepResult, WireStyleInfo, WireTensorsRequest,
WireTensorsResponse, WireTokenAgreement, WireTokenAgreementResult, WireUnifiedStep,
WireRunbookStepResult, WireStepResult, WireStyleInfo, WireSweepRequest,
WireSweepResponse, WireSweepResult, WireTensorsRequest, WireTensorsResponse,
WireTokenAgreement, WireTokenAgreementResult, WireUnifiedStep,
};
use lance_graph_contract::cam::CodecParams;
use std::path::Path as StdPath;
Expand Down Expand Up @@ -96,6 +97,13 @@ pub fn router(driver: ShaderDriver) -> Router {
// `backend:"stub"` so clients cannot confuse Phase 0 stub output
// for a real measurement (anti-#219 defense, type-level).
.route("/v1/shader/token-agreement", post(token_agreement_handler))
// D3.1 — codec sweep endpoint (batch mode). Client POSTs a
// WireSweepRequest containing a cross-product grid; handler
// enumerates grid, validates each candidate, builds stub results,
// returns WireSweepResponse. SSE streaming + Lance append land in
// D3.1b; this batch path stays for clients that want all results
// in one response without streaming.
.route("/v1/shader/sweep", post(sweep_handler))
// Scheduled runbook: one POST runs a list of steps. Test injection
// lands here — a client script submits its full codec-research
// protocol as a single DTO, the server executes and returns all
Expand Down Expand Up @@ -284,6 +292,76 @@ async fn token_agreement_handler(
.map_err(|e| (StatusCode::BAD_REQUEST, Json(json!({"error": format!("{e}")}))))
}

/// D3.1 — `POST /v1/shader/sweep` handler (batch mode).
///
/// Enumerates the cross-product grid from `WireSweepRequest`, validates
/// each candidate via TryFrom(CodecParams), computes kernel_signature +
/// backend per point, and returns all results in one `WireSweepResponse`.
///
/// Stub: per-point calibrate/token_agreement are `None`; Phase 3 real
/// handler invokes the actual codec_research + token_agreement harness.
/// SSE streaming variant (D3.1b) replaces the batch return with per-point
/// Server-Sent Events.
async fn sweep_handler(
Json(req): Json<WireSweepRequest>,
) -> Result<Json<WireSweepResponse>, (StatusCode, Json<Value>)> {
let start = std::time::Instant::now();

// P1 — reject oversized grids before materialization. A small JSON
// payload with moderately-sized axes can explode into a huge Cartesian
// product; bound it so the endpoint isn't a DoS vector.
const MAX_GRID_CARDINALITY: usize = 10_000;
let cardinality = req.grid.cardinality();
if cardinality > MAX_GRID_CARDINALITY {
return Err((
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!(
"sweep grid cardinality {cardinality} exceeds max {MAX_GRID_CARDINALITY}; \
reduce axis dimensions"
)
})),
));
}

let candidates = req.grid.enumerate();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Reject oversized sweep grids before enumeration

In sweep_handler, the request-controlled grid is fully materialized via req.grid.enumerate() with no cardinality limit, so a small JSON payload containing moderately sized axes can explode into an enormous Cartesian product and exhaust CPU/memory while building results. This creates a straightforward denial-of-service path for the new /v1/shader/sweep endpoint; add a hard upper bound (for example a few hundred/thousand points) and return 400 before calling enumerate().

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed check again


let mut results = Vec::with_capacity(candidates.len());
for (idx, wire_params) in candidates.into_iter().enumerate() {
// Validate each grid point at ingress — surface typed errors early.
let params: CodecParams = wire_params
.clone()
.try_into()
.map_err(|e: lance_graph_contract::cam::CodecParamsError| {
(StatusCode::BAD_REQUEST, Json(json!({
"error": format!("grid point {idx}: invalid CodecParams: {e}")
})))
})?;

results.push(WireSweepResult {
grid_index: idx as u32,
candidate: wire_params,
kernel_hash: params.kernel_signature(),
calibrate: None,
token_agreement: None,
stub: true,
});
}

Ok(Json(WireSweepResponse {
label: req.label,
cardinality: cardinality as u32,
results,
elapsed_ms: start.elapsed().as_millis() as u64,
// P2 — do NOT echo req.log_to_lance into the response when no rows
// were actually written. Clients that treat lance_fragment_path as
// evidence of successful logging would silently skip retries and
// lose experiment results. Set to None until the real Lance append
// writer lands (Phase 3 D3.1b).
lance_fragment_path: None,
}))
}

async fn route_handler(
State(_state): State<AppState>,
Json(wire): Json<WireUnifiedStep>,
Expand Down
8 changes: 3 additions & 5 deletions crates/cognitive-shader-driver/src/wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ impl AlignedBytes {
}

pub fn is_aligned_64(&self) -> bool {
(self.ptr as usize) % 64 == 0
(self.ptr as usize).is_multiple_of(64)
}

pub fn len(&self) -> usize { self.len }
Expand Down Expand Up @@ -933,17 +933,15 @@ fn named_to_ordinal(s: &str) -> u8 {
/// Reference baseline for token-agreement comparison. Extensible enum —
/// `Passthrough` is the only variant today; future baselines (half-precision
/// reference, previous codec generation) plug in as variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WireBaseline {
/// Passthrough = untouched weights, F32 decode. The canonical
/// reference every codec candidate is measured against.
#[default]
Passthrough,
}

impl Default for WireBaseline {
fn default() -> Self { Self::Passthrough }
}

/// `POST /v1/shader/token-agreement` request.
///
Expand Down
Loading