diff --git a/api/src/unraid-api/graph/resolvers/notifications/loadNotificationsFile.test.ts b/api/src/unraid-api/graph/resolvers/notifications/loadNotificationsFile.test.ts index 81f675a217..09b3663e45 100644 --- a/api/src/unraid-api/graph/resolvers/notifications/loadNotificationsFile.test.ts +++ b/api/src/unraid-api/graph/resolvers/notifications/loadNotificationsFile.test.ts @@ -15,16 +15,33 @@ import { } from '@app/unraid-api/graph/resolvers/notifications/notifications.model.js'; import { NotificationsService } from '@app/unraid-api/graph/resolvers/notifications/notifications.service.js'; +const { mockWatch } = vi.hoisted(() => { + const watcher = { + on: vi.fn().mockReturnThis(), + close: vi.fn().mockResolvedValue(undefined), + }; + + return { + mockWatch: vi.fn(() => watcher), + }; +}); + // Mock fs/promises for unit tests vi.mock('fs/promises', async () => { const actual = await vi.importActual('fs/promises'); const mockReadFile = vi.fn(); + const mockStat = vi.fn(actual.stat); return { ...actual, readFile: mockReadFile, + stat: mockStat, }; }); +vi.mock('chokidar', () => ({ + watch: mockWatch, +})); + // Mock getters.dynamix, Logger, and pubsub vi.mock('@app/store/index.js', () => { const testNotificationsDir = join(tmpdir(), 'unraid-api-test-notifications'); @@ -84,13 +101,80 @@ const createNotificationsService = (notificationPath = testNotificationsDir) => describe('NotificationsService - loadNotificationFile (minimal mocks)', () => { let service: NotificationsService; - let mockReadFile: any; + let mockReadFile: typeof import('fs/promises').readFile; + let mockStat: typeof import('fs/promises').stat; beforeEach(async () => { const fsPromises = await import('fs/promises'); - mockReadFile = fsPromises.readFile as any; + mockReadFile = fsPromises.readFile; + mockStat = fsPromises.stat; vi.mocked(mockReadFile).mockClear(); + vi.mocked(mockStat).mockClear(); + mockWatch.mockClear(); + Reflect.set(NotificationsService, 'watcher', null); service = createNotificationsService(); + await Reflect.get(service, 'initialization'); + }); + + afterEach(() => { + Reflect.set(NotificationsService, 'watcher', null); + }); + + it('creates the notifications watcher without replaying existing files', () => { + expect(mockWatch).toHaveBeenCalledWith( + testNotificationsDir, + expect.objectContaining({ + ignoreInitial: true, + }) + ); + }); + + it('replays buffered add events after overview hydration', async () => { + const bufferedPath = `${testNotificationsDir}/unread/buffered.notify`; + const hydratedService = createNotificationsService(); + const processNotificationAdd = vi.fn().mockResolvedValue(undefined); + const handleNotificationAdd = ( + Reflect.get(hydratedService, 'handleNotificationAdd') as (path: string) => Promise + ).bind(hydratedService); + + Reflect.set( + hydratedService, + 'ensureNotificationDirectories', + vi.fn().mockResolvedValue(undefined) + ); + Reflect.set(hydratedService, 'publishOverview', vi.fn().mockResolvedValue(undefined)); + Reflect.set(hydratedService, 'processNotificationAdd', processNotificationAdd); + Reflect.set( + hydratedService, + 'getNotificationsWatcher', + vi.fn().mockImplementation(async () => { + await handleNotificationAdd(bufferedPath); + return { + close: vi.fn().mockResolvedValue(undefined), + on: vi.fn().mockReturnThis(), + }; + }) + ); + Reflect.set( + hydratedService, + 'buildOverviewSnapshot', + vi.fn().mockResolvedValue({ + errorOccurred: false, + overview: { + unread: { alert: 0, info: 0, warning: 0, total: 0 }, + archive: { alert: 0, info: 0, warning: 0, total: 0 }, + }, + seenPaths: new Set(), + }) + ); + + await Reflect.get(hydratedService, 'initializeNotificationsState').call( + hydratedService, + testNotificationsDir, + true + ); + + expect(processNotificationAdd).toHaveBeenCalledWith(bufferedPath); }); it('should load and validate a valid notification file', async () => { @@ -244,12 +328,51 @@ importance=alert`; expect(result.timestamp).toBeUndefined(); // Malformed timestamp results in undefined expect(result.formattedTimestamp).toBe('not-a-timestamp'); // Returns original string when parsing fails }); + + it('limits concurrent notification file reads', async () => { + const fileCount = 96; + const files = Array.from({ length: fileCount }, (_, index) => `/test/path/${index}.notify`); + let activeReads = 0; + let maxConcurrentReads = 0; + + vi.mocked(mockReadFile).mockImplementation(async () => { + activeReads += 1; + maxConcurrentReads = Math.max(maxConcurrentReads, activeReads); + await new Promise((resolve) => setTimeout(resolve, 5)); + activeReads -= 1; + return `timestamp=1609459200 +event=Test Event +subject=Test Subject +description=Test Description +importance=alert`; + }); + + const [notifications] = await Reflect.get(service, 'loadNotificationsFromPaths').call( + service, + files, + {} + ); + + expect(notifications).toHaveLength(fileCount); + expect(maxConcurrentReads).toBeLessThanOrEqual(32); + }); + + it('surfaces stat failures when listing notification files', async () => { + const unreadPath = join(testNotificationsDir, 'unread'); + const filePath = join(unreadPath, 'stat-failure.notify'); + writeFileSync(filePath, 'timestamp=1609459200'); + vi.mocked(mockStat).mockRejectedValueOnce(new Error('stat failed')); + + await expect( + Reflect.get(service, 'listFilesInFolder').call(service, unreadPath) + ).rejects.toThrow(); + }); }); describe('NotificationsService - deleteNotification (integration test)', () => { let service: NotificationsService; - beforeEach(() => { + beforeEach(async () => { // Clean up any existing test directory if (existsSync(testNotificationsDir)) { rmSync(testNotificationsDir, { recursive: true, force: true }); @@ -261,6 +384,7 @@ describe('NotificationsService - deleteNotification (integration test)', () => { mkdirSync(join(testNotificationsDir, 'archive'), { recursive: true }); service = createNotificationsService(); + await Reflect.get(service, 'initialization'); }); afterEach(() => { diff --git a/api/src/unraid-api/graph/resolvers/notifications/notifications.service.spec.ts b/api/src/unraid-api/graph/resolvers/notifications/notifications.service.spec.ts index 260a6dbea2..dcf624a278 100644 --- a/api/src/unraid-api/graph/resolvers/notifications/notifications.service.spec.ts +++ b/api/src/unraid-api/graph/resolvers/notifications/notifications.service.spec.ts @@ -31,6 +31,29 @@ import { validateObject } from '@app/unraid-api/graph/resolvers/validation.utils // defined outside `describe` so it's defined inside the `beforeAll` // needed to mock the dynamix import const basePath = '/tmp/test/notifications'; +const zeroOverview = (): NotificationOverview => ({ + unread: { + alert: 0, + info: 0, + warning: 0, + total: 0, + }, + archive: { + alert: 0, + info: 0, + warning: 0, + total: 0, + }, +}); + +async function disableNotificationsWatcher() { + const watcher = Reflect.get(NotificationsService, 'watcher') as { + close?: () => Promise; + } | null; + await watcher?.close?.(); + Reflect.set(NotificationsService, 'watcher', null); + Reflect.set(NotificationsService, 'overview', zeroOverview()); +} // we run sequentially here because this module's state depends on external, shared systems // rn, it's complicated to make the tests atomic & isolated @@ -62,6 +85,7 @@ describe.sequential('NotificationsService', () => { }).compile(); service = module.get(NotificationsService); // this might need to be a module.resolve instead of get + await disableNotificationsWatcher(); vi.spyOn(service, 'paths').mockImplementation(() => testPaths); await service.deleteAllNotifications(); @@ -69,6 +93,7 @@ describe.sequential('NotificationsService', () => { // make sure each test is isolated (as much as possible) afterEach(async () => { + Reflect.set(NotificationsService, 'overview', zeroOverview()); await service.deleteAllNotifications(); }); @@ -503,6 +528,7 @@ describe.concurrent('NotificationsService legacy script compatibility', () => { }).compile(); service = module.get(NotificationsService); + await disableNotificationsWatcher(); }); it.for([['normal'], ['warning'], ['alert']] as const)( diff --git a/api/src/unraid-api/graph/resolvers/notifications/notifications.service.ts b/api/src/unraid-api/graph/resolvers/notifications/notifications.service.ts index 1a6fa5ffd7..7a32c7b15f 100644 --- a/api/src/unraid-api/graph/resolvers/notifications/notifications.service.ts +++ b/api/src/unraid-api/graph/resolvers/notifications/notifications.service.ts @@ -1,6 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import { readdir, readFile, rename, stat, unlink, writeFile } from 'fs/promises'; +import { mkdir, readdir, readFile, rename, stat, unlink, writeFile } from 'fs/promises'; import { basename, join } from 'path'; import type { Stats } from 'fs'; @@ -34,12 +34,16 @@ import { batchProcess, formatDatetime, isFulfilled, isRejected, unraidTimestamp @Injectable() export class NotificationsService { + private static readonly FILE_IO_BATCH_SIZE = 32; private logger = new Logger(NotificationsService.name); private static watcher: FSWatcher | null = null; /** * The path to the notification directory - will be updated if the user changes the notifier path */ private path: string | null = null; + private initialization: Promise | null = null; + private isHydratingOverview = false; + private pendingNotificationAdds = new Set(); private static overview: NotificationOverview = { unread: { @@ -58,7 +62,7 @@ export class NotificationsService { constructor(private readonly configService: ConfigService) { this.path = this.getConfiguredPath(); - void this.getNotificationsWatcher(this.path); + this.initialization = this.initializeNotificationsState(this.path); } private getConfiguredPath() { @@ -77,9 +81,8 @@ export class NotificationsService { const basePath = this.getConfiguredPath(); if (this.path !== basePath) { - // Recreate the watcher with force = true - void this.getNotificationsWatcher(basePath, true); this.path = basePath; + this.initialization = this.initializeNotificationsState(basePath, true); } const makePath = (type: NotificationType) => join(basePath, type.toLowerCase()); @@ -90,6 +93,54 @@ export class NotificationsService { }; } + private initializeNotificationsState(basePath: string, recreate = false) { + const initialize = async () => { + await this.ensureNotificationDirectories(basePath); + this.pendingNotificationAdds.clear(); + this.isHydratingOverview = true; + + try { + await this.getNotificationsWatcher(basePath, recreate); + const { errorOccurred, overview, seenPaths } = await this.buildOverviewSnapshot(); + NotificationsService.overview = overview; + await this.publishOverview(); + + const pendingAdds = [...this.pendingNotificationAdds].filter( + (path) => !seenPaths.has(path) + ); + this.pendingNotificationAdds.clear(); + + for (const path of pendingAdds) { + await this.processNotificationAdd(path); + } + + if (errorOccurred) { + this.logger.error( + '[initializeNotificationsState] Failed to fully hydrate notification overview' + ); + } + } finally { + this.isHydratingOverview = false; + this.pendingNotificationAdds.clear(); + } + }; + + const previousInitialization = this.initialization ?? Promise.resolve(); + return previousInitialization.catch(() => undefined).then(initialize); + } + + private async ensureNotificationDirectories(basePath: string) { + await Promise.all([ + mkdir(basePath, { recursive: true }), + mkdir(join(basePath, NotificationType.UNREAD.toLowerCase()), { + recursive: true, + }), + mkdir(join(basePath, NotificationType.ARCHIVE.toLowerCase()), { + recursive: true, + }), + ]); + } + /**------------------------------------------------------------------------ * Subscription Events * @@ -103,22 +154,47 @@ export class NotificationsService { } await NotificationsService.watcher?.close().catch((e) => this.logger.error(e)); - NotificationsService.watcher = watch(basePath, { usePolling: CHOKIDAR_USEPOLLING }).on( - 'add', - (path) => { - void this.handleNotificationAdd(path).catch((e) => this.logger.error(e)); - } - ); + NotificationsService.watcher = watch(basePath, { + usePolling: CHOKIDAR_USEPOLLING, + ignoreInitial: true, + }).on('add', (path) => { + void this.handleNotificationAdd(path).catch((e) => this.logger.error(e)); + }); return NotificationsService.watcher; } + private isMissingFileError(error: unknown): error is NodeJS.ErrnoException { + return error instanceof Error && 'code' in error && error.code === 'ENOENT'; + } + private async handleNotificationAdd(path: string) { + if (this.isHydratingOverview) { + this.pendingNotificationAdds.add(path); + return; + } + + await this.processNotificationAdd(path); + } + + private async processNotificationAdd(path: string) { // The path looks like /{notification base path}/{type}/{notification id} const type = path.includes('/unread/') ? NotificationType.UNREAD : NotificationType.ARCHIVE; // this.logger.debug(`Adding ${type} Notification: ${path}`); - const notification = await this.loadNotificationFile(path, NotificationType[type]); + let notification: Notification; + try { + notification = await this.loadNotificationFile(path, NotificationType[type]); + } catch (error) { + if (this.isMissingFileError(error)) { + this.logger.debug( + `[handleNotificationAdd] Notification disappeared before load: ${path}` + ); + return; + } + throw error; + } + this.increment(notification.importance, NotificationsService.overview[type.toLowerCase()]); if (type === NotificationType.UNREAD) { @@ -173,6 +249,13 @@ export class NotificationsService { } public async recalculateOverview() { + const { errorOccurred, overview } = await this.buildOverviewSnapshot(); + NotificationsService.overview = overview; + await this.publishOverview(); + return { error: errorOccurred, overview: this.getOverview() }; + } + + private async buildOverviewSnapshot() { const overview: NotificationOverview = { unread: { alert: 0, @@ -187,6 +270,7 @@ export class NotificationsService { total: 0, }, }; + const seenPaths = new Set(); // todo - refactor this to be more memory efficient // i.e. by using a lazy generator vs the current eager implementation @@ -194,6 +278,7 @@ export class NotificationsService { // recalculates stats for a particular notification type const recalculate = async (type: NotificationType) => { const ids = await this.listFilesInFolder(this.paths()[type]); + ids.forEach((id) => seenPaths.add(id)); const [notifications] = await this.loadNotificationsFromPaths(ids, {}); notifications.forEach((n) => this.increment(n.importance, overview[type.toLowerCase()])); }; @@ -207,9 +292,7 @@ export class NotificationsService { results.errors.forEach((e) => this.logger.error('[recalculateOverview] ' + e)); } - NotificationsService.overview = overview; - void this.publishOverview(); - return { error: results.errorOccurred, overview: this.getOverview() }; + return { errorOccurred: results.errorOccurred, overview, seenPaths }; } /**------------------------------------------------------------------------ @@ -674,15 +757,24 @@ export class NotificationsService { sortFn: SortFn = (fileA, fileB) => fileB.birthtimeMs - fileA.birthtimeMs // latest first ): Promise { const contents = narrowContent(await readdir(folderPath)); - const contentStats = await Promise.all( - contents.map(async (content) => { - // pre-map each file's stats to avoid excess calls during sorting - const path = join(folderPath, content); - const stats = await stat(path); - return { path, stats }; - }) - ); + const contentStats = await this.settleInBatches(contents, async (content) => { + const path = join(folderPath, content); + const stats = await stat(path); + return { path, stats }; + }); + const statFailure = contentStats + .filter(isRejected) + .find((result) => !this.isMissingFileError(result.reason)); + + if (statFailure) { + throw statFailure.reason; + } + return contentStats + .filter((result): result is PromiseFulfilledResult<{ path: string; stats: Stats }> => + isFulfilled(result) + ) + .map((result) => result.value) .sort((fileA, fileB) => sortFn(fileA.stats, fileB.stats)) .map(({ path }) => path); } @@ -705,10 +797,9 @@ export class NotificationsService { ): Promise<[Notification[], unknown[]]> { const { importance, type, offset = 0, limit = files.length } = filters; - const fileReads = files - .slice(offset, limit + offset) - .map((file) => this.loadNotificationFile(file, type ?? NotificationType.UNREAD)); - const results = await Promise.allSettled(fileReads); + const results = await this.settleInBatches(files.slice(offset, limit + offset), (file) => + this.loadNotificationFile(file, type ?? NotificationType.UNREAD) + ); // if the filter is defined & truthy, tests if the actual value matches the filter const passesFilter = (actual: T, filter?: unknown) => !filter || actual === filter; @@ -727,6 +818,21 @@ export class NotificationsService { ]; } + private async settleInBatches( + items: Input[], + action: (item: Input) => Promise, + batchSize = NotificationsService.FILE_IO_BATCH_SIZE + ): Promise[]> { + const results: PromiseSettledResult[] = []; + + for (let index = 0; index < items.length; index += batchSize) { + const batch = items.slice(index, index + batchSize); + results.push(...(await Promise.allSettled(batch.map(action)))); + } + + return results; + } + /** * Loads a notification file from disk, parses it to a Notification object, and * validates the object against the NotificationSchema.