Skip to content

Commit 54cf0e3

Browse files
edandylyticscodexclaude
authored
Sideloading: App accepts Output File Sets from Executor (#42)
* accept output file set callbacks add the earthbeam callback and job payload fields needed to record executor output file sets. keep the table keyed by uid, harden the callback error handling, and cover the new auth and payload behavior with integration tests. Co-authored-by: Codex <noreply@openai.com> * fix path validation, store output file set path, update AGENTS.md - tighten startsWith check to require exact match or trailing slash, preventing sibling-prefix bypass (e.g. output-evil) - add regression test for sibling-prefix path rejection - add path column to run_output_file_set (migration 030) so PR 6 can reconstruct download paths for subfolder output sets - update AGENTS.md sequence diagram and executor lifecycle to include the output-files callback Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Made-with: Cursor * add unique constraint and duplicate detection for output file sets The PR-4 review flagged that a retried executor POST for the same run_id + path would silently create duplicate output file sets. Consolidate the path column and a UNIQUE(run_id, path) constraint into migration 029 (removing the now-unnecessary 030), add an application-level duplicate check returning 409 Conflict, and cover the scenario with an integration test. Co-authored-by: Claude <noreply@anthropic.com> Made-with: Cursor * catch unique constraint instead of pre-querying, clean up tests - replace findFirst duplicate check with catch on P2002 unique constraint violation to eliminate TOCTOU race and save a query - move jobX setup into the cross-run auth test that uses it - remove redundant sentToOds=false test (just persisting a boolean) - fix test semantics: sideloaded paths use sentToOds=false, ODS-sent paths use sentToOds=true - simplify TIMESTAMP(6) to TIMESTAMP in migration 029 - gitignore docs/, .cursor/skills/, .claude/skills/, .agents/ to prevent accidental commits of in-progress project docs Co-authored-by: Claude <noreply@anthropic.com> Made-with: Cursor * use .catch for output file set error handling Flatten the try/catch into a .catch on the create promise to match the pattern used elsewhere and reduce nesting. Co-authored-by: Claude <noreply@anthropic.com> Made-with: Cursor * require subfolder path, use relative S3 keys, canonicalize trailing slashes Review round 3 fixes: - Reject bare base path in output-files callback (must be a proper subfolder) - Canonicalize trailing slashes before validation and persistence to prevent duplicate-bypassing of the UNIQUE(run_id, path) constraint - Switch outputFilesBasePath from full S3 URI to relative key ({fileBasePath}/output) since the executor sends relative paths, not protocol://bucket URIs - Rename lb-download-dir to roster-download-dir in local executor service - Revert .gitignore scope creep (moved to .git/info/exclude) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * remove outputFilesBasePath from executor payload, validate against fileBasePath The executor owns the output file location — it doesn't need the app to tell it where to write. Removed outputFilesBasePath from the DTO and payload. The output-files callback now validates the reported path is a child of fileBasePath (the job's data directory) instead of a narrower outputFilesBasePath. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * consolidate GET /:runId tests, remove outputFilesBasePath test Move appUrls.outputFiles payload test into the existing GET /:runId describe block (above Descriptor Mappings). Remove the duplicate describe block and the outputFilesBasePath test, which tested a field that was never shipped. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * reject output file set callback when no files found at S3 path If the executor posts a path that has no files in S3, return 400 rather than creating a record with an empty files array. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * type RunOutputFileSet.files as string[] Add prisma-json-types-generator annotation so the files column is typed as string[] instead of generic Json. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * add .nxignore to prevent nx daemon from indexing vite temp files Vite creates transient timestamp files (e.g. vite.config.mts.timestamp-*.mjs) that NX caches in its project graph. When the dev server stops and deletes them, NX fails with "Unable to load" errors requiring nx reset. Co-authored-by: Claude <noreply@anthropic.com> Made-with: Cursor * return { key, name } from listFilesAtPath, simplify callers listFilesAtPath now filters out undefined keys and folder markers internally, and returns objects with both the full S3 key and the filename. This removes duplicate prefix-stripping from both callers and makes the contract explicit. Also adds a test for completeRun saving output files from S3 listing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * remove redundant error logging from output file set catch NestJS's built-in exception handler already logs unhandled errors with full stack traces and returns a generic 500. The manual catch-log-rethrow was duplicating that with less detail. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Codex <noreply@openai.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f15f493 commit 54cf0e3

14 files changed

Lines changed: 341 additions & 36 deletions

File tree

AGENTS.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ sequenceDiagram
111111
Exec->>Exec: earthmover run (transform data)
112112
Exec->>ODS: lightbeam send (load to ODS)
113113
Exec->>S3: Upload output artifacts
114+
Exec->>App: POST /output-files (path + sentToOds → app lists S3, saves run_output_file_set)
114115
115116
Exec->>App: POST /status, /error, /summary, /unmatched-ids
116117
Exec->>App: POST /status {action: done}
@@ -156,7 +157,8 @@ sequenceDiagram
156157
6. **Transform**: `earthmover run` (with encoding detection + retry)
157158
7. **Load**: `lightbeam send` to Ed-Fi ODS
158159
8. **Report**: POST summary, unmatched IDs, errors to app via callback URLs
159-
9. **Done**: POST status `{action: DONE, status: success|failure}`
160+
9. **Output files**: POST output file path + `sentToOds` flag to `/output-files` callback; app validates path, lists S3, saves `run_output_file_set`
161+
10. **Done**: POST status `{action: DONE, status: success|failure}`
160162

161163
### S3 Path Structure
162164

app/.nxignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
2+
n*.timestamp-*

app/api/integration/helpers/setup-and-teardown/tasks/init-app.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ export const initApp = async function () {
3232
getPresignedDownloadUrl: jest
3333
.fn()
3434
.mockImplementation((f) => Promise.resolve(`s3-test-download-url://${f.fullPath}`)),
35-
listFilesAtPath: jest.fn().mockResolvedValue(['test-file-1', 'test-file-2']),
35+
listFilesAtPath: jest.fn().mockResolvedValue([
36+
{ key: 'test-path/test-file-1', name: 'test-file-1' },
37+
{ key: 'test-path/test-file-2', name: 'test-file-2' },
38+
]),
3639
doFilesExist: jest.fn().mockResolvedValue(true),
3740
})
3841
.compile();

app/api/integration/tests/earthbeam-api.spec.ts

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { Run } from '@prisma/client';
88
import { partnerA } from '../fixtures/context-fixtures/partner-fixtures';
99
import { EventEmitterLogService, EVENT_EMITTER_SERVICE } from 'api/src/event-emitter/event-emitter.service';
1010
import { userA } from '../fixtures/user-fixtures';
11+
import { FileService } from 'api/src/files/file.service';
1112

1213
describe('Earthbeam API', () => {
1314
describe('GET /:runId', () => {
@@ -62,6 +63,16 @@ describe('Earthbeam API', () => {
6263
expect(res.status).toBe(403);
6364
});
6465

66+
it('should include appUrls.outputFiles in the job payload', async () => {
67+
const res = await request(app.getHttpServer())
68+
.get(endpointA)
69+
.set('Authorization', `Bearer ${tokenA}`);
70+
71+
expect(res.status).toBe(200);
72+
expect(res.body.appUrls.outputFiles).toBeDefined();
73+
expect(res.body.appUrls.outputFiles).toContain(`/earthbeam/jobs/${runA.id}/output-files`);
74+
});
75+
6576
// TODO: add tests for things other than descriptor mappings
6677
describe('Authenticated requests: Descriptor Mappings', () => {
6778
const testDescriptorTypeA = 'testDescriptorTypeA';
@@ -187,6 +198,7 @@ describe('Earthbeam API', () => {
187198
expect(descriptorNamespaceX).toBeUndefined(); // not included in seed for partner X
188199
});
189200
});
201+
190202
});
191203

192204
describe('POST /:runId/status', () => {
@@ -222,15 +234,44 @@ describe('Earthbeam API', () => {
222234
describe('Complete Run', () => {
223235
// TODO: more tests re: run completion
224236
let eventEmitterMock: jest.SpyInstance;
237+
let fileServiceMock: Record<string, jest.Mock>;
225238

226239
beforeEach(async () => {
227240
eventEmitterMock = jest.spyOn(EventEmitterLogService.prototype, 'emit');
241+
fileServiceMock = app.get(FileService) as unknown as Record<string, jest.Mock>;
228242
});
229243

230244
afterEach(() => {
231245
eventEmitterMock.mockRestore();
232246
});
233247

248+
it('should save output files from S3 listing', async () => {
249+
fileServiceMock.listFilesAtPath.mockResolvedValueOnce([
250+
{ key: 'partner/tenant/2025/1/output/results.jsonl', name: 'results.jsonl' },
251+
{ key: 'partner/tenant/2025/1/output/summary.csv', name: 'summary.csv' },
252+
]);
253+
254+
const res = await request(app.getHttpServer())
255+
.post(endpointA)
256+
.set('Authorization', `Bearer ${tokenA}`)
257+
.send({ action: 'done', status: 'success' });
258+
expect(res.status).toBe(201);
259+
260+
const outputFiles = await prisma.runOutputFile.findMany({
261+
where: { runId: runA.id },
262+
orderBy: { name: 'asc' },
263+
});
264+
expect(outputFiles).toHaveLength(2);
265+
expect(outputFiles[0]).toMatchObject({
266+
name: 'results.jsonl',
267+
path: 'partner/tenant/2025/1/output/results.jsonl',
268+
});
269+
expect(outputFiles[1]).toMatchObject({
270+
name: 'summary.csv',
271+
path: 'partner/tenant/2025/1/output/summary.csv',
272+
});
273+
});
274+
234275
describe('Slack Notifiction', () => {
235276
it('should send a slack notification when the run is complete', async () => {
236277
const res = await request(app.getHttpServer())
@@ -311,4 +352,165 @@ describe('Earthbeam API', () => {
311352
});
312353
});
313354
});
355+
356+
describe('POST /:runId/output-files', () => {
357+
let runA: Run;
358+
let tokenA: string;
359+
let endpointA: string;
360+
let jobA: Awaited<ReturnType<typeof seedJob>>;
361+
let fileBasePath: string;
362+
let fileServiceMock: Record<string, jest.Mock>;
363+
364+
beforeEach(async () => {
365+
const authService = app.get(EarthbeamApiAuthService);
366+
fileServiceMock = app.get(FileService) as unknown as Record<string, jest.Mock>;
367+
368+
jobA = await seedJob({
369+
odsConfig: odsConfigA2425,
370+
bundle: bundleA,
371+
tenant: tenantA,
372+
});
373+
374+
if (!jobA?.runs?.[0]) {
375+
throw new Error('Failed to seed job and run');
376+
}
377+
runA = jobA.runs[0];
378+
tokenA = await authService.createAccessToken({ runId: runA.id });
379+
endpointA = `/earthbeam/jobs/${runA.id}/output-files`;
380+
fileBasePath = jobA.fileBasePath!;
381+
});
382+
383+
it('should reject unauthenticated requests', async () => {
384+
const res = await request(app.getHttpServer()).post(endpointA);
385+
expect(res.status).toBe(401);
386+
});
387+
388+
it('should reject requests if the token does not match the run id', async () => {
389+
const authService = app.get(EarthbeamApiAuthService);
390+
const jobX = await seedJob({
391+
odsConfig: odsConfigX2425,
392+
bundle: bundleX,
393+
tenant: tenantX,
394+
});
395+
if (!jobX?.runs?.[0]) {
396+
throw new Error('Failed to seed job and run');
397+
}
398+
const tokenX = await authService.createAccessToken({ runId: jobX.runs[0].id });
399+
400+
const res = await request(app.getHttpServer())
401+
.post(endpointA)
402+
.set('Authorization', `Bearer ${tokenX}`)
403+
.send({ path: `${fileBasePath}/output/sideloaded`, sentToOds: false });
404+
expect(res.status).toBe(403);
405+
});
406+
407+
it('should reject paths outside the job data directory', async () => {
408+
const res = await request(app.getHttpServer())
409+
.post(endpointA)
410+
.set('Authorization', `Bearer ${tokenA}`)
411+
.send({ path: 'other-partner/other-tenant/other-year/other-job/output/evil', sentToOds: true });
412+
expect(res.status).toBe(400);
413+
});
414+
415+
it('should reject sibling paths that share the fileBasePath prefix', async () => {
416+
const res = await request(app.getHttpServer())
417+
.post(endpointA)
418+
.set('Authorization', `Bearer ${tokenA}`)
419+
.send({ path: `${fileBasePath}-evil`, sentToOds: true });
420+
expect(res.status).toBe(400);
421+
});
422+
423+
it('should reject the bare base path (must be a child)', async () => {
424+
const res = await request(app.getHttpServer())
425+
.post(endpointA)
426+
.set('Authorization', `Bearer ${tokenA}`)
427+
.send({ path: fileBasePath, sentToOds: true });
428+
expect(res.status).toBe(400);
429+
});
430+
431+
it('should reject the bare base path with trailing slash', async () => {
432+
const res = await request(app.getHttpServer())
433+
.post(endpointA)
434+
.set('Authorization', `Bearer ${tokenA}`)
435+
.send({ path: `${fileBasePath}/`, sentToOds: true });
436+
expect(res.status).toBe(400);
437+
});
438+
439+
it('should reject when no files are found at the given path', async () => {
440+
fileServiceMock.listFilesAtPath.mockResolvedValueOnce([]);
441+
442+
const res = await request(app.getHttpServer())
443+
.post(endpointA)
444+
.set('Authorization', `Bearer ${tokenA}`)
445+
.send({ path: `${fileBasePath}/output/empty`, sentToOds: true });
446+
expect(res.status).toBe(400);
447+
});
448+
449+
it('should canonicalize trailing slashes and store the path without them', async () => {
450+
const subfolder = `${jobA.fileBasePath}/output/transformed/`;
451+
fileServiceMock.listFilesAtPath.mockResolvedValueOnce([
452+
{ key: `${subfolder}output1.jsonl`, name: 'output1.jsonl' },
453+
]);
454+
455+
const res = await request(app.getHttpServer())
456+
.post(endpointA)
457+
.set('Authorization', `Bearer ${tokenA}`)
458+
.send({ path: `${fileBasePath}/output/transformed/`, sentToOds: true });
459+
460+
expect(res.status).toBe(201);
461+
462+
const saved = await prisma.runOutputFileSet.findUnique({
463+
where: { uid: res.body.uid },
464+
});
465+
expect(saved!.path).toBe(`${fileBasePath}/output/transformed`);
466+
});
467+
468+
it('should list S3 at the given path and save discovered files', async () => {
469+
const subfolder = `${jobA.fileBasePath}/output/transformed/`;
470+
fileServiceMock.listFilesAtPath.mockResolvedValueOnce([
471+
{ key: `${subfolder}output1.jsonl`, name: 'output1.jsonl' },
472+
{ key: `${subfolder}output2.jsonl`, name: 'output2.jsonl' },
473+
]);
474+
475+
const res = await request(app.getHttpServer())
476+
.post(endpointA)
477+
.set('Authorization', `Bearer ${tokenA}`)
478+
.send({ path: `${fileBasePath}/output/transformed`, sentToOds: true });
479+
480+
expect(res.status).toBe(201);
481+
expect(res.body.uid).toBeDefined();
482+
expect(typeof res.body.uid).toBe('string');
483+
484+
expect(fileServiceMock.listFilesAtPath).toHaveBeenCalledWith(subfolder);
485+
486+
const saved = await prisma.runOutputFileSet.findUnique({
487+
where: { uid: res.body.uid },
488+
});
489+
expect(saved).not.toBeNull();
490+
expect(saved!.runId).toBe(runA.id);
491+
expect(saved!.path).toBe(`${fileBasePath}/output/transformed`);
492+
expect(saved!.files).toEqual(['output1.jsonl', 'output2.jsonl']);
493+
expect(saved!.sentToOds).toBe(true);
494+
});
495+
496+
it('should return 409 on duplicate run_id + path', async () => {
497+
const subfolder = `${jobA.fileBasePath}/output/sideloaded/`;
498+
fileServiceMock.listFilesAtPath.mockResolvedValue([
499+
{ key: `${subfolder}output1.jsonl`, name: 'output1.jsonl' },
500+
]);
501+
502+
const first = await request(app.getHttpServer())
503+
.post(endpointA)
504+
.set('Authorization', `Bearer ${tokenA}`)
505+
.send({ path: `${fileBasePath}/output/sideloaded`, sentToOds: false });
506+
expect(first.status).toBe(201);
507+
508+
const second = await request(app.getHttpServer())
509+
.post(endpointA)
510+
.set('Authorization', `Bearer ${tokenA}`)
511+
.send({ path: `${fileBasePath}/output/sideloaded`, sentToOds: false });
512+
expect(second.status).toBe(409);
513+
});
514+
});
515+
314516
});
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
CREATE TABLE public.run_output_file_set (
2+
uid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
3+
run_id INTEGER NOT NULL REFERENCES public.run (id) ON DELETE CASCADE,
4+
files JSONB NOT NULL,
5+
sent_to_ods BOOLEAN NOT NULL DEFAULT true,
6+
path TEXT NOT NULL,
7+
created_on TIMESTAMP NOT NULL DEFAULT now(),
8+
UNIQUE(run_id, path)
9+
);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE IF EXISTS public.run_output_file_set;

app/api/src/database/prisma/schema.prisma

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -265,22 +265,23 @@ model Job {
265265

266266
/// This model or at least one of its fields has comments in the database, and requires an additional setup for migrations: Read more: https://pris.ly/d/database-comments
267267
model Run {
268-
id Int @id @default(autoincrement())
269-
jobId Int @map("job_id")
270-
status RunStatus @default(new)
271-
createdById Int? @map("created_by_id")
272-
createdOn DateTime @default(now()) @map("created_on") @db.Timestamp(6)
273-
modifiedById Int? @map("modified_by_id")
274-
modifiedOn DateTime @default(now()) @map("modified_on") @db.Timestamp(6)
268+
id Int @id @default(autoincrement())
269+
jobId Int @map("job_id")
270+
status RunStatus @default(new)
271+
createdById Int? @map("created_by_id")
272+
createdOn DateTime @default(now()) @map("created_on") @db.Timestamp(6)
273+
modifiedById Int? @map("modified_by_id")
274+
modifiedOn DateTime @default(now()) @map("modified_on") @db.Timestamp(6)
275275
/// [RunSummary]
276-
summary Json? @db.Json
276+
summary Json? @db.Json
277277
/// [UnmatchedStudentsInfo]
278-
unmatchedStudentsInfo Json? @map("unmatched_students_info")
279-
userRunCreatedByIdTouser User? @relation("run_created_by_idTouser", fields: [createdById], references: [id], onUpdate: NoAction)
280-
job Job @relation("jobRun", fields: [jobId], references: [id], onDelete: Cascade, onUpdate: NoAction)
281-
userRunModifiedByIdTouser User? @relation("run_modified_by_idTouser", fields: [modifiedById], references: [id], onUpdate: NoAction)
278+
unmatchedStudentsInfo Json? @map("unmatched_students_info")
279+
userRunCreatedByIdTouser User? @relation("run_created_by_idTouser", fields: [createdById], references: [id], onUpdate: NoAction)
280+
job Job @relation("jobRun", fields: [jobId], references: [id], onDelete: Cascade, onUpdate: NoAction)
281+
userRunModifiedByIdTouser User? @relation("run_modified_by_idTouser", fields: [modifiedById], references: [id], onUpdate: NoAction)
282282
runError RunError[]
283283
runOutputFile RunOutputFile[]
284+
runOutputFileSet RunOutputFileSet[]
284285
runUpdate RunUpdate[]
285286
286287
@@map("run")
@@ -433,6 +434,20 @@ model SchoolYearConfig {
433434
@@map("school_year_config")
434435
}
435436

437+
model RunOutputFileSet {
438+
uid String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
439+
runId Int @map("run_id")
440+
/// [RunOutputFileSetFiles]
441+
files Json
442+
sentToOds Boolean @default(true) @map("sent_to_ods")
443+
path String
444+
createdOn DateTime @default(now()) @map("created_on") @db.Timestamp(6)
445+
run Run @relation(fields: [runId], references: [id], onDelete: Cascade, onUpdate: NoAction)
446+
447+
@@unique([runId, path])
448+
@@map("run_output_file_set")
449+
}
450+
436451
enum OdsAuthResponse {
437452
success
438453
error

0 commit comments

Comments
 (0)