From 7da349fa8cbb7a1921c1c698a2b78a15ce353528 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 7 Aug 2025 12:27:19 +0200 Subject: [PATCH 01/26] fix(QueueMappers): Ensure the queues are FIFO, prioritize Signed-off-by: Marcel Klehr --- lib/Db/FsEventMapper.php | 1 + lib/Db/QueueActionMapper.php | 1 + lib/Db/QueueContentItemMapper.php | 1 + lib/Db/QueueMapper.php | 10 ++++++++-- lib/Service/QueueService.php | 7 ++++++- 5 files changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/Db/FsEventMapper.php b/lib/Db/FsEventMapper.php index 6d9c105f..97d51789 100644 --- a/lib/Db/FsEventMapper.php +++ b/lib/Db/FsEventMapper.php @@ -36,6 +36,7 @@ public function getFromQueue(int $limit): array { $qb = $this->db->getQueryBuilder(); $qb->select(FsEvent::$columns) ->from($this->getTableName()) + ->orderBy('id', 'ASC') ->setMaxResults($limit); return $this->findEntities($qb); diff --git a/lib/Db/QueueActionMapper.php b/lib/Db/QueueActionMapper.php index e70a90a7..84ba46d7 100644 --- a/lib/Db/QueueActionMapper.php +++ b/lib/Db/QueueActionMapper.php @@ -35,6 +35,7 @@ public function getFromQueue(int $limit): array { $qb = $this->db->getQueryBuilder(); $qb->select(QueueAction::$columns) ->from($this->getTableName()) + ->orderBy('id', 'ASC') ->setMaxResults($limit); return $this->findEntities($qb); diff --git a/lib/Db/QueueContentItemMapper.php b/lib/Db/QueueContentItemMapper.php index 0d0e8e55..59d01378 100644 --- a/lib/Db/QueueContentItemMapper.php +++ b/lib/Db/QueueContentItemMapper.php @@ -35,6 +35,7 @@ public function getFromQueue(int $limit): array { $qb = $this->db->getQueryBuilder(); $qb->select(QueueContentItem::$columns) ->from($this->getTableName()) + ->orderBy('id', 'ASC') ->setMaxResults($limit); return $this->findEntities($qb); diff --git a/lib/Db/QueueMapper.php b/lib/Db/QueueMapper.php index f4ca332e..bf73b184 100644 --- a/lib/Db/QueueMapper.php +++ b/lib/Db/QueueMapper.php @@ -32,11 +32,13 @@ public function __construct(IDBConnection $db) { /** * @param int $storageId + * @param int $rootId * @param int $n + * @param bool $onlyNewFiles * @return list - * @throws \OCP\DB\Exception + * @throws Exception */ - public function getFromQueue(int $storageId, int $rootId, int $n) : array { + public function getFromQueue(int $storageId, int $rootId, int $n, bool $onlyNewFiles = false) : array { $qb = $this->db->getQueryBuilder(); $qb->select(QueueFile::$columns) ->from($this->getTableName()) @@ -45,6 +47,10 @@ public function getFromQueue(int $storageId, int $rootId, int $n) : array { ->setMaxResults($n) ->orderBy('id', 'ASC'); + if ($onlyNewFiles) { + $qb->andWhere($qb->expr()->eq('update', $qb->createPositionalParameter(false, IQueryBuilder::PARAM_BOOL))); + } + return $this->findEntities($qb); } diff --git a/lib/Service/QueueService.php b/lib/Service/QueueService.php index e2b9b605..640336a2 100644 --- a/lib/Service/QueueService.php +++ b/lib/Service/QueueService.php @@ -59,7 +59,12 @@ public function scheduleJob(QueueFile $file): void { * @throws \OCP\DB\Exception */ public function getFromQueue(int $storageId, int $rootId, int $batchSize): array { - return $this->queueMapper->getFromQueue($storageId, $rootId, $batchSize); + $nonUpdates = $this->queueMapper->getFromQueue($storageId, $rootId, $batchSize, true); + if (empty($nonUpdates)) { + return $this->queueMapper->getFromQueue($storageId, $rootId, $batchSize, false); + } + + return $nonUpdates; } public function existsQueueFileId(int $fileId): bool { From 2f1b7ddb8cae59b0e0288f92b9b3c52c8145f11f Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 7 Aug 2025 12:43:23 +0200 Subject: [PATCH 02/26] fix: improve error handling Signed-off-by: Marcel Klehr # Conflicts: # lib/Public/ContentManager.php # lib/Service/FsEventService.php --- lib/Db/QueueMapper.php | 3 ++ lib/Listener/FileListener.php | 2 +- lib/Public/ContentManager.php | 91 ++++++++++++++++++++++---------- lib/Service/ActionScheduler.php | 10 +++- lib/Service/FsEventScheduler.php | 8 ++- lib/Service/FsEventService.php | 21 +++++--- lib/Service/QueueService.php | 4 ++ 7 files changed, 100 insertions(+), 39 deletions(-) diff --git a/lib/Db/QueueMapper.php b/lib/Db/QueueMapper.php index bf73b184..4f0a8382 100644 --- a/lib/Db/QueueMapper.php +++ b/lib/Db/QueueMapper.php @@ -113,6 +113,9 @@ public function insertIntoQueue(QueueFile $file) : QueueFile { return $file; } + /** + * @throws Exception + */ public function clearQueue(): void { $qb = $this->db->getQueryBuilder(); $qb->delete($this->getTableName())->executeStatement(); diff --git a/lib/Listener/FileListener.php b/lib/Listener/FileListener.php index d77e1b48..d3a0d552 100644 --- a/lib/Listener/FileListener.php +++ b/lib/Listener/FileListener.php @@ -133,7 +133,7 @@ public function handle(Event $event): void { $this->fsEventScheduler->onAccessUpdateDecl($rootId); } } catch (InvalidPathException|Exception|NotFoundException $e) { - $this->logger->warning($e->getMessage(), ['exception' => $e]); + $this->logger->warning('Error in fs event listener: ' . $e->getMessage(), ['exception' => $e]); } } } diff --git a/lib/Public/ContentManager.php b/lib/Public/ContentManager.php index a70ee432..ebff3f4a 100644 --- a/lib/Public/ContentManager.php +++ b/lib/Public/ContentManager.php @@ -16,6 +16,7 @@ use OCA\ContextChat\Service\ActionScheduler; use OCA\ContextChat\Service\ProviderConfigService; use OCP\BackgroundJob\IJobList; +use OCP\DB\Exception; use OCP\EventDispatcher\IEventDispatcher; use OCP\Server; use Psr\Container\ContainerExceptionInterface; @@ -100,7 +101,11 @@ public function submitContent(string $appId, array $items): void { $dbItem->setLastModified($item->lastModified); $dbItem->setUsers(implode(',', $item->users)); - $this->mapper->insert($dbItem); + try { + $this->mapper->insert($dbItem); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + } } if (!$this->jobList->has(SubmitContentJob::class, null)) { @@ -122,11 +127,15 @@ public function submitContent(string $appId, array $items): void { public function removeContentForUsers(string $appId, string $providerId, string $itemId, array $users): void { $this->collectAllContentProviders(); - $this->actionService->updateAccess( - UpdateAccessOp::DENY, - $users, - ProviderConfigService::getSourceId($itemId, ProviderConfigService::getConfigKey($appId, $providerId)), - ); + try { + $this->actionService->updateAccess( + UpdateAccessOp::DENY, + $users, + ProviderConfigService::getSourceId($itemId, ProviderConfigService::getConfigKey($appId, $providerId)), + ); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + } } /** @@ -142,11 +151,15 @@ public function removeContentForUsers(string $appId, string $providerId, string */ public function removeAllContentForUsers(string $appId, string $providerId, array $users): void { $this->collectAllContentProviders(); - $this->actionService->updateAccessProvider( - UpdateAccessOp::DENY, - $users, - ProviderConfigService::getConfigKey($appId, $providerId), - ); + try { + $this->actionService->updateAccessProvider( + UpdateAccessOp::DENY, + $users, + ProviderConfigService::getConfigKey($appId, $providerId), + ); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + } } /** @@ -166,11 +179,15 @@ public function removeAllContentForUsers(string $appId, string $providerId, arra public function updateAccess(string $appId, string $providerId, string $itemId, string $op, array $userIds): void { $this->collectAllContentProviders(); - $this->actionService->updateAccess( - $op, - $userIds, - ProviderConfigService::getSourceId($itemId, ProviderConfigService::getConfigKey($appId, $providerId)), - ); + try { + $this->actionService->updateAccess( + $op, + $userIds, + ProviderConfigService::getSourceId($itemId, ProviderConfigService::getConfigKey($appId, $providerId)), + ); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + } } /** @@ -187,11 +204,15 @@ public function updateAccess(string $appId, string $providerId, string $itemId, public function updateAccessProvider(string $appId, string $providerId, string $op, array $userIds): void { $this->collectAllContentProviders(); - $this->actionService->updateAccessProvider( - $op, - $userIds, - ProviderConfigService::getConfigKey($appId, $providerId), - ); + try { + $this->actionService->updateAccessProvider( + $op, + $userIds, + ProviderConfigService::getConfigKey($appId, $providerId), + ); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + } } /** @@ -209,10 +230,14 @@ public function updateAccessProvider(string $appId, string $providerId, string $ public function updateAccessDeclarative(string $appId, string $providerId, string $itemId, array $userIds): void { $this->collectAllContentProviders(); - $this->actionService->updateAccessDeclSource( - $userIds, - ProviderConfigService::getSourceId($itemId, ProviderConfigService::getConfigKey($appId, $providerId)), - ); + try { + $this->actionService->updateAccessDeclSource( + $userIds, + ProviderConfigService::getSourceId($itemId, ProviderConfigService::getConfigKey($appId, $providerId)), + ); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + } } /** @@ -226,7 +251,11 @@ public function updateAccessDeclarative(string $appId, string $providerId, strin */ public function deleteProvider(string $appId, string $providerId): void { $this->collectAllContentProviders(); - $this->actionService->deleteProvider(ProviderConfigService::getConfigKey($appId, $providerId)); + try { + $this->actionService->deleteProvider(ProviderConfigService::getConfigKey($appId, $providerId)); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + } } /** @@ -242,8 +271,12 @@ public function deleteContent(string $appId, string $providerId, array $itemIds) $this->collectAllContentProviders(); $providerKey = ProviderConfigService::getConfigKey($appId, $providerId); - $this->actionService->deleteSources(array_map(function (string $itemId) use ($providerKey) { - return ProviderConfigService::getSourceId($itemId, $providerKey); - }, $itemIds)); + try { + $this->actionService->deleteSources(array_map(function (string $itemId) use ($providerKey) { + return ProviderConfigService::getSourceId($itemId, $providerKey); + }, $itemIds)); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + } } } diff --git a/lib/Service/ActionScheduler.php b/lib/Service/ActionScheduler.php index dc91ad03..bd7ad8a9 100644 --- a/lib/Service/ActionScheduler.php +++ b/lib/Service/ActionScheduler.php @@ -15,6 +15,7 @@ use OCA\ContextChat\Type\ActionType; use OCA\ContextChat\Type\UpdateAccessOp; use OCP\BackgroundJob\IJobList; +use OCP\DB\Exception; class ActionScheduler { public const BATCH_SIZE = 500; @@ -27,9 +28,10 @@ public function __construct( } /** - * @param ActionType::* $type + * @param string $type * @param string $payload * @return void + * @throws Exception */ private function scheduleAction(string $type, string $payload): void { $item = new QueueAction(); @@ -45,6 +47,7 @@ private function scheduleAction(string $type, string $payload): void { /** * @param string[] $sourceIds * @return void + * @throws Exception */ public function deleteSources(array $sourceIds): void { // batch sourceIds into self::BATCH_SIZE chunks @@ -63,6 +66,7 @@ public function deleteSources(array $sourceIds): void { /** * @param string $providerKey * @return void + * @throws Exception */ public function deleteProvider(string $providerKey): void { $payload = json_encode(['providerId' => $providerKey]); @@ -76,6 +80,7 @@ public function deleteProvider(string $providerKey): void { /** * @param string $userId * @return void + * @throws Exception */ public function deleteUser(string $userId): void { $payload = json_encode(['userId' => $userId]); @@ -91,6 +96,7 @@ public function deleteUser(string $userId): void { * @param string[] $userIds * @param string $sourceId * @return void + * @throws Exception */ public function updateAccess(string $op, array $userIds, string $sourceId): void { if (count($userIds) === 0) { @@ -110,6 +116,7 @@ public function updateAccess(string $op, array $userIds, string $sourceId): void * @param string[] $userIds * @param string $providerId * @return void + * @throws Exception */ public function updateAccessProvider(string $op, array $userIds, string $providerId): void { if (count($userIds) === 0) { @@ -128,6 +135,7 @@ public function updateAccessProvider(string $op, array $userIds, string $provide * @param string[] $userIds * @param string $sourceId * @return void + * @throws Exception */ public function updateAccessDeclSource(array $userIds, string $sourceId): void { if (count($userIds) === 0) { diff --git a/lib/Service/FsEventScheduler.php b/lib/Service/FsEventScheduler.php index 683240d1..71baa63a 100644 --- a/lib/Service/FsEventScheduler.php +++ b/lib/Service/FsEventScheduler.php @@ -27,13 +27,17 @@ public function __construct( } /** - * @throws Exception + * @throws NotFoundException */ private function getOwnerIdForNode(Node $node): string { if ($node->getOwner()) { return $node->getOwner()->getUID(); } - $ownerId = $this->storageService->getOwnerForFileId($node->getId()); + try { + $ownerId = $this->storageService->getOwnerForFileId($node->getId()); + } catch (InvalidPathException|NotFoundException $e) { + throw new NotFoundException('Cannot get owner for node ID ' . $node->getId(), previous: $e); + } if ($ownerId !== false) { return $ownerId; } diff --git a/lib/Service/FsEventService.php b/lib/Service/FsEventService.php index faae1703..3622858a 100644 --- a/lib/Service/FsEventService.php +++ b/lib/Service/FsEventService.php @@ -47,12 +47,21 @@ public function onAccessUpdateDecl(Node $node, bool $recurse = true): void { return; } } + foreach ($fileIds as $file) { + try { + $fileRef = ProviderConfigService::getSourceId($file->getId()); + $userIds = $this->storageService->getUsersForFileId($file->getId()); - foreach ($fileIds as $fileId) { - $fileRef = ProviderConfigService::getSourceId($fileId); - $fileUserIds = $this->storageService->getUsersForFileId($fileId); - - $this->actionService->updateAccessDeclSource($fileUserIds, $fileRef); + $this->actionService->updateAccessDeclSource($userIds, $fileRef); + } catch (InvalidPathException|NotFoundException $e) { + $this->logger->warning('Cannot get file id for declarative access update:' . $e->getMessage(), [ + 'exception' => $e + ]); + } catch (Exception $e) { + $this->logger->warning('Failed to insert declarative access update into DB:' . $e->getMessage(), [ + 'exception' => $e + ]); + } } } @@ -77,7 +86,7 @@ public function onDelete(Node $node, bool $recurse = true): void { foreach ($fileIds as $fileId) { try { $fileRefs[] = ProviderConfigService::getSourceId($fileId); - } catch (InvalidPathException|NotFoundException $e) { + } catch (InvalidPathException|NotFoundException|Exception $e) { $this->logger->warning($e->getMessage(), ['exception' => $e]); } } diff --git a/lib/Service/QueueService.php b/lib/Service/QueueService.php index 640336a2..9ca3b7c5 100644 --- a/lib/Service/QueueService.php +++ b/lib/Service/QueueService.php @@ -13,6 +13,7 @@ use OCA\ContextChat\Db\QueueFile; use OCA\ContextChat\Db\QueueMapper; use OCP\BackgroundJob\IJobList; +use OCP\DB\Exception; class QueueService { @@ -82,6 +83,9 @@ public function removeFromQueue(array $files): void { $this->queueMapper->removeFromQueue($files); } + /** + * @throws Exception + */ public function clearQueue(): void { $this->queueMapper->clearQueue(); } From dc3ef71508d2d3043cee3bc0413d92485e863ded Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 7 Aug 2025 12:43:42 +0200 Subject: [PATCH 03/26] fix(FsEventMapper): Prevent inserting duplicates Signed-off-by: Marcel Klehr --- lib/Db/FsEventMapper.php | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lib/Db/FsEventMapper.php b/lib/Db/FsEventMapper.php index 97d51789..a50e724b 100644 --- a/lib/Db/FsEventMapper.php +++ b/lib/Db/FsEventMapper.php @@ -9,6 +9,7 @@ namespace OCA\ContextChat\Db; +use OCP\AppFramework\Db\Entity; use OCP\AppFramework\Db\QBMapper; use OCP\DB\Exception; use OCP\DB\QueryBuilder\IQueryBuilder; @@ -27,6 +28,24 @@ public function __construct(IDBConnection $db) { parent::__construct($db, 'context_chat_fs_events', FsEvent::class); } + #[\Override] + public function insert(Entity $entity): Entity { + $qb = $this->db->getQueryBuilder(); + $qb->select('*') + ->from($this->getTableName()) + ->setMaxResults(1) + ->where( + $qb->expr()->eq('user_id', $qb->createNamedParameter($entity->getUserId())), + $qb->expr()->eq('node_id', $qb->createNamedParameter($entity->getNodeId(), IQueryBuilder::PARAM_INT)), + $qb->expr()->eq('type', $qb->createNamedParameter($entity->getType())) + ); + $entities = $this->findEntities($qb); + if (empty($entities)) { + return parent::insert($entity); + } + return $entities[0]; + } + /** * @param int $limit * @return array From 5f2ebbbefd2956e7e8193270550157cf14ac5ca9 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 7 Aug 2025 12:44:22 +0200 Subject: [PATCH 04/26] fix(FileSystemListenerJob): Prevent OOM by working per user and tearing down FS in between Signed-off-by: Marcel Klehr --- lib/BackgroundJobs/FileSystemListenerJob.php | 70 ++++++++++++-------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/lib/BackgroundJobs/FileSystemListenerJob.php b/lib/BackgroundJobs/FileSystemListenerJob.php index eb7d8e47..79c63618 100644 --- a/lib/BackgroundJobs/FileSystemListenerJob.php +++ b/lib/BackgroundJobs/FileSystemListenerJob.php @@ -63,43 +63,55 @@ protected function run($argument): void { return; } + $eventsByUser = []; foreach ($fsEvents as $fsEvent) { - $this->diagnosticService->sendHeartbeat(static::class, $this->getId()); - - try { - $node = current($this->rootFolder->getUserFolder($fsEvent->getUserId())->getById($fsEvent->getNodeId())); - } catch (\Exception $e) { - $this->logger->warning('Error retrieving node for fs event "' . $fsEvent->getType() . '": ' . $e->getMessage(), ['exception' => $e]); - $node = false; + if (!isset($eventsByUser[$fsEvent->getUserId()])) { + $eventsByUser[$fsEvent->getUserId()] = []; } - if ($node === false) { - $this->logger->warning('Node with ID ' . $fsEvent->getNodeId() . ' not found for fs event "' . $fsEvent->getType() . '"'); + $eventsByUser[$fsEvent->getUserId()][] = $fsEvent; + } + + foreach ($eventsByUser as $userId => $events) { + foreach ($events as $fsEvent) { + $this->diagnosticService->sendHeartbeat(static::class, $this->getId()); + + try { + $node = current($this->rootFolder->getUserFolder($fsEvent->getUserId())->getById($fsEvent->getNodeId())); + } catch (\Exception $e) { + $this->logger->warning('Error retrieving node for fs event "' . $fsEvent->getType() . '": ' . $e->getMessage(), ['exception' => $e]); + $node = false; + } + if ($node === false) { + $this->logger->warning('Node with ID ' . $fsEvent->getNodeId() . ' not found for fs event "' . $fsEvent->getType() . '"'); + try { + $this->fsEventMapper->delete($fsEvent); + } catch (Exception $e) { + $this->logger->warning('Error deleting fs event "' . $fsEvent->getType() . '": ' . $e->getMessage(), ['exception' => $e]); + } + continue; + } + + try { + switch ($fsEvent->getTypeObject()) { + case FsEventType::CREATE: + $this->fsEventService->onInsert($node); + break; + case FsEventType::ACCESS_UPDATE_DECL: + $this->fsEventService->onAccessUpdateDecl($node); + break; + } + $this->diagnosticService->sendHeartbeat(static::class, $this->getId()); + } catch (\Throwable $e) { + $this->logger->warning('Error handling fs event "' . $fsEvent->getType() . '": ' . $e->getMessage(), ['exception' => $e]); + } try { $this->fsEventMapper->delete($fsEvent); } catch (Exception $e) { $this->logger->warning('Error deleting fs event "' . $fsEvent->getType() . '": ' . $e->getMessage(), ['exception' => $e]); } - continue; - } - - try { - switch ($fsEvent->getTypeObject()) { - case FsEventType::CREATE: - $this->fsEventService->onInsert($node); - break; - case FsEventType::ACCESS_UPDATE_DECL: - $this->fsEventService->onAccessUpdateDecl($node); - break; - } - $this->diagnosticService->sendHeartbeat(static::class, $this->getId()); - } catch (\Throwable $e) { - $this->logger->warning('Error handling fs event "' . $fsEvent->getType() . '": ' . $e->getMessage(), ['exception' => $e]); - } - try { - $this->fsEventMapper->delete($fsEvent); - } catch (Exception $e) { - $this->logger->warning('Error deleting fs event "' . $fsEvent->getType() . '": ' . $e->getMessage(), ['exception' => $e]); } + // Tear down to avoid memory leaks + \OC_Util::tearDownFS(); } } finally { $this->diagnosticService->sendJobEnd(static::class, $this->getId()); From ba1d05c9a09884bdb9a6e16894a930d333c72aaa Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 7 Aug 2025 12:59:54 +0200 Subject: [PATCH 05/26] fix: Fix psalm issues Signed-off-by: Marcel Klehr --- lib/Service/FsEventService.php | 6 +++--- tests/psalm-baseline.xml | 7 ++++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/Service/FsEventService.php b/lib/Service/FsEventService.php index 3622858a..d675b22b 100644 --- a/lib/Service/FsEventService.php +++ b/lib/Service/FsEventService.php @@ -47,10 +47,10 @@ public function onAccessUpdateDecl(Node $node, bool $recurse = true): void { return; } } - foreach ($fileIds as $file) { + foreach ($fileIds as $fileId) { try { - $fileRef = ProviderConfigService::getSourceId($file->getId()); - $userIds = $this->storageService->getUsersForFileId($file->getId()); + $fileRef = ProviderConfigService::getSourceId($fileId); + $userIds = $this->storageService->getUsersForFileId($fileId); $this->actionService->updateAccessDeclSource($userIds, $fileRef); } catch (InvalidPathException|NotFoundException $e) { diff --git a/tests/psalm-baseline.xml b/tests/psalm-baseline.xml index 752743e1..ecc6962e 100644 --- a/tests/psalm-baseline.xml +++ b/tests/psalm-baseline.xml @@ -3,7 +3,12 @@ - SPDX-FileCopyrightText: 2023 Nextcloud GmbH and Nextcloud contributors - SPDX-License-Identifier: AGPL-3.0-or-later --> - + + + + + + From cf20fa78e115ab2aa05a8aa396d09b7c8ea7e1f3 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 7 Aug 2025 14:42:32 +0200 Subject: [PATCH 06/26] fix(FileSystemListenerJob) Use SetupManager to tear down fs Signed-off-by: Marcel Klehr --- lib/BackgroundJobs/FileSystemListenerJob.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/BackgroundJobs/FileSystemListenerJob.php b/lib/BackgroundJobs/FileSystemListenerJob.php index 79c63618..1340a26f 100644 --- a/lib/BackgroundJobs/FileSystemListenerJob.php +++ b/lib/BackgroundJobs/FileSystemListenerJob.php @@ -9,6 +9,7 @@ namespace OCA\ContextChat\BackgroundJobs; +use OC\Files\SetupManager; use OCA\ContextChat\Db\FsEventMapper; use OCA\ContextChat\Logger; use OCA\ContextChat\Service\DiagnosticService; @@ -111,7 +112,8 @@ protected function run($argument): void { } } // Tear down to avoid memory leaks - \OC_Util::tearDownFS(); + $setupManager = \OCP\Server::get(SetupManager::class); + $setupManager->tearDown(); } } finally { $this->diagnosticService->sendJobEnd(static::class, $this->getId()); From be6b2fac654777b4ba3c21a345e271962711016b Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 7 Aug 2025 14:50:49 +0200 Subject: [PATCH 07/26] feat: Add stats entry for queued unseen files Signed-off-by: Marcel Klehr --- lib/Command/Statistics.php | 3 +++ lib/Db/QueueMapper.php | 11 +++++++---- lib/Service/QueueService.php | 7 +++++++ lib/Settings/AdminSettings.php | 6 ++++++ src/components/ViewAdmin.vue | 2 +- 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/lib/Command/Statistics.php b/lib/Command/Statistics.php index ffd07e7c..31a2c7c4 100644 --- a/lib/Command/Statistics.php +++ b/lib/Command/Statistics.php @@ -60,6 +60,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int $queueCount = $this->queueService->count(); $output->writeln('Files in indexing queue: ' . $queueCount); + $queueNewCount = $this->queueService->countNewFiles(); + $output->writeln('New files in indexing queue (without updates): ' . $queueNewCount); + $queuedDocumentsCount = $this->contentQueue->count(); $output->writeln('Queued documents (without files):' . var_export($queuedDocumentsCount, true)); diff --git a/lib/Db/QueueMapper.php b/lib/Db/QueueMapper.php index 4f0a8382..377af5eb 100644 --- a/lib/Db/QueueMapper.php +++ b/lib/Db/QueueMapper.php @@ -124,11 +124,14 @@ public function clearQueue(): void { /** * @throws \OCP\DB\Exception */ - public function count() : int { + public function count(bool $onlyNewFiles = false) : int { $qb = $this->db->getQueryBuilder(); - $result = $qb->select($qb->func()->count('id')) - ->from($this->getTableName()) - ->executeQuery(); + $qb->select($qb->func()->count('id')) + ->from($this->getTableName()); + if ($onlyNewFiles) { + $qb->andWhere($qb->expr()->eq('update', $qb->createPositionalParameter(false, IQueryBuilder::PARAM_BOOL))); + } + $result = $qb->executeQuery(); if (($cnt = $result->fetchOne()) !== false) { return (int)$cnt; } diff --git a/lib/Service/QueueService.php b/lib/Service/QueueService.php index 9ca3b7c5..b8abf819 100644 --- a/lib/Service/QueueService.php +++ b/lib/Service/QueueService.php @@ -96,4 +96,11 @@ public function clearQueue(): void { public function count(): int { return $this->queueMapper->count(); } + + /** + * @throws \OCP\DB\Exception + */ + public function countNewFiles(): int { + return $this->queueMapper->count(true); + } } diff --git a/lib/Settings/AdminSettings.php b/lib/Settings/AdminSettings.php index d7c8401b..506260ed 100644 --- a/lib/Settings/AdminSettings.php +++ b/lib/Settings/AdminSettings.php @@ -81,6 +81,12 @@ public function getForm(): TemplateResponse { $this->logger->error($e->getMessage(), ['exception' => $e]); $queued_files_count = 0; } + try { + $stats['queued_new_files_count'] = $this->queueService->countNewFiles(); + } catch (Exception $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); + $stats['queued_new_files_count'] = 0; + } try { $stats['queued_documents_counts'] = $this->contentQueue->count(); $stats['queued_documents_counts'][ProviderConfigService::getDefaultProviderKey()] = $queued_files_count; diff --git a/src/components/ViewAdmin.vue b/src/components/ViewAdmin.vue index 2d8e4353..91223258 100644 --- a/src/components/ViewAdmin.vue +++ b/src/components/ViewAdmin.vue @@ -45,7 +45,7 @@ SPDX-License-Identifier: AGPL-3.0-or-later {{ stats.vectordb_document_counts[providerId] }} From 3469a42b86a07da04a97c3bdb47a99792a53472c Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 7 Aug 2025 14:58:01 +0200 Subject: [PATCH 08/26] fix: Add RemoveDuplicateFsEvents repair step Signed-off-by: Marcel Klehr --- appinfo/info.xml | 3 ++ lib/Migration/RemoveDuplicateFsEvents.php | 56 +++++++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 lib/Migration/RemoveDuplicateFsEvents.php diff --git a/appinfo/info.xml b/appinfo/info.xml index 71d83bef..50b17573 100644 --- a/appinfo/info.xml +++ b/appinfo/info.xml @@ -58,6 +58,9 @@ Refer to the [Context Chat Backend's readme](https://github.com/nextcloud/contex OCA\ContextChat\Repair\AppInstallStep + + OCA\ContextChat\Migration\RemoveDuplicateFsEvents + diff --git a/lib/Migration/RemoveDuplicateFsEvents.php b/lib/Migration/RemoveDuplicateFsEvents.php new file mode 100644 index 00000000..efc59e45 --- /dev/null +++ b/lib/Migration/RemoveDuplicateFsEvents.php @@ -0,0 +1,56 @@ +db->getQueryBuilder(); + $subQuery->selectAlias($subQuery->func()->min('id'), 'id') + ->from('context_chat_fs_events') + ->groupBy('type', 'user_id', 'node_id'); + + if ($this->db->getDatabaseProvider() === IDBConnection::PLATFORM_MYSQL) { + $secondSubQuery = $this->db->getQueryBuilder(); + $secondSubQuery->select('id')->from($secondSubQuery->createFunction('(' . $subQuery->getSQL() . ')'), 'x'); + $sql = $secondSubQuery->getSQL(); + } else { + $sql = $subQuery->getSQL(); + } + + $qb = $this->db->getQueryBuilder(); + $qb->delete('context_chat_fs_events') + ->where($qb->expr()->notIn('id', $qb->createFunction('(' . $sql . ')'))); + + $qb->executeStatement(); + } catch (\Throwable $e) { + $output->warning('Failed to automatically remove duplicate fs events for context_chat.'); + $this->logger->error('Failed to automatically remove duplicate fs events for context_chat', ['exception' => $e]); + } + } +} From dd953032937671a9c48472701c8c1c79081571bd Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Fri, 8 Aug 2025 08:16:06 +0200 Subject: [PATCH 09/26] fix: psalm issues Signed-off-by: Marcel Klehr --- tests/psalm-baseline.xml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/psalm-baseline.xml b/tests/psalm-baseline.xml index ecc6962e..a5887909 100644 --- a/tests/psalm-baseline.xml +++ b/tests/psalm-baseline.xml @@ -1,13 +1,13 @@ - - + + + + + From 7313a304df78e0f211f5e9729e5d675f6c48e043 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Fri, 8 Aug 2025 10:56:54 +0200 Subject: [PATCH 10/26] tests: Kill workers manually after each test case Signed-off-by: Marcel Klehr --- .github/workflows/integration-test.yml | 36 ++++++++++++++++++++------ 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 41f8993d..896e3630 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -264,8 +264,10 @@ jobs: - name: Run the prompts run: | - ./occ background-job:worker --stop_after=2m 'OC\TaskProcessing\SynchronousBackgroundJob' & - ./occ background-job:worker --stop_after=2m 'OC\TaskProcessing\SynchronousBackgroundJob' & + ./occ background-job:worker 'OC\TaskProcessing\SynchronousBackgroundJob' & + WORKER1_PID=$! + ./occ background-job:worker 'OC\TaskProcessing\SynchronousBackgroundJob' & + WORKER2_PID=$! OUT1=$(./occ context_chat:prompt admin "Which factors are taken into account for the Ethical AI Rating?") echo "$OUT1" echo '--------------------------------------------------' @@ -278,6 +280,9 @@ jobs: echo "$OUT1" | grep -q "If all of these points are met, we give a Green label." || exit 1 echo "$OUT2" | grep -q "If all of these points are met, we give a Green label." || exit 1 echo "$OUT3" | grep -q "overview.rst" || exit 1 + + kill -9 $WORKER1_PID + kill -9 $WORKER2_PID - name: Check python memory usage run: | @@ -591,8 +596,10 @@ jobs: - name: Run the prompts run: | - ./occ background-job:worker --stop_after=2m 'OC\TaskProcessing\SynchronousBackgroundJob' & - ./occ background-job:worker --stop_after=2m 'OC\TaskProcessing\SynchronousBackgroundJob' & + ./occ background-job:worker 'OC\TaskProcessing\SynchronousBackgroundJob' & + WORKER1_PID=$! + ./occ background-job:worker 'OC\TaskProcessing\SynchronousBackgroundJob' & + WORKER2_PID=$! OUT1=$(./occ context_chat:prompt admin "Which factors are taken into account for the Ethical AI Rating?") echo "$OUT1" echo '--------------------------------------------------' @@ -605,6 +612,9 @@ jobs: echo "$OUT1" | grep -q "If all of these points are met, we give a Green label." || exit 1 echo "$OUT2" | grep -q "If all of these points are met, we give a Green label." || exit 1 echo "$OUT3" | grep -q "overview.rst" || exit 1 + + kill -9 $WORKER1_PID + kill -9 $WORKER2_PID - name: Show nextcloud logs if: always() @@ -908,8 +918,10 @@ jobs: - name: Run the prompts run: | - ./occ background-job:worker --stop_after=2m 'OC\TaskProcessing\SynchronousBackgroundJob' & - ./occ background-job:worker --stop_after=2m 'OC\TaskProcessing\SynchronousBackgroundJob' & + ./occ background-job:worker 'OC\TaskProcessing\SynchronousBackgroundJob' & + WORKER1_PID=$! + ./occ background-job:worker 'OC\TaskProcessing\SynchronousBackgroundJob' & + WORKER2_PID=$! set +e # Check for user admin: Should be there @@ -951,6 +963,9 @@ jobs: echo "$OUT_TEST_SEARCH_AI" | grep -q -v "overview.rst" && echo '✅Test user does not see AI overview, when using search' || exit 1 echo "$OUT_TEST_PROMPT_JOBS" | grep -q -v "background jobs" && echo '✅Test user does not see BackgroundJobs docs' || exit 1 echo "$OUT_TEST_SEARCH_JOBS" | grep -q -v "backgroundjobs.md" && echo '✅Test user does not see BackgroundJobs docs' || exit 1 + + kill -9 $WORKER1_PID + kill -9 $WORKER2_PID - name: Remove some files using occ if: ${{ matrix.file-deletion-method == 'occ'}} @@ -983,8 +998,10 @@ jobs: - name: Run the prompts again run: | - ./occ background-job:worker --stop_after=2m 'OC\TaskProcessing\SynchronousBackgroundJob' & - ./occ background-job:worker --stop_after=2m 'OC\TaskProcessing\SynchronousBackgroundJob' & + ./occ background-job:worker 'OC\TaskProcessing\SynchronousBackgroundJob' & + WORKER1_PID=$! + ./occ background-job:worker 'OC\TaskProcessing\SynchronousBackgroundJob' & + WORKER2_PID=$! set +e @@ -1027,6 +1044,9 @@ jobs: echo "$OUT_TEST_SEARCH_AI" | grep -q -v "overview.rst" && echo '✅Test user does not see AI overview, when using search' || exit 1 echo "$OUT_TEST_PROMPT_JOBS" | grep -q "background jobs" && echo '✅Test user does see BackgroundJobs docs' || exit 1 echo "$OUT_TEST_SEARCH_JOBS" | grep -q "backgroundjobs.md" && echo '✅Test user does see BackgroundJobs docs' || exit 1 + + kill -9 $WORKER1_PID + kill -9 $WORKER2_PID - name: Show nextcloud logs if: always() From 1c7b87e41019844025d89ecfb3f3acc73b18e2f4 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Fri, 8 Aug 2025 14:07:10 +0200 Subject: [PATCH 11/26] fix: Add comments to RemoveDuplicateFsEvents Signed-off-by: Marcel Klehr --- lib/Migration/RemoveDuplicateFsEvents.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/Migration/RemoveDuplicateFsEvents.php b/lib/Migration/RemoveDuplicateFsEvents.php index efc59e45..a240d86d 100644 --- a/lib/Migration/RemoveDuplicateFsEvents.php +++ b/lib/Migration/RemoveDuplicateFsEvents.php @@ -30,12 +30,16 @@ public function getName(): string { #[\Override] public function run(IOutput $output): void { try { + // Get the lowest ID for each combination of type, user_id, and node_id $subQuery = $this->db->getQueryBuilder(); $subQuery->selectAlias($subQuery->func()->min('id'), 'id') ->from('context_chat_fs_events') ->groupBy('type', 'user_id', 'node_id'); if ($this->db->getDatabaseProvider() === IDBConnection::PLATFORM_MYSQL) { + // MySQL does not allow the table you're deleting from be used in a subquery for the condition. + // (We can't use the same table (e) in a DELETE and in its sub-SELECT. We can, however use a sub-sub-SELECT to create a temporary table (x), and use that for the sub-SELECT.) + // See https://stackoverflow.com/questions/4471277/mysql-delete-from-with-subquery-as-condition $secondSubQuery = $this->db->getQueryBuilder(); $secondSubQuery->select('id')->from($secondSubQuery->createFunction('(' . $subQuery->getSQL() . ')'), 'x'); $sql = $secondSubQuery->getSQL(); @@ -43,6 +47,7 @@ public function run(IOutput $output): void { $sql = $subQuery->getSQL(); } + // Delete all rows where the ID is not in the subquery result $qb = $this->db->getQueryBuilder(); $qb->delete('context_chat_fs_events') ->where($qb->expr()->notIn('id', $qb->createFunction('(' . $sql . ')'))); From ecb847ca59949db19e6952547374845e5d6d50bf Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Fri, 8 Aug 2025 14:39:40 +0200 Subject: [PATCH 12/26] fix(IndexerJob): Make sure too large files are removed from queue Signed-off-by: Marcel Klehr --- lib/BackgroundJobs/IndexerJob.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/BackgroundJobs/IndexerJob.php b/lib/BackgroundJobs/IndexerJob.php index f93dd226..7137b82f 100644 --- a/lib/BackgroundJobs/IndexerJob.php +++ b/lib/BackgroundJobs/IndexerJob.php @@ -205,6 +205,11 @@ protected function index(array $files): void { 'storageId' => $this->storageId, 'rootId' => $this->rootId ]); + try { + $this->queue->removeFromQueue([$queueFile]); + } catch (Exception $e) { + $this->logger->warning('[IndexerJob] Could not remove file from queue', ['exception' => $e, 'storageId' => $this->storageId, 'rootId' => $this->rootId]); + } continue; } From e4af538a2d2947c95b11719e98085ed98152a383 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Fri, 8 Aug 2025 14:41:35 +0200 Subject: [PATCH 13/26] fix(REUSE): Add copyright to psalm-baseline.xml Signed-off-by: Marcel Klehr --- tests/psalm-baseline.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/psalm-baseline.xml b/tests/psalm-baseline.xml index a5887909..5e64b85d 100644 --- a/tests/psalm-baseline.xml +++ b/tests/psalm-baseline.xml @@ -1,4 +1,8 @@ + From c4abdff26d7da61c62c36c98afeb37d903de5cf0 Mon Sep 17 00:00:00 2001 From: Anupam Kumar Date: Wed, 13 Aug 2025 12:22:02 +0530 Subject: [PATCH 14/26] fix: consider only new files for first indexing complete check Signed-off-by: Anupam Kumar --- lib/BackgroundJobs/IndexerJob.php | 7 ++++--- lib/BackgroundJobs/StorageCrawlJob.php | 2 +- lib/Service/ActionScheduler.php | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/BackgroundJobs/IndexerJob.php b/lib/BackgroundJobs/IndexerJob.php index 7137b82f..e499de73 100644 --- a/lib/BackgroundJobs/IndexerJob.php +++ b/lib/BackgroundJobs/IndexerJob.php @@ -297,15 +297,16 @@ private function setInitialIndexCompletion(): void { return; } try { - $queuedFilesCount = $this->queue->count(); + $queuedNewFilesCount = $this->queue->countNewFiles(); } catch (Exception $e) { $this->logger->warning('Could not count indexed files', ['exception' => $e]); return; } $crawlJobCount = $this->getJobCount(StorageCrawlJob::class); - // if any storage crawler jobs are still running or there are still files in the queue, we are still crawling - if ($crawlJobCount > 0 || $queuedFilesCount > 0) { + // if any storage crawler jobs are still running or there are still new files in the queue, + // we are still indexing files that were never indexed before. + if ($crawlJobCount > 0 || $queuedNewFilesCount > 0) { return; } diff --git a/lib/BackgroundJobs/StorageCrawlJob.php b/lib/BackgroundJobs/StorageCrawlJob.php index a10577a9..2889048a 100644 --- a/lib/BackgroundJobs/StorageCrawlJob.php +++ b/lib/BackgroundJobs/StorageCrawlJob.php @@ -65,6 +65,7 @@ protected function run($argument): void { $this->diagnosticService->sendHeartbeat(static::class, $this->getId()); try { $this->queue->insertIntoQueue($queueFile); + $i++; } catch (Exception $e) { $this->logger->error('[StorageCrawlJob] Failed to add file to queue', [ 'fileId' => $fileId, @@ -75,7 +76,6 @@ protected function run($argument): void { 'last_file_id' => $lastFileId ]); } - $i++; } if ($i > 0) { diff --git a/lib/Service/ActionScheduler.php b/lib/Service/ActionScheduler.php index bd7ad8a9..7d653496 100644 --- a/lib/Service/ActionScheduler.php +++ b/lib/Service/ActionScheduler.php @@ -28,7 +28,7 @@ public function __construct( } /** - * @param string $type + * @param string $type ActionType::* The type of action to schedule * @param string $payload * @return void * @throws Exception From e69479b8a858e38e0b6a54aef46f98b0b0d9c590 Mon Sep 17 00:00:00 2001 From: Anupam Kumar Date: Wed, 13 Aug 2025 12:22:33 +0530 Subject: [PATCH 15/26] wrap remove duplicate fs events db query in a transaction Signed-off-by: Anupam Kumar --- lib/Migration/RemoveDuplicateFsEvents.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/Migration/RemoveDuplicateFsEvents.php b/lib/Migration/RemoveDuplicateFsEvents.php index a240d86d..1e0baa90 100644 --- a/lib/Migration/RemoveDuplicateFsEvents.php +++ b/lib/Migration/RemoveDuplicateFsEvents.php @@ -30,6 +30,7 @@ public function getName(): string { #[\Override] public function run(IOutput $output): void { try { + $this->db->beginTransaction(); // Get the lowest ID for each combination of type, user_id, and node_id $subQuery = $this->db->getQueryBuilder(); $subQuery->selectAlias($subQuery->func()->min('id'), 'id') @@ -53,7 +54,9 @@ public function run(IOutput $output): void { ->where($qb->expr()->notIn('id', $qb->createFunction('(' . $sql . ')'))); $qb->executeStatement(); + $this->db->commit(); } catch (\Throwable $e) { + $this->db->rollBack(); $output->warning('Failed to automatically remove duplicate fs events for context_chat.'); $this->logger->error('Failed to automatically remove duplicate fs events for context_chat', ['exception' => $e]); } From 8da95af31673fbaae21b965e0649a708ee8383d3 Mon Sep 17 00:00:00 2001 From: Anupam Kumar Date: Tue, 19 Aug 2025 13:20:11 +0530 Subject: [PATCH 16/26] fix: adjustments to admin stats page Signed-off-by: Anupam Kumar --- lib/Service/LangRopeService.php | 5 +- lib/Settings/AdminSettings.php | 3 +- src/components/ViewAdmin.vue | 150 +++++++++++++++++--------------- 3 files changed, 83 insertions(+), 75 deletions(-) diff --git a/lib/Service/LangRopeService.php b/lib/Service/LangRopeService.php index 31851d5f..2df65d75 100644 --- a/lib/Service/LangRopeService.php +++ b/lib/Service/LangRopeService.php @@ -183,8 +183,9 @@ private function requestToExApp( */ public function getIndexedDocumentsCounts(): array { $response = $this->requestToExApp('/countIndexedDocuments', 'POST'); - if (!isset($response['files__default'])) { - return []; + if (!isset($response[ProviderConfigService::getDefaultProviderKey()])) { + throw new \RuntimeException("Malformed indexed documents count response from Context Chat Backend (ExApp): '" + . ProviderConfigService::getDefaultProviderKey() . "' key is missing"); } return $response; } diff --git a/lib/Settings/AdminSettings.php b/lib/Settings/AdminSettings.php index 506260ed..26f9d3c6 100644 --- a/lib/Settings/AdminSettings.php +++ b/lib/Settings/AdminSettings.php @@ -55,7 +55,7 @@ public function getForm(): TemplateResponse { $this->logger->error($e->getMessage(), ['exception' => $e]); $stats['eligible_files_count'] = 0; } - $stats['indexed_files_count'] = Util::numericToNumber($this->appConfig->getAppValueString('indexed_files_count', '0')); + $stats['recorded_indexed_files_count'] = Util::numericToNumber($this->appConfig->getAppValueString('indexed_files_count', '0')); try { $stats['queued_actions_count'] = $this->actionService->count(); } catch (Exception $e) { @@ -72,6 +72,7 @@ public function getForm(): TemplateResponse { $stats['vectordb_document_counts'] = $this->langRopeService->getIndexedDocumentsCounts(); $stats['backend_available'] = true; } catch (\RuntimeException $e) { + $this->logger->error($e->getMessage(), ['exception' => $e]); $stats['backend_available'] = false; $stats['vectordb_document_counts'] = [ ProviderConfigService::getDefaultProviderKey() => 0 ]; } diff --git a/src/components/ViewAdmin.vue b/src/components/ViewAdmin.vue index 91223258..6d4b6b01 100644 --- a/src/components/ViewAdmin.vue +++ b/src/components/ViewAdmin.vue @@ -4,83 +4,89 @@ SPDX-License-Identifier: AGPL-3.0-or-later --> -