From 2f11efc33902cd859da9dfde76506a5f91ebd221 Mon Sep 17 00:00:00 2001 From: Machac Date: Thu, 23 Oct 2025 10:20:01 +0200 Subject: [PATCH 01/37] Release/7.0.0-RC9 - Updated parent POM version from `7.0.0-RC8.1` to `7.0.0-RC9` across all module POM files. --- application-engine/pom.xml | 2 +- nae-object-library/pom.xml | 2 +- nae-spring-core-adapter/pom.xml | 2 +- nae-user-ce/pom.xml | 2 +- nae-user-common/pom.xml | 2 +- pom.xml | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/application-engine/pom.xml b/application-engine/pom.xml index da5d0cd77c6..2bd5e7c5a55 100644 --- a/application-engine/pom.xml +++ b/application-engine/pom.xml @@ -6,7 +6,7 @@ com.netgrif application-engine-parent - 7.0.0-RC8.1 + 7.0.0-RC9 application-engine diff --git a/nae-object-library/pom.xml b/nae-object-library/pom.xml index 24bf6a02470..774d8390ec9 100644 --- a/nae-object-library/pom.xml +++ b/nae-object-library/pom.xml @@ -7,7 +7,7 @@ com.netgrif application-engine-parent - 7.0.0-RC8.1 + 7.0.0-RC9 nae-object-library diff --git a/nae-spring-core-adapter/pom.xml b/nae-spring-core-adapter/pom.xml index 28d2c15eaab..baa42182723 100644 --- a/nae-spring-core-adapter/pom.xml +++ b/nae-spring-core-adapter/pom.xml @@ -7,7 +7,7 @@ com.netgrif application-engine-parent - 7.0.0-RC8.1 + 7.0.0-RC9 nae-spring-core-adapter diff --git a/nae-user-ce/pom.xml b/nae-user-ce/pom.xml index 9d2ea018c4b..83241c8c9dc 100644 --- a/nae-user-ce/pom.xml +++ b/nae-user-ce/pom.xml @@ -6,7 +6,7 @@ com.netgrif application-engine-parent - 7.0.0-RC8.1 + 7.0.0-RC9 nae-user-ce diff --git a/nae-user-common/pom.xml b/nae-user-common/pom.xml index c19da92f83e..c4e756a3014 100644 --- a/nae-user-common/pom.xml +++ b/nae-user-common/pom.xml @@ -6,7 +6,7 @@ com.netgrif application-engine-parent - 7.0.0-RC8.1 + 7.0.0-RC9 nae-user-common diff --git a/pom.xml b/pom.xml index 8d9e904ec88..8e1c15deb81 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.netgrif application-engine-parent - 7.0.0-RC8.1 + 7.0.0-RC9 pom NETGRIF Application Engine parent From b40fe8640c3aa697b1e21912c5948a698ee27349 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Tue, 4 Nov 2025 09:49:03 +0100 Subject: [PATCH 02/37] [NAE-2250] Optimize Elasticsearch reindexing and bulk operations for performance and reliability Replaced direct Elasticsearch operations with ElasticQueueManager for better batch processing and asynchronous handling of indexing and deletion tasks. Updated configuration to include queue properties and standardized dependency handling with constructor injection. This improves performance, scalability, and maintainability of Elasticsearch operations. --- .../ElasticServiceConfiguration.java | 49 +++++-- .../DataConfigurationProperties.java | 47 +++++++ .../elastic/service/ElasticCaseService.java | 93 ++++++-------- .../elastic/service/ElasticQueueManager.java | 120 ++++++++++++++++++ .../spring/elastic/domain/ElasticCase.java | 2 +- .../elastic/domain/ElasticPetriNet.java | 2 +- .../spring/elastic/domain/ElasticTask.java | 2 +- 7 files changed, 246 insertions(+), 69 deletions(-) create mode 100644 application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java index bcc457e059d..07c23d8d06c 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java @@ -6,16 +6,22 @@ import com.netgrif.application.engine.elastic.service.ElasticCaseService; import com.netgrif.application.engine.elastic.service.ElasticTaskService; import com.netgrif.application.engine.elastic.service.executors.Executor; +import com.netgrif.application.engine.elastic.service.interfaces.IElasticCasePrioritySearch; import com.netgrif.application.engine.elastic.service.interfaces.IElasticCaseService; import com.netgrif.application.engine.elastic.service.interfaces.IElasticTaskService; -import org.springframework.beans.factory.annotation.Autowired; +import com.netgrif.application.engine.petrinet.service.interfaces.IPetriNetService; +import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService; +import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration +@RequiredArgsConstructor @ConditionalOnProperty( value = "netgrif.engine.data.elasticsearch.service.configuration-enabled", matchIfMissing = true, @@ -23,17 +29,14 @@ ) public class ElasticServiceConfiguration { - @Autowired - private ElasticCaseRepository caseRepository; - - @Autowired - private ElasticTaskRepository taskRepository; - - @Autowired - private ElasticsearchTemplate elasticsearchTemplate; - - @Autowired - private DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties; + private final ElasticCaseRepository caseRepository; + private final ElasticTaskRepository taskRepository; + private final ElasticsearchTemplate elasticsearchTemplate; + private final DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties; + private final IPetriNetService petriNetService; + private final IWorkflowService workflowService; + private final IElasticCasePrioritySearch elasticCasePrioritySearch; + private final ApplicationEventPublisher applicationEventPublisher; @Bean @Primary @@ -54,7 +57,16 @@ public Executor reindexingTaskTaskExecutor() { @Bean @Primary public IElasticCaseService elasticCaseService() { - return new ElasticCaseService(caseRepository, elasticsearchTemplate, executor()); + return new ElasticCaseService( + caseRepository, + elasticsearchTemplate, + executor(), + elasticsearchProperties, + petriNetService, + workflowService, + elasticCasePrioritySearch, + applicationEventPublisher + ); } @Bean @@ -65,7 +77,16 @@ public IElasticTaskService elasticTaskService() { @Bean public IElasticCaseService reindexingTaskElasticCaseService() { - return new ElasticCaseService(caseRepository, elasticsearchTemplate, reindexingTaskCaseExecutor()); + return new ElasticCaseService( + caseRepository, + elasticsearchTemplate, + reindexingTaskCaseExecutor(), + elasticsearchProperties, + petriNetService, + workflowService, + elasticCasePrioritySearch, + applicationEventPublisher + ); } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java index 996318b03c6..0cfbb11618f 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java @@ -521,6 +521,15 @@ public static class ElasticsearchProperties { */ @Valid private BatchProperties batch = new BatchProperties(); + + + /** + * Configuration properties for handling queues in Elasticsearch operations. + * These properties specify the behavior of the ElasticQueueManager, + * including the maximum queue size, delay between flush operations, + * and the thread pool size for scheduled executor service tasks. + */ + private QueueProperties queue = new QueueProperties(); public static final String PETRI_NET_INDEX = "petriNet"; @@ -638,6 +647,44 @@ public static class BatchProperties { @Min(1) private int taskBatchSize = 20000; } + + + /** + * Configuration properties for handling queues in Elasticsearch operations. + * These properties specify the behavior of the ElasticQueueManager, + * including the maximum queue size, delay between flush operations, + * and the thread pool size for scheduled executor service tasks. + */ + @Data + public static class QueueProperties { + + /** + * The size of the thread pool for the scheduled executor service. + * This determines the number of threads available to schedule and execute tasks. + * Default value: 50. + */ + private int scheduledExecutorPoolSize = 50; + + /** + * Delay time between flush operations in the queue. + * This value represents the amount of time the scheduler waits before initiating the next flush. + * Default value: 150. + */ + private int delay = 150; + + /** + * The time unit of the delay for flush operations. + * Default value: {@link TimeUnit#MILLISECONDS}. + */ + private TimeUnit delayUnit = TimeUnit.MILLISECONDS; + + /** + * Maximum number of elements allowed in the queue. + * When the queue size reaches this limit, it triggers a flush operation. + * Default value: 400. + */ + private int maxQueueSize = 400; + } } /** diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index 07d69011897..d14670949ac 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -22,10 +22,8 @@ import com.netgrif.application.engine.utils.FullPageRequest; import com.netgrif.application.engine.objects.workflow.domain.Case; import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService; -import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.dao.InvalidDataAccessApiUsageException; @@ -36,7 +34,7 @@ import org.springframework.data.elasticsearch.core.SearchHitSupport; import org.springframework.data.elasticsearch.core.SearchHits; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.Order; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.stereotype.Service; import java.util.*; @@ -48,7 +46,6 @@ import static org.springframework.data.elasticsearch.client.elc.Queries.termQuery; @Service -@RequiredArgsConstructor public class ElasticCaseService extends ElasticViewPermissionService implements IElasticCaseService { private static final Logger log = LoggerFactory.getLogger(ElasticCaseService.class); @@ -60,69 +57,61 @@ public class ElasticCaseService extends ElasticViewPermissionService implements protected IPetriNetService petriNetService; protected IWorkflowService workflowService; protected IElasticCasePrioritySearch iElasticCasePrioritySearch; - - @Autowired - @Lazy - public void setWorkflowService(IWorkflowService workflowService) { - this.workflowService = workflowService; - } - - @Autowired - @Lazy - public void setPetriNetService(IPetriNetService petriNetService) { - this.petriNetService = petriNetService; - } - - @Autowired protected ApplicationEventPublisher publisher; - - @Autowired - public void setElasticCasePrioritySearch(IElasticCasePrioritySearch iElasticCasePrioritySearch) { - this.iElasticCasePrioritySearch = iElasticCasePrioritySearch; - } - - @Autowired - public void setElasticProperties(DataConfigurationProperties.ElasticsearchProperties elasticProperties) { + protected ElasticQueueManager caseElasticIndexQueueManager; + protected ElasticQueueManager caseElasticDeleteQueueManager; + + public ElasticCaseService(ElasticCaseRepository repository, + ElasticsearchTemplate template, + Executor executors, + DataConfigurationProperties.ElasticsearchProperties elasticProperties, + @Lazy IPetriNetService petriNetService, + @Lazy IWorkflowService workflowService, + IElasticCasePrioritySearch iElasticCasePrioritySearch, + ApplicationEventPublisher publisher) { + this.repository = repository; + this.template = template; + this.executors = executors; this.elasticProperties = elasticProperties; + this.petriNetService = petriNetService; + this.workflowService = workflowService; + this.iElasticCasePrioritySearch = iElasticCasePrioritySearch; + this.publisher = publisher; + this.caseElasticIndexQueueManager = new ElasticQueueManager<>(elasticProperties, template, elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)); + this.caseElasticDeleteQueueManager = new ElasticQueueManager<>(elasticProperties, template, elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)); } @Override public void remove(String caseId) { - executors.execute(caseId, () -> { - repository.deleteAllById(caseId); - log.info("[" + caseId + "]: Case \"" + caseId + "\" deleted"); - }); + caseElasticDeleteQueueManager.push(DeleteQuery.builder(CriteriaQuery.builder(Criteria.where("id").is(caseId)).build()).build()); + log.info("[{}]: Case \"{}\" deleted", caseId, caseId); } @Override public void removeByPetriNetId(String processId) { - executors.execute(processId, () -> { - repository.deleteAllByProcessId(processId); - log.info("[" + processId + "]: All cases of Petri Net with id \"" + processId + "\" deleted"); - }); + caseElasticDeleteQueueManager.push(DeleteQuery.builder(CriteriaQuery.builder(Criteria.where("processId").is(processId)).build()).build()); + log.info("[{}]: All cases of Petri Net with id \"{}\" deleted", processId, processId); } @Override public void index(ElasticCase useCase) { - executors.execute(useCase.getId(), () -> { - try { - Optional elasticCaseOptional = repository.findById(useCase.getId()); - if (elasticCaseOptional.isEmpty()) { - repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase); - } else { - com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); - elasticCase.update(useCase); - repository.save(elasticCase); - } - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed"); - publisher.publishEvent(new IndexCaseEvent(useCase)); - } catch (InvalidDataAccessApiUsageException ignored) { - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" has duplicates, will be reindexed"); - repository.deleteAllById(useCase.getId()); - repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase); - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed"); + try { + Optional elasticCaseOptional = repository.findById(useCase.getId()); + if (elasticCaseOptional.isEmpty()) { + caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(useCase).build()); + } else { + com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); + elasticCase.update(useCase); + caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(elasticCase).build()); } - }); + log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed"); + publisher.publishEvent(new IndexCaseEvent(useCase)); + } catch (InvalidDataAccessApiUsageException ignored) { + log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" has duplicates, will be reindexed"); + repository.deleteAllById(useCase.getId()); + repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase); + log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed"); + } } @Override diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java new file mode 100644 index 00000000000..5f0e35f597a --- /dev/null +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -0,0 +1,120 @@ +package com.netgrif.application.engine.elastic.service; + +import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; +import jakarta.annotation.PreDestroy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate; +import org.springframework.data.elasticsearch.core.RefreshPolicy; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.BulkOptions; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + + +/** + * Manages a queue of elastic queries and handles their periodic bulk processing in Elasticsearch. + * This class efficiently buffers and schedules elastic queries in batches, ensuring timely + * indexing into the Elasticsearch index with controlled execution and concurrency. + * + * @param the type of elements to be managed in the queue and indexed by Elasticsearch + */ +public final class ElasticQueueManager { + + private final Logger log = LoggerFactory.getLogger(ElasticQueueManager.class); + + private final String indexName; + + private final Queue queue; + + private final ScheduledExecutorService scheduler; + + private final AtomicReference> atomicDelayer; + + private final DataConfigurationProperties.ElasticsearchProperties.QueueProperties queueProperties; + + private final ElasticsearchTemplate elasticsearchTemplate; + + /** + * Constructs an ElasticQueueManager instance. + * + * @param elasticsearchProperties the configuration properties for Elasticsearch, including queue parameters + * @param elasticsearchTemplate the template for interacting with Elasticsearch + * @param indexName the name of the Elasticsearch index + */ + public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties, + ElasticsearchTemplate elasticsearchTemplate, + String indexName) { + queue = new ConcurrentLinkedDeque<>(); + atomicDelayer = new AtomicReference<>(); + queueProperties = elasticsearchProperties.getQueue(); + scheduler = Executors.newScheduledThreadPool(queueProperties.getScheduledExecutorPoolSize()); + this.elasticsearchTemplate = elasticsearchTemplate; + this.indexName = indexName; + } + + /** + * Shuts down the scheduler and flushes any remaining elements in the queue + * to Elasticsearch before the application stops. + */ + @PreDestroy + public void shutdown() { + scheduler.shutdown(); + flush(); + } + + + /** + * Adds an elastic query to the queue and resets the timer for the scheduled flush. + * + * @param elasticQuery the elastic query to be added to the queue + */ + public void push(E elasticQuery) { + queue.add(elasticQuery); + resetTimer(); + } + + /** + * Synchronously processes and flushes a batch of elements from the queue + * to Elasticsearch. Ensures that no more than the maximum queue size is + * processed at a time. + */ + private synchronized void flush() { + List batch = new ArrayList<>(); + + while (!queue.isEmpty() && batch.size() < queueProperties.getMaxQueueSize()) { + batch.add(queue.poll()); + } + + String uuid = UUID.randomUUID().toString(); + if (!batch.isEmpty()) { + log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); + BulkOptions bulkOptions = BulkOptions.builder() + .withRefreshPolicy(RefreshPolicy.WAIT_UNTIL) + .build(); + elasticsearchTemplate.bulkOperation(batch, bulkOptions, IndexCoordinates.of(indexName)); + log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); + resetTimer(); + } + } + + /** + * Resets the timer for executing the flush operation. This ensures + * that the flush operation is delayed until the configured time interval + * has elapsed from the last push to the queue. + */ + private void resetTimer() { + ScheduledFuture delayer = atomicDelayer.getAndSet(null); + if (delayer != null) { + delayer.cancel(false); + } + ScheduledFuture newTask = scheduler.schedule(this::flush, queueProperties.getDelay(), queueProperties.getDelayUnit()); + atomicDelayer.set(newTask); + } + +} diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java index 661cd3868ca..cfb36a8446d 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java @@ -13,7 +13,7 @@ import static org.springframework.data.elasticsearch.annotations.FieldType.*; @NoArgsConstructor -@Document(indexName = "#{@elasticCaseIndex}") +@Document(indexName = "#{@elasticCaseIndex}", createIndex = false) public class ElasticCase extends com.netgrif.application.engine.objects.elastic.domain.ElasticCase { public ElasticCase(Case useCase) { diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticPetriNet.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticPetriNet.java index 5794c8a52dc..33481270767 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticPetriNet.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticPetriNet.java @@ -17,7 +17,7 @@ import static org.springframework.data.elasticsearch.annotations.FieldType.Keyword; @NoArgsConstructor -@Document(indexName = "#{@elasticPetriNetIndex}") +@Document(indexName = "#{@elasticPetriNetIndex}", createIndex = false) public class ElasticPetriNet extends com.netgrif.application.engine.objects.elastic.domain.ElasticPetriNet { public ElasticPetriNet(PetriNet net) { diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java index cb988a814d5..3752616217e 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java @@ -16,7 +16,7 @@ import static org.springframework.data.elasticsearch.annotations.FieldType.Keyword; @NoArgsConstructor -@Document(indexName = "#{@elasticTaskIndex}") +@Document(indexName = "#{@elasticTaskIndex}", createIndex = false) public class ElasticTask extends com.netgrif.application.engine.objects.elastic.domain.ElasticTask { public ElasticTask(Task task) { From 9b7219500cdce0171f7510b5326cf08c3f9cbc08 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Tue, 4 Nov 2025 12:38:09 +0100 Subject: [PATCH 03/37] - corrected according to PR --- .../elastic/service/ElasticCaseService.java | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index d14670949ac..ca4672a3d5b 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -84,34 +84,27 @@ public ElasticCaseService(ElasticCaseRepository repository, @Override public void remove(String caseId) { caseElasticDeleteQueueManager.push(DeleteQuery.builder(CriteriaQuery.builder(Criteria.where("id").is(caseId)).build()).build()); - log.info("[{}]: Case \"{}\" deleted", caseId, caseId); + log.info("[{}]: Case \"{}\" queued for deletion", caseId, caseId); } @Override public void removeByPetriNetId(String processId) { caseElasticDeleteQueueManager.push(DeleteQuery.builder(CriteriaQuery.builder(Criteria.where("processId").is(processId)).build()).build()); - log.info("[{}]: All cases of Petri Net with id \"{}\" deleted", processId, processId); + log.info("[{}]: All cases of Petri Net with id \"{}\" are queued for deletion", processId, processId); } @Override public void index(ElasticCase useCase) { - try { - Optional elasticCaseOptional = repository.findById(useCase.getId()); - if (elasticCaseOptional.isEmpty()) { - caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(useCase).build()); - } else { - com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); - elasticCase.update(useCase); - caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(elasticCase).build()); - } - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed"); - publisher.publishEvent(new IndexCaseEvent(useCase)); - } catch (InvalidDataAccessApiUsageException ignored) { - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" has duplicates, will be reindexed"); - repository.deleteAllById(useCase.getId()); - repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase); - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed"); + Optional elasticCaseOptional = repository.findById(useCase.getId()); + if (elasticCaseOptional.isEmpty()) { + caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(useCase).build()); + } else { + com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); + elasticCase.update(useCase); + caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(elasticCase).build()); } + log.debug("[{}]: Case \"{}\" indexed", useCase.getId(), useCase.getTitle()); + publisher.publishEvent(new IndexCaseEvent(useCase)); } @Override From d7a57d7226c14a2eddd7a33d8c1687b9e97e34b6 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Tue, 4 Nov 2025 12:53:30 +0100 Subject: [PATCH 04/37] Fix empty batch handling and enhance error recovery Add a return statement for empty batches to prevent unnecessary processing. Implement error handling to requeue failed batches and log the exception details. Prevent scheduling new tasks when the scheduler is shut down. --- .../engine/elastic/service/ElasticQueueManager.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 5f0e35f597a..4ff8f36d122 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -92,7 +92,10 @@ private synchronized void flush() { } String uuid = UUID.randomUUID().toString(); - if (!batch.isEmpty()) { + if (batch.isEmpty()) { + return; + } + try { log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); BulkOptions bulkOptions = BulkOptions.builder() .withRefreshPolicy(RefreshPolicy.WAIT_UNTIL) @@ -100,6 +103,9 @@ private synchronized void flush() { elasticsearchTemplate.bulkOperation(batch, bulkOptions, IndexCoordinates.of(indexName)); log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); resetTimer(); + } catch (Exception e) { + queue.addAll(batch); + log.error("Index failed with batch size: {} and id: {}", batch.size(), uuid, e); } } @@ -113,6 +119,9 @@ private void resetTimer() { if (delayer != null) { delayer.cancel(false); } + if (scheduler.isShutdown()) { + return; + } ScheduledFuture newTask = scheduler.schedule(this::flush, queueProperties.getDelay(), queueProperties.getDelayUnit()); atomicDelayer.set(newTask); } From f30179e1397661551e858413cd7b332fccd5f41f Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Tue, 4 Nov 2025 13:14:01 +0100 Subject: [PATCH 05/37] Update log message to indicate case is queued for indexing Previously, the log message implied immediate indexing, which was misleading. The updated message correctly reflects that the case is added to the indexing queue. This improves clarity and aligns with the actual behavior of the method. --- .../application/engine/elastic/service/ElasticCaseService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index ca4672a3d5b..c22d2978fd3 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -103,7 +103,7 @@ public void index(ElasticCase useCase) { elasticCase.update(useCase); caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(elasticCase).build()); } - log.debug("[{}]: Case \"{}\" indexed", useCase.getId(), useCase.getTitle()); + log.debug("[{}]: Case \"{}\" queued for indexing", useCase.getId(), useCase.getTitle()); publisher.publishEvent(new IndexCaseEvent(useCase)); } From 653829ade00e13ae185ad6d3e16d3187f200ee0d Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Tue, 4 Nov 2025 13:31:01 +0100 Subject: [PATCH 06/37] Improve shutdown logic in ElasticQueueManager Added cancellation of the atomicDelayer to ensure proper task cleanup during shutdown. Implemented a timeout-based termination for the scheduler, with fallback to forced shutdown to handle interruptions and prevent potential resource leaks. --- .../engine/elastic/service/ElasticQueueManager.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 4ff8f36d122..7a3867d0b38 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -64,8 +64,18 @@ public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties e */ @PreDestroy public void shutdown() { - scheduler.shutdown(); + ScheduledFuture delayer = atomicDelayer.getAndSet(null); + if (delayer != null) { + delayer.cancel(false); + } flush(); + try { + if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + } } From 1946903c9ca783336ebf757ba48bbeb5fb9d20bb Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Tue, 4 Nov 2025 13:45:20 +0100 Subject: [PATCH 07/37] Simplify ElasticQueueManager shutdown process Replaced complex termination logic with a single `scheduler.shutdown()` call. This improves code readability and maintains the same functionality while reducing potential error points in the shutdown process. --- .../engine/elastic/service/ElasticQueueManager.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 7a3867d0b38..b05f4fbc7a3 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -64,18 +64,8 @@ public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties e */ @PreDestroy public void shutdown() { - ScheduledFuture delayer = atomicDelayer.getAndSet(null); - if (delayer != null) { - delayer.cancel(false); - } flush(); - try { - if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) { - scheduler.shutdownNow(); - } - } catch (InterruptedException e) { - scheduler.shutdownNow(); - } + scheduler.shutdown(); } From 21f654915205d8e8ee8ffe5588be8cff248e4f6f Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Tue, 4 Nov 2025 13:46:49 +0100 Subject: [PATCH 08/37] Refactor shutdown logic in ElasticQueueManager Revised the shutdown method to ensure proper handling of queued tasks and prevent potential delays. Added cancellation of the atomic delayer and improved scheduler termination logic with a timeout and fallback to immediate shutdown. This enhances system stability and resource cleanup reliability. --- .../elastic/service/ElasticQueueManager.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index b05f4fbc7a3..36ec9c34825 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -64,8 +64,20 @@ public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties e */ @PreDestroy public void shutdown() { - flush(); - scheduler.shutdown(); + ScheduledFuture delayer = atomicDelayer.getAndSet(null); + if (delayer != null) { + delayer.cancel(false); + } + while (!queue.isEmpty()) { + flush(); + } + try { + if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + } } From b0da37db7f1db6b412961ce339c939b9e23fc9c2 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Tue, 4 Nov 2025 14:15:40 +0100 Subject: [PATCH 09/37] Reduce default thread pool size for scheduled executor Lowered the default scheduledExecutorPoolSize from 50 to 10. This change optimizes resource utilization and prevents over-allocation of threads, improving system performance under typical workloads. --- .../configuration/properties/DataConfigurationProperties.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java index 0cfbb11618f..08edae84a26 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java @@ -661,9 +661,9 @@ public static class QueueProperties { /** * The size of the thread pool for the scheduled executor service. * This determines the number of threads available to schedule and execute tasks. - * Default value: 50. + * Default value: 10. */ - private int scheduledExecutorPoolSize = 50; + private int scheduledExecutorPoolSize = 10; /** * Delay time between flush operations in the queue. From 4fa39886f69b3f75909c8ca2aa6c1cad5a779637 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Wed, 5 Nov 2025 09:59:04 +0100 Subject: [PATCH 10/37] Add delay configuration to Elasticsearch queue in tests Introduce a new `queue.delay` parameter in the Elasticsearch configuration within `application-test.yaml`. This ensures better control over queue processing behavior during test execution. --- application-engine/src/test/resources/application-test.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/application-engine/src/test/resources/application-test.yaml b/application-engine/src/test/resources/application-test.yaml index 78636b7143e..57e200d3ec4 100644 --- a/application-engine/src/test/resources/application-test.yaml +++ b/application-engine/src/test/resources/application-test.yaml @@ -7,6 +7,8 @@ netgrif: mongodb: drop: true elasticsearch: + queue: + delay: 10 drop: true index: petriNet: ${NETGRIF_ENGINE_DATA_DATABASE_NAME:nae}_test_petrinet From 7a1fb84503d0887c7b02b7175384337a5e4bcf8d Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Wed, 5 Nov 2025 15:28:06 +0100 Subject: [PATCH 11/37] Add validation annotations and RefreshPolicy to queue config Introduced validation annotations (@Valid and @Min) for queue-related properties to enforce valid configurations. Added a RefreshPolicy property with a default value of NONE to enhance control over Elasticsearch refresh behavior. These changes improve reliability and configurability of the queue settings. --- .../properties/DataConfigurationProperties.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java index 08edae84a26..91c926578a2 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java @@ -15,6 +15,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.io.Resource; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -529,6 +530,7 @@ public static class ElasticsearchProperties { * including the maximum queue size, delay between flush operations, * and the thread pool size for scheduled executor service tasks. */ + @Valid private QueueProperties queue = new QueueProperties(); public static final String PETRI_NET_INDEX = "petriNet"; @@ -663,6 +665,7 @@ public static class QueueProperties { * This determines the number of threads available to schedule and execute tasks. * Default value: 10. */ + @Min(1) private int scheduledExecutorPoolSize = 10; /** @@ -670,6 +673,7 @@ public static class QueueProperties { * This value represents the amount of time the scheduler waits before initiating the next flush. * Default value: 150. */ + @Min(1) private int delay = 150; /** @@ -683,7 +687,10 @@ public static class QueueProperties { * When the queue size reaches this limit, it triggers a flush operation. * Default value: 400. */ + @Min(1) private int maxQueueSize = 400; + + private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; } } From 625fee3cba22b232ec9fc4f096f1b1771d41b0f6 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 08:57:10 +0100 Subject: [PATCH 12/37] Refactor Elasticsearch integration to use bulk operations Replaced ElasticsearchTemplate with ElasticsearchClient for bulk operations, improving efficiency and consistency. Updated ElasticQueueManager and related services to handle BulkOperation objects, eliminating deprecated methods and simplifying queue handling logic. Adjusted indexing and deletion logic to align with the new approach. --- .../ElasticServiceConfiguration.java | 8 +++- .../elastic/service/ElasticCaseService.java | 28 ++++++++---- .../elastic/service/ElasticQueueManager.java | 45 +++++++++---------- .../objects/elastic/domain/ElasticCase.java | 2 + .../spring/elastic/domain/ElasticCase.java | 5 +++ 5 files changed, 55 insertions(+), 33 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java index 07c23d8d06c..2665bfae1e9 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java @@ -1,5 +1,6 @@ package com.netgrif.application.engine.configuration; +import co.elastic.clients.elasticsearch.ElasticsearchClient; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; import com.netgrif.application.engine.elastic.domain.ElasticCaseRepository; import com.netgrif.application.engine.elastic.domain.ElasticTaskRepository; @@ -37,6 +38,7 @@ public class ElasticServiceConfiguration { private final IWorkflowService workflowService; private final IElasticCasePrioritySearch elasticCasePrioritySearch; private final ApplicationEventPublisher applicationEventPublisher; + private final ElasticsearchClient elasticsearchClient; @Bean @Primary @@ -65,7 +67,8 @@ public IElasticCaseService elasticCaseService() { petriNetService, workflowService, elasticCasePrioritySearch, - applicationEventPublisher + applicationEventPublisher, + elasticsearchClient ); } @@ -85,7 +88,8 @@ public IElasticCaseService reindexingTaskElasticCaseService() { petriNetService, workflowService, elasticCasePrioritySearch, - applicationEventPublisher + applicationEventPublisher, + elasticsearchClient ); } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index c22d2978fd3..a176c90e238 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -1,11 +1,13 @@ package com.netgrif.application.engine.elastic.service; +import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.FieldValue; import co.elastic.clients.elasticsearch._types.mapping.FieldType; import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery; import co.elastic.clients.elasticsearch._types.query_dsl.QueryStringQuery; import co.elastic.clients.elasticsearch._types.query_dsl.TermsQueryField; import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; import com.netgrif.application.engine.objects.auth.domain.LoggedUser; import com.netgrif.application.engine.objects.elastic.domain.ElasticCase; @@ -26,7 +28,6 @@ import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; -import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.*; import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate; import org.springframework.data.elasticsearch.client.elc.NativeQuery; @@ -68,7 +69,8 @@ public ElasticCaseService(ElasticCaseRepository repository, @Lazy IPetriNetService petriNetService, @Lazy IWorkflowService workflowService, IElasticCasePrioritySearch iElasticCasePrioritySearch, - ApplicationEventPublisher publisher) { + ApplicationEventPublisher publisher, + ElasticsearchClient elasticsearchClient) { this.repository = repository; this.template = template; this.executors = executors; @@ -77,19 +79,21 @@ public ElasticCaseService(ElasticCaseRepository repository, this.workflowService = workflowService; this.iElasticCasePrioritySearch = iElasticCasePrioritySearch; this.publisher = publisher; - this.caseElasticIndexQueueManager = new ElasticQueueManager<>(elasticProperties, template, elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)); - this.caseElasticDeleteQueueManager = new ElasticQueueManager<>(elasticProperties, template, elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)); + this.caseElasticIndexQueueManager = new ElasticQueueManager<>(elasticProperties, template, elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX), elasticsearchClient); + this.caseElasticDeleteQueueManager = new ElasticQueueManager<>(elasticProperties, template, elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX), elasticsearchClient); } @Override public void remove(String caseId) { - caseElasticDeleteQueueManager.push(DeleteQuery.builder(CriteriaQuery.builder(Criteria.where("id").is(caseId)).build()).build()); + caseElasticDeleteQueueManager.push(BulkOperation.of(op -> op.delete(d -> d.index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)).id(caseId)))); log.info("[{}]: Case \"{}\" queued for deletion", caseId, caseId); } @Override public void removeByPetriNetId(String processId) { - caseElasticDeleteQueueManager.push(DeleteQuery.builder(CriteriaQuery.builder(Criteria.where("processId").is(processId)).build()).build()); + CriteriaQuery query = CriteriaQuery.builder(Criteria.where("processId").is(processId)).build(); + SearchHits hits = template.search(query, ElasticCase.class, IndexCoordinates.of(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX))); + hits.stream().forEach(hit -> remove(hit.getId())); log.info("[{}]: All cases of Petri Net with id \"{}\" are queued for deletion", processId, processId); } @@ -97,11 +101,19 @@ public void removeByPetriNetId(String processId) { public void index(ElasticCase useCase) { Optional elasticCaseOptional = repository.findById(useCase.getId()); if (elasticCaseOptional.isEmpty()) { - caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(useCase).build()); + caseElasticIndexQueueManager.push(BulkOperation.of(op -> op.index(i -> i + .index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)) + .id(useCase.getId()) + .document(useCase)) + )); } else { com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); elasticCase.update(useCase); - caseElasticIndexQueueManager.push(new IndexQueryBuilder().withObject(elasticCase).build()); + caseElasticIndexQueueManager.push(BulkOperation.of(op -> op.index(i -> i + .index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)) + .id(elasticCase.getId()) + .document(elasticCase)) + )); } log.debug("[{}]: Case \"{}\" queued for indexing", useCase.getId(), useCase.getTitle()); publisher.publishEvent(new IndexCaseEvent(useCase)); diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 36ec9c34825..a120c8f9941 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -1,11 +1,14 @@ package com.netgrif.application.engine.elastic.service; +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.Refresh; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate; -import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; @@ -28,9 +31,7 @@ public final class ElasticQueueManager { private final Logger log = LoggerFactory.getLogger(ElasticQueueManager.class); - private final String indexName; - - private final Queue queue; + private final Queue queue; private final ScheduledExecutorService scheduler; @@ -38,7 +39,8 @@ public final class ElasticQueueManager { private final DataConfigurationProperties.ElasticsearchProperties.QueueProperties queueProperties; - private final ElasticsearchTemplate elasticsearchTemplate; + + private final ElasticsearchClient elasticsearchClient; /** * Constructs an ElasticQueueManager instance. @@ -49,13 +51,13 @@ public final class ElasticQueueManager { */ public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties, ElasticsearchTemplate elasticsearchTemplate, - String indexName) { + String indexName, + ElasticsearchClient elasticsearchClient) { queue = new ConcurrentLinkedDeque<>(); atomicDelayer = new AtomicReference<>(); queueProperties = elasticsearchProperties.getQueue(); scheduler = Executors.newScheduledThreadPool(queueProperties.getScheduledExecutorPoolSize()); - this.elasticsearchTemplate = elasticsearchTemplate; - this.indexName = indexName; + this.elasticsearchClient = elasticsearchClient; } /** @@ -86,7 +88,7 @@ public void shutdown() { * * @param elasticQuery the elastic query to be added to the queue */ - public void push(E elasticQuery) { + public void push(BulkOperation elasticQuery) { queue.add(elasticQuery); resetTimer(); } @@ -97,22 +99,19 @@ public void push(E elasticQuery) { * processed at a time. */ private synchronized void flush() { - List batch = new ArrayList<>(); + if (queue.isEmpty()) { + return; + } + List batch = new ArrayList<>(); while (!queue.isEmpty() && batch.size() < queueProperties.getMaxQueueSize()) { batch.add(queue.poll()); } String uuid = UUID.randomUUID().toString(); - if (batch.isEmpty()) { - return; - } try { log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); - BulkOptions bulkOptions = BulkOptions.builder() - .withRefreshPolicy(RefreshPolicy.WAIT_UNTIL) - .build(); - elasticsearchTemplate.bulkOperation(batch, bulkOptions, IndexCoordinates.of(indexName)); + elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch).refresh(Refresh.False).build()); log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); resetTimer(); } catch (Exception e) { @@ -127,15 +126,15 @@ private synchronized void flush() { * has elapsed from the last push to the queue. */ private void resetTimer() { - ScheduledFuture delayer = atomicDelayer.getAndSet(null); - if (delayer != null) { - delayer.cancel(false); - } if (scheduler.isShutdown()) { return; } - ScheduledFuture newTask = scheduler.schedule(this::flush, queueProperties.getDelay(), queueProperties.getDelayUnit()); - atomicDelayer.set(newTask); + atomicDelayer.updateAndGet(existing -> { + if (existing != null && !existing.isDone() && !existing.isCancelled()) { + return existing; + } + return scheduler.schedule(this::flush, queueProperties.getDelay(), queueProperties.getDelayUnit()); + }); } } diff --git a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticCase.java b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticCase.java index c16316c9671..fead1ecca42 100644 --- a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticCase.java +++ b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticCase.java @@ -23,6 +23,8 @@ public abstract class ElasticCase implements Serializable { @Serial private static final long serialVersionUID = 7536959921044863265L; + private String _class; + private String id; private Long version; diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java index cfb36a8446d..670a88dda37 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java @@ -142,4 +142,9 @@ public Set getNegativeViewRoles() { public Set getNegativeViewUsers() { return super.getNegativeViewUsers(); } + + @Field(type = Keyword) + public String get_class() { + return this.getClass().getName(); + } } From 6c2140998090c77c922438b1cbea974fbf1b9000 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 10:20:45 +0100 Subject: [PATCH 13/37] Refactor ElasticQueueManager and clean up unused dependencies. Removed unnecessary parameters from ElasticQueueManager constructor and updated related instantiations. Simplified timer reset logic, added scheduler shutdown, and cleaned up unused imports and repositories in ElasticServiceConfiguration. --- .../ElasticServiceConfiguration.java | 2 -- .../elastic/service/ElasticCaseService.java | 4 ++-- .../elastic/service/ElasticQueueManager.java | 24 ++++++++----------- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java index 2665bfae1e9..dab7c287a52 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java @@ -19,7 +19,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration @RequiredArgsConstructor @@ -31,7 +30,6 @@ public class ElasticServiceConfiguration { private final ElasticCaseRepository caseRepository; - private final ElasticTaskRepository taskRepository; private final ElasticsearchTemplate elasticsearchTemplate; private final DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties; private final IPetriNetService petriNetService; diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index a176c90e238..a78c3420af5 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -79,8 +79,8 @@ public ElasticCaseService(ElasticCaseRepository repository, this.workflowService = workflowService; this.iElasticCasePrioritySearch = iElasticCasePrioritySearch; this.publisher = publisher; - this.caseElasticIndexQueueManager = new ElasticQueueManager<>(elasticProperties, template, elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX), elasticsearchClient); - this.caseElasticDeleteQueueManager = new ElasticQueueManager<>(elasticProperties, template, elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX), elasticsearchClient); + this.caseElasticIndexQueueManager = new ElasticQueueManager<>(elasticProperties, elasticsearchClient); + this.caseElasticDeleteQueueManager = new ElasticQueueManager<>(elasticProperties, elasticsearchClient); } @Override diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index a120c8f9941..2ca91ae8bc5 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -8,9 +8,6 @@ import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate; -import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.BulkOptions; import java.util.ArrayList; import java.util.List; @@ -46,12 +43,8 @@ public final class ElasticQueueManager { * Constructs an ElasticQueueManager instance. * * @param elasticsearchProperties the configuration properties for Elasticsearch, including queue parameters - * @param elasticsearchTemplate the template for interacting with Elasticsearch - * @param indexName the name of the Elasticsearch index */ public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties, - ElasticsearchTemplate elasticsearchTemplate, - String indexName, ElasticsearchClient elasticsearchClient) { queue = new ConcurrentLinkedDeque<>(); atomicDelayer = new AtomicReference<>(); @@ -70,6 +63,7 @@ public void shutdown() { if (delayer != null) { delayer.cancel(false); } + scheduler.shutdown(); while (!queue.isEmpty()) { flush(); } @@ -90,7 +84,9 @@ public void shutdown() { */ public void push(BulkOperation elasticQuery) { queue.add(elasticQuery); - resetTimer(); + if (queue.size() <= queueProperties.getMaxQueueSize()) { + resetTimer(); + } } /** @@ -129,12 +125,12 @@ private void resetTimer() { if (scheduler.isShutdown()) { return; } - atomicDelayer.updateAndGet(existing -> { - if (existing != null && !existing.isDone() && !existing.isCancelled()) { - return existing; - } - return scheduler.schedule(this::flush, queueProperties.getDelay(), queueProperties.getDelayUnit()); - }); + ScheduledFuture delayer = atomicDelayer.getAndSet(null); + if (delayer != null) { + delayer.cancel(false); + } + ScheduledFuture newTask = scheduler.schedule(this::flush, queueProperties.getDelay(), queueProperties.getDelayUnit()); + atomicDelayer.set(newTask); } } From 4f741953781fae9396ea5dc3d7371c629b9d29c1 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 10:23:38 +0100 Subject: [PATCH 14/37] Optimize queue handling in ElasticQueueManager. Added a condition to flush the queue when its size exceeds the maximum threshold, ensuring more efficient management. The timer reset now occurs only if the flush is not triggered, improving resource utilization and robustness. --- .../engine/elastic/service/ElasticQueueManager.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 2ca91ae8bc5..f51f039ae29 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -109,7 +109,11 @@ private synchronized void flush() { log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch).refresh(Refresh.False).build()); log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); - resetTimer(); + if (queue.size() >= queueProperties.getMaxQueueSize()) { + flush(); + } else { + resetTimer(); + } } catch (Exception e) { queue.addAll(batch); log.error("Index failed with batch size: {} and id: {}", batch.size(), uuid, e); From 026a96eacbb7224dbe2e1141f3337bd79ffceeb6 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 10:53:38 +0100 Subject: [PATCH 15/37] Add Serializable and _class handling to ElasticTask Implemented Serializable in ElasticTask with a serialVersionUID to ensure class compatibility during serialization. Introduced the _class field and getter method to store and retrieve the task's class name, improving type recognition. --- .../engine/objects/elastic/domain/ElasticTask.java | 9 ++++++++- .../adapter/spring/elastic/domain/ElasticTask.java | 6 ++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java index 6b36ce07c48..985eb7edd3c 100644 --- a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java +++ b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java @@ -8,6 +8,8 @@ import com.netgrif.application.engine.objects.workflow.domain.Task; import lombok.*; +import java.io.Serial; +import java.io.Serializable; import java.time.LocalDateTime; import java.util.HashMap; import java.util.HashSet; @@ -17,7 +19,12 @@ @Data @NoArgsConstructor @AllArgsConstructor -public abstract class ElasticTask { +public abstract class ElasticTask implements Serializable { + + @Serial + private static final long serialVersionUID = 8399390623172906801L; + + private String _class; private String id; diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java index 3752616217e..7161a73895c 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java @@ -155,4 +155,10 @@ public String getDataFocusPolicy() { public String getFinishPolicy() { return super.getFinishPolicy(); } + + @Field(type = Keyword) + @Override + public String get_class() { + return this.getClass().getName(); + } } From da9180489a80a74cbd7efacde113eb9005c03ffa Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 12:17:51 +0100 Subject: [PATCH 16/37] Refactor Elastic domain models and improve queue management. Removed unused `_class` field from Elastic models and updated queue shutdown logic with `@PreDestroy`. Modified ElasticCaseService to use ElasticsearchConverter for mapping, ensuring better compatibility and maintainability. --- .../engine/elastic/service/ElasticCaseService.java | 14 ++++++++++++-- .../elastic/service/ElasticQueueManager.java | 4 ++-- .../engine/objects/elastic/domain/ElasticCase.java | 2 -- .../engine/objects/elastic/domain/ElasticTask.java | 2 +- .../adapter/spring/elastic/domain/ElasticCase.java | 5 ----- .../adapter/spring/elastic/domain/ElasticTask.java | 10 +++++----- 6 files changed, 20 insertions(+), 17 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index a78c3420af5..9bb0afa0464 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -24,6 +24,7 @@ import com.netgrif.application.engine.utils.FullPageRequest; import com.netgrif.application.engine.objects.workflow.domain.Case; import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService; +import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEventPublisher; @@ -34,6 +35,7 @@ import org.springframework.data.elasticsearch.client.elc.NativeQueryBuilder; import org.springframework.data.elasticsearch.core.SearchHitSupport; import org.springframework.data.elasticsearch.core.SearchHits; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.*; import org.springframework.stereotype.Service; @@ -81,6 +83,14 @@ public ElasticCaseService(ElasticCaseRepository repository, this.publisher = publisher; this.caseElasticIndexQueueManager = new ElasticQueueManager<>(elasticProperties, elasticsearchClient); this.caseElasticDeleteQueueManager = new ElasticQueueManager<>(elasticProperties, elasticsearchClient); + + } + + @PreDestroy + private void stopQueues() { + caseElasticIndexQueueManager.shutdown(); + caseElasticDeleteQueueManager.shutdown(); + log.info("Queues for cases have been stopped"); } @Override @@ -104,7 +114,7 @@ public void index(ElasticCase useCase) { caseElasticIndexQueueManager.push(BulkOperation.of(op -> op.index(i -> i .index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)) .id(useCase.getId()) - .document(useCase)) + .document(template.getElasticsearchConverter().mapObject(useCase))) )); } else { com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); @@ -112,7 +122,7 @@ public void index(ElasticCase useCase) { caseElasticIndexQueueManager.push(BulkOperation.of(op -> op.index(i -> i .index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)) .id(elasticCase.getId()) - .document(elasticCase)) + .document(template.getElasticsearchConverter().mapObject(elasticCase))) )); } log.debug("[{}]: Case \"{}\" queued for indexing", useCase.getId(), useCase.getTitle()); diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index f51f039ae29..7f2557b3c64 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -8,6 +8,7 @@ import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import java.util.ArrayList; import java.util.List; @@ -36,7 +37,6 @@ public final class ElasticQueueManager { private final DataConfigurationProperties.ElasticsearchProperties.QueueProperties queueProperties; - private final ElasticsearchClient elasticsearchClient; /** @@ -84,7 +84,7 @@ public void shutdown() { */ public void push(BulkOperation elasticQuery) { queue.add(elasticQuery); - if (queue.size() <= queueProperties.getMaxQueueSize()) { + if (queue.size() < queueProperties.getMaxQueueSize()) { resetTimer(); } } diff --git a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticCase.java b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticCase.java index fead1ecca42..c16316c9671 100644 --- a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticCase.java +++ b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticCase.java @@ -23,8 +23,6 @@ public abstract class ElasticCase implements Serializable { @Serial private static final long serialVersionUID = 7536959921044863265L; - private String _class; - private String id; private Long version; diff --git a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java index 985eb7edd3c..e12cd0198f5 100644 --- a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java +++ b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java @@ -24,7 +24,7 @@ public abstract class ElasticTask implements Serializable { @Serial private static final long serialVersionUID = 8399390623172906801L; - private String _class; +// private String _class; private String id; diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java index 670a88dda37..cfb36a8446d 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java @@ -142,9 +142,4 @@ public Set getNegativeViewRoles() { public Set getNegativeViewUsers() { return super.getNegativeViewUsers(); } - - @Field(type = Keyword) - public String get_class() { - return this.getClass().getName(); - } } diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java index 7161a73895c..de6a8002716 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java @@ -156,9 +156,9 @@ public String getFinishPolicy() { return super.getFinishPolicy(); } - @Field(type = Keyword) - @Override - public String get_class() { - return this.getClass().getName(); - } +// @Field(type = Keyword) +// @Override +// public String get_class() { +// return this.getClass().getName(); +// } } From 8d584a23e0d9323d8b90b1cdf329647fdaaeeb0a Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 12:18:40 +0100 Subject: [PATCH 17/37] Remove generic type parameter from ElasticQueueManager The ElasticQueueManager class no longer uses a generic type parameter, simplifying its definition. This change also updates its usage across ElasticCaseService with non-generic instantiations, improving code maintainability and reducing unnecessary complexity. --- .../engine/elastic/service/ElasticCaseService.java | 8 ++++---- .../engine/elastic/service/ElasticQueueManager.java | 7 ++----- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index 9bb0afa0464..fc811665281 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -61,8 +61,8 @@ public class ElasticCaseService extends ElasticViewPermissionService implements protected IWorkflowService workflowService; protected IElasticCasePrioritySearch iElasticCasePrioritySearch; protected ApplicationEventPublisher publisher; - protected ElasticQueueManager caseElasticIndexQueueManager; - protected ElasticQueueManager caseElasticDeleteQueueManager; + protected ElasticQueueManager caseElasticIndexQueueManager; + protected ElasticQueueManager caseElasticDeleteQueueManager; public ElasticCaseService(ElasticCaseRepository repository, ElasticsearchTemplate template, @@ -81,8 +81,8 @@ public ElasticCaseService(ElasticCaseRepository repository, this.workflowService = workflowService; this.iElasticCasePrioritySearch = iElasticCasePrioritySearch; this.publisher = publisher; - this.caseElasticIndexQueueManager = new ElasticQueueManager<>(elasticProperties, elasticsearchClient); - this.caseElasticDeleteQueueManager = new ElasticQueueManager<>(elasticProperties, elasticsearchClient); + this.caseElasticIndexQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient); + this.caseElasticDeleteQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient); } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 7f2557b3c64..f80731fbab5 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -8,7 +8,6 @@ import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import java.util.ArrayList; import java.util.List; @@ -22,10 +21,8 @@ * Manages a queue of elastic queries and handles their periodic bulk processing in Elasticsearch. * This class efficiently buffers and schedules elastic queries in batches, ensuring timely * indexing into the Elasticsearch index with controlled execution and concurrency. - * - * @param the type of elements to be managed in the queue and indexed by Elasticsearch - */ -public final class ElasticQueueManager { + **/ +public final class ElasticQueueManager { private final Logger log = LoggerFactory.getLogger(ElasticQueueManager.class); From 3cd8144990ca65c8b96d21bf58292507910fd783 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 12:19:17 +0100 Subject: [PATCH 18/37] Remove unused ElasticsearchConverter import ElasticsearchConverter was imported but never used in the code. Removing it improves code clarity and eliminates unnecessary dependencies. --- .../application/engine/elastic/service/ElasticCaseService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index fc811665281..96b33361eac 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -35,7 +35,6 @@ import org.springframework.data.elasticsearch.client.elc.NativeQueryBuilder; import org.springframework.data.elasticsearch.core.SearchHitSupport; import org.springframework.data.elasticsearch.core.SearchHits; -import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.*; import org.springframework.stereotype.Service; From 8bab0c2fdf26c08dca3fd25fb0e51f28c5fa5328 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 12:27:29 +0100 Subject: [PATCH 19/37] Refactor case deletion logic in CaseEventHandler Simplify the case deletion flow by removing unnecessary null checks and redundant exception handling. This enhances code readability and ensures a more consistent execution path. --- .../engine/workflow/service/CaseEventHandler.java | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/workflow/service/CaseEventHandler.java b/application-engine/src/main/java/com/netgrif/application/engine/workflow/service/CaseEventHandler.java index 44e39a0a236..70505e4c9a2 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/workflow/service/CaseEventHandler.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/workflow/service/CaseEventHandler.java @@ -58,17 +58,6 @@ public void onAfterDelete(AfterDeleteEvent event) { } String objectId = ((Document)document.get("_id")).get("shortProcessId") + "-" + ((Document)document.get("_id")).get("objectId").toString(); - if (objectId != null) { - service.remove(objectId); - return; - } - - objectId = document.getObjectId("petriNetObjectId").toString(); - if (objectId != null) { - service.removeByPetriNetId(objectId); - return; - } - - throw new IllegalStateException("Case has been deleted neither by ID nor by process ID!"); + service.remove(objectId); } } From 20b69ba39af853dba02520bf8d1d1c30b8b427b3 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 12:28:02 +0100 Subject: [PATCH 20/37] Remove unused removeByPetriNetId method from ElasticCaseService The removeByPetriNetId method and its interface declaration were deleted as they are no longer required. This cleanup improves code clarity and removes unnecessary functionality, ensuring better maintainability. --- .../engine/elastic/service/ElasticCaseService.java | 8 -------- .../elastic/service/interfaces/IElasticCaseService.java | 2 -- 2 files changed, 10 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index 96b33361eac..6b5de9b3d4f 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -98,14 +98,6 @@ public void remove(String caseId) { log.info("[{}]: Case \"{}\" queued for deletion", caseId, caseId); } - @Override - public void removeByPetriNetId(String processId) { - CriteriaQuery query = CriteriaQuery.builder(Criteria.where("processId").is(processId)).build(); - SearchHits hits = template.search(query, ElasticCase.class, IndexCoordinates.of(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX))); - hits.stream().forEach(hit -> remove(hit.getId())); - log.info("[{}]: All cases of Petri Net with id \"{}\" are queued for deletion", processId, processId); - } - @Override public void index(ElasticCase useCase) { Optional elasticCaseOptional = repository.findById(useCase.getId()); diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticCaseService.java index 8750dff1391..bf3da6f70ba 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticCaseService.java @@ -23,6 +23,4 @@ public interface IElasticCaseService { long count(List requests, LoggedUser user, Locale locale, Boolean isIntersection); void remove(String caseId); - - void removeByPetriNetId(String processId); } From d01bb7e968a4441958596e0de9dd7237421b782d Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 13:25:22 +0100 Subject: [PATCH 21/37] Refactor document mapping in ElasticIndexService. Replaced direct document passing with mapped objects using ElasticsearchConverter. This ensures proper handling and conversion of objects before indexing or upserting them into Elasticsearch. --- .../engine/elastic/service/ElasticIndexService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java index 05c4fbc53ef..3cb4d15a1c6 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java @@ -452,7 +452,7 @@ private void prepareCaseBulkOperation(ElasticCase doc, List opera .index(elasticsearchProperties.getIndex().get("case")) .id(doc.getId()) .action(a -> a - .doc(doc) + .doc(elasticsearchTemplate.getElasticsearchConverter().mapObject(doc)) .docAsUpsert(true) ) ))); @@ -474,7 +474,7 @@ private void prepareTaskBulkOperation(ElasticTask doc, List opera .index(elasticsearchProperties.getIndex().get("task")) .id(doc.getId()) .action(a -> a - .doc(doc) + .doc(elasticsearchTemplate.getElasticsearchConverter().mapObject(doc)) .docAsUpsert(true) ) )) From 64b749f0b795e4e11c9f79c8528a3f80299a7fd1 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 14:10:23 +0100 Subject: [PATCH 22/37] Refactor ElasticQueueManager and clean up ElasticTask code. Added handling for `BulkResponse` in ElasticQueueManager to allow future response processing. Removed unused/commented code in ElasticTask classes to improve code clarity and maintainability. --- .../engine/elastic/service/ElasticQueueManager.java | 3 ++- .../engine/objects/elastic/domain/ElasticTask.java | 1 - .../engine/adapter/spring/elastic/domain/ElasticTask.java | 6 ------ 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index f80731fbab5..87b4d2c1968 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -3,6 +3,7 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.Refresh; import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; import jakarta.annotation.PreDestroy; @@ -104,7 +105,7 @@ private synchronized void flush() { String uuid = UUID.randomUUID().toString(); try { log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); - elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch).refresh(Refresh.False).build()); + BulkResponse bulkResponse = elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch).refresh(Refresh.False).build()); log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); if (queue.size() >= queueProperties.getMaxQueueSize()) { flush(); diff --git a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java index e12cd0198f5..db7babc19e7 100644 --- a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java +++ b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java @@ -24,7 +24,6 @@ public abstract class ElasticTask implements Serializable { @Serial private static final long serialVersionUID = 8399390623172906801L; -// private String _class; private String id; diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java index de6a8002716..3752616217e 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java @@ -155,10 +155,4 @@ public String getDataFocusPolicy() { public String getFinishPolicy() { return super.getFinishPolicy(); } - -// @Field(type = Keyword) -// @Override -// public String get_class() { -// return this.getClass().getName(); -// } } From 838a277b70922268e055f1f22b12eae473447b9f Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 14:11:18 +0100 Subject: [PATCH 23/37] Remove unused variable in ElasticQueueManager Removed the unused `BulkResponse bulkResponse` variable to clean up the code and avoid potential confusion. This change does not impact functionality but improves code readability and maintenance. --- .../application/engine/elastic/service/ElasticQueueManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 87b4d2c1968..6d5e87f1e44 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -105,7 +105,7 @@ private synchronized void flush() { String uuid = UUID.randomUUID().toString(); try { log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); - BulkResponse bulkResponse = elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch).refresh(Refresh.False).build()); + elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch).refresh(Refresh.False).build()); log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); if (queue.size() >= queueProperties.getMaxQueueSize()) { flush(); From 83b7b19756d2b2ee12c69d73553667fd0cf0b2b4 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 14:35:22 +0100 Subject: [PATCH 24/37] Fix timer reset on index failure in ElasticQueueManager Ensure the timer is reset when an indexing batch fails to prevent potential delays in subsequent retries. This change enhances the robustness of the error handling logic. --- .../application/engine/elastic/service/ElasticQueueManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 6d5e87f1e44..9767bb625b5 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -114,6 +114,7 @@ private synchronized void flush() { } } catch (Exception e) { queue.addAll(batch); + resetTimer(); log.error("Index failed with batch size: {} and id: {}", batch.size(), uuid, e); } } From f702a3cd15902b6dc5e4a164890bf781756c63fd Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Thu, 6 Nov 2025 15:17:19 +0100 Subject: [PATCH 25/37] Refactor ElasticQueueManager to include event publishing Replaced `BulkOperation` with a new `BulkOperationWrapper` to allow event publishing after processing batches. Enhanced `ElasticQueueManager` to support an `ApplicationEventPublisher` for this purpose. Adjusted related services to accommodate the changes and improve code modularity. --- .../elastic/domain/BulkOperationWrapper.java | 19 +++++++ .../elastic/service/ElasticCaseService.java | 36 ++++++++------ .../elastic/service/ElasticQueueManager.java | 49 ++++++++++--------- 3 files changed, 68 insertions(+), 36 deletions(-) create mode 100644 application-engine/src/main/java/com/netgrif/application/engine/elastic/domain/BulkOperationWrapper.java diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/domain/BulkOperationWrapper.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/domain/BulkOperationWrapper.java new file mode 100644 index 00000000000..1cce05d15e0 --- /dev/null +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/domain/BulkOperationWrapper.java @@ -0,0 +1,19 @@ +package com.netgrif.application.engine.elastic.domain; + + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +import java.util.EventObject; + +@Data +@Builder +@AllArgsConstructor +public class BulkOperationWrapper { + + private BulkOperation operation; + + private EventObject publishableEvent; +} diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index 6b5de9b3d4f..f7aba77b5f7 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -9,6 +9,7 @@ import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; +import com.netgrif.application.engine.elastic.domain.BulkOperationWrapper; import com.netgrif.application.engine.objects.auth.domain.LoggedUser; import com.netgrif.application.engine.objects.elastic.domain.ElasticCase; import com.netgrif.application.engine.elastic.domain.ElasticCaseRepository; @@ -80,8 +81,8 @@ public ElasticCaseService(ElasticCaseRepository repository, this.workflowService = workflowService; this.iElasticCasePrioritySearch = iElasticCasePrioritySearch; this.publisher = publisher; - this.caseElasticIndexQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient); - this.caseElasticDeleteQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient); + this.caseElasticIndexQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient, publisher); + this.caseElasticDeleteQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient, publisher); } @@ -94,7 +95,10 @@ private void stopQueues() { @Override public void remove(String caseId) { - caseElasticDeleteQueueManager.push(BulkOperation.of(op -> op.delete(d -> d.index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)).id(caseId)))); + caseElasticDeleteQueueManager.push(new BulkOperationWrapper( + BulkOperation.of(op -> op.delete(d -> d.index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)).id(caseId))), + null + )); log.info("[{}]: Case \"{}\" queued for deletion", caseId, caseId); } @@ -102,22 +106,19 @@ public void remove(String caseId) { public void index(ElasticCase useCase) { Optional elasticCaseOptional = repository.findById(useCase.getId()); if (elasticCaseOptional.isEmpty()) { - caseElasticIndexQueueManager.push(BulkOperation.of(op -> op.index(i -> i - .index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)) - .id(useCase.getId()) - .document(template.getElasticsearchConverter().mapObject(useCase))) - )); + caseElasticIndexQueueManager.push(BulkOperationWrapper.builder() + .operation(createIndexOperation(useCase)) + .publishableEvent(new IndexCaseEvent(useCase)) + .build()); } else { com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); elasticCase.update(useCase); - caseElasticIndexQueueManager.push(BulkOperation.of(op -> op.index(i -> i - .index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)) - .id(elasticCase.getId()) - .document(template.getElasticsearchConverter().mapObject(elasticCase))) - )); + caseElasticIndexQueueManager.push(BulkOperationWrapper.builder() + .operation(createIndexOperation(elasticCase)) + .publishableEvent(new IndexCaseEvent(elasticCase)) + .build()); } log.debug("[{}]: Case \"{}\" queued for indexing", useCase.getId(), useCase.getTitle()); - publisher.publishEvent(new IndexCaseEvent(useCase)); } @Override @@ -517,4 +518,11 @@ protected Pageable resolveUnmappedSortAttributes(Pageable pageable) { pageable.getSort().iterator().forEachRemaining(order -> modifiedOrders.add(new Order(order.getDirection(), order.getProperty()).withUnmappedType("keyword"))); return PageRequest.of(pageable.getPageNumber(), pageable.getPageSize()).withSort(Sort.by(modifiedOrders)); } + + private BulkOperation createIndexOperation(ElasticCase useCase) { + return BulkOperation.of(op -> op.index(i -> i + .index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)) + .id(useCase.getId()) + .document(template.getElasticsearchConverter().mapObject(useCase)))); + } } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 9767bb625b5..21b538e314c 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -3,17 +3,14 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.Refresh; import co.elastic.clients.elasticsearch.core.BulkRequest; -import co.elastic.clients.elasticsearch.core.BulkResponse; -import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; +import com.netgrif.application.engine.elastic.domain.BulkOperationWrapper; import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.UUID; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; @@ -27,7 +24,7 @@ public final class ElasticQueueManager { private final Logger log = LoggerFactory.getLogger(ElasticQueueManager.class); - private final Queue queue; + private final Queue queue; private final ScheduledExecutorService scheduler; @@ -37,18 +34,22 @@ public final class ElasticQueueManager { private final ElasticsearchClient elasticsearchClient; + private final ApplicationEventPublisher eventPublisher; + /** * Constructs an ElasticQueueManager instance. * * @param elasticsearchProperties the configuration properties for Elasticsearch, including queue parameters */ public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties, - ElasticsearchClient elasticsearchClient) { + ElasticsearchClient elasticsearchClient, + ApplicationEventPublisher eventPublisher) { queue = new ConcurrentLinkedDeque<>(); atomicDelayer = new AtomicReference<>(); queueProperties = elasticsearchProperties.getQueue(); scheduler = Executors.newScheduledThreadPool(queueProperties.getScheduledExecutorPoolSize()); this.elasticsearchClient = elasticsearchClient; + this.eventPublisher = eventPublisher; } /** @@ -75,13 +76,8 @@ public void shutdown() { } - /** - * Adds an elastic query to the queue and resets the timer for the scheduled flush. - * - * @param elasticQuery the elastic query to be added to the queue - */ - public void push(BulkOperation elasticQuery) { - queue.add(elasticQuery); + public void push(BulkOperationWrapper operation) { + queue.add(operation); if (queue.size() < queueProperties.getMaxQueueSize()) { resetTimer(); } @@ -97,7 +93,7 @@ private synchronized void flush() { return; } - List batch = new ArrayList<>(); + List batch = new ArrayList<>(); while (!queue.isEmpty() && batch.size() < queueProperties.getMaxQueueSize()) { batch.add(queue.poll()); } @@ -105,13 +101,10 @@ private synchronized void flush() { String uuid = UUID.randomUUID().toString(); try { log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); - elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch).refresh(Refresh.False).build()); + elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch.stream().map(BulkOperationWrapper::getOperation).toList()).refresh(Refresh.False).build()); + publishEventsOfBatch(batch); log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); - if (queue.size() >= queueProperties.getMaxQueueSize()) { - flush(); - } else { - resetTimer(); - } + checkQueue(); } catch (Exception e) { queue.addAll(batch); resetTimer(); @@ -136,4 +129,16 @@ private void resetTimer() { atomicDelayer.set(newTask); } + private void checkQueue() { + if (queue.size() >= queueProperties.getMaxQueueSize()) { + flush(); + } else { + resetTimer(); + } + } + + private void publishEventsOfBatch(List batch) { + batch.stream().filter(operationWrapper -> operationWrapper.getPublishableEvent() != null) + .forEach(operationWrapper -> eventPublisher.publishEvent(operationWrapper.getPublishableEvent())); + } } From e1fd1f2e34b70d536ca493457e955a4a2c4f2470 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 10:00:31 +0100 Subject: [PATCH 26/37] Synchronize resetTimer to ensure thread safety. The resetTimer method is now synchronized to handle potential concurrency issues and ensure thread safety. This change prevents race conditions when multiple threads access the method simultaneously. --- .../application/engine/elastic/service/ElasticQueueManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 21b538e314c..228efcbe3f4 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -117,7 +117,7 @@ private synchronized void flush() { * that the flush operation is delayed until the configured time interval * has elapsed from the last push to the queue. */ - private void resetTimer() { + private synchronized void resetTimer() { if (scheduler.isShutdown()) { return; } From 1e983f0032f9a354be07598cc77fb0bc2dbcc69c Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 10:30:48 +0100 Subject: [PATCH 27/37] Refactor ElasticQueueManager for improved queue handling Replaced `ConcurrentLinkedDeque` with `LinkedBlockingDeque` to enforce a bounded queue size, ensuring memory efficiency. Introduced separate properties for `maxQueueSize` and `maxBatchSize` to allow more precise control of queue management. Enhanced documentation and exception handling for better reliability and maintainability. --- .../DataConfigurationProperties.java | 20 +++- .../elastic/service/ElasticQueueManager.java | 91 +++++++++++++++---- 2 files changed, 91 insertions(+), 20 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java index 91c926578a2..9dc3a2d88fa 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java @@ -683,13 +683,27 @@ public static class QueueProperties { private TimeUnit delayUnit = TimeUnit.MILLISECONDS; /** - * Maximum number of elements allowed in the queue. - * When the queue size reaches this limit, it triggers a flush operation. + * Maximum number of elements allowed in batch to flush. + * When the batch size reaches this limit, it triggers a flush operation. * Default value: 400. */ @Min(1) - private int maxQueueSize = 400; + private int maxBatchSize = 400; + + /** + * Specifies the maximum size of the queue. When the queue reaches this size, + * a flush operation is triggered to process the elements in the queue. + * Default value is 3000, and the minimum allowable value is 400. + */ + @Min(400) + private int maxQueueSize = 3000; + + /** + * Defines the refresh policy for Elasticsearch operations. + * Determines when changes made by bulk operations will be visible for search. + * Default value is {@link RefreshPolicy#NONE}, meaning no immediate refresh. + */ private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; } } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 228efcbe3f4..7f173a01a3d 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -17,14 +17,26 @@ /** * Manages a queue of elastic queries and handles their periodic bulk processing in Elasticsearch. - * This class efficiently buffers and schedules elastic queries in batches, ensuring timely - * indexing into the Elasticsearch index with controlled execution and concurrency. + *

+ * This class is responsible for: + * - Enqueuing bulk operations and managing a bounded queue to ensure memory efficiency. + * - Automatically flushing queued operations into Elasticsearch in configurable batches. + * - Controlling execution with a scheduled timer to avoid overloading Elasticsearch. + * - Handling exceptions during the bulk operation, retrying in case of failures. + *

+ * Key components of this class: + * - {@code queue}: A thread-safe queue to hold bulk operations before processing. + * - {@code scheduler}: A scheduled executor service that manages timed execution of flush operations. + * - {@code atomicDelayer}: An atomic reference ensuring only one active scheduled task for the flush timer. + * - {@code queueProperties}: Configuration controlling maximum queue size, batch size, and flush delay parameters. + * - {@code elasticsearchClient}: The Elasticsearch client used for direct communication with the cluster. + * - {@code eventPublisher}: Publishes relevant events generated during bulk operation processing. **/ public final class ElasticQueueManager { private final Logger log = LoggerFactory.getLogger(ElasticQueueManager.class); - private final Queue queue; + private final BlockingQueue queue; private final ScheduledExecutorService scheduler; @@ -40,21 +52,29 @@ public final class ElasticQueueManager { * Constructs an ElasticQueueManager instance. * * @param elasticsearchProperties the configuration properties for Elasticsearch, including queue parameters + * such as maximum queue size, batch size, and flush delay. + * @param elasticsearchClient the Elasticsearch client used to perform bulk operations. + * @param eventPublisher an event publisher for broadcasting events related to processed operations. */ public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties, ElasticsearchClient elasticsearchClient, ApplicationEventPublisher eventPublisher) { - queue = new ConcurrentLinkedDeque<>(); atomicDelayer = new AtomicReference<>(); queueProperties = elasticsearchProperties.getQueue(); + queue = new LinkedBlockingDeque<>(queueProperties.getMaxQueueSize()); scheduler = Executors.newScheduledThreadPool(queueProperties.getScheduledExecutorPoolSize()); this.elasticsearchClient = elasticsearchClient; this.eventPublisher = eventPublisher; } /** - * Shuts down the scheduler and flushes any remaining elements in the queue - * to Elasticsearch before the application stops. + * Shuts down the ElasticQueueManager gracefully. + *

+ * This method cancels any pending scheduled tasks, flushes all remaining elements in the queue + * to Elasticsearch, and shuts down the executor service. It ensures that no queued operations + * are lost during the application shutdown process. + * + * @throws InterruptedException if the shutdown is interrupted during termination waiting period. */ @PreDestroy public void shutdown() { @@ -76,17 +96,35 @@ public void shutdown() { } + /** + * Adds a bulk operation to the queue and (re)starts the flush timer if needed. + *

+ * If the queue size is below the configured batch size, the flush operation + * is rescheduled to execute after the appropriate delay interval. + * + * @param operation the bulk operation to be added to the queue. + * @throws IllegalStateException if the queue is full and cannot accept new operations. + */ public void push(BulkOperationWrapper operation) { - queue.add(operation); - if (queue.size() < queueProperties.getMaxQueueSize()) { - resetTimer(); + try { + queue.put(operation); + if (queue.size() < queueProperties.getMaxQueueSize()) { + resetTimer(); + } + } catch (InterruptedException e) { + log.error("Interrupted while pushing to operation queue", e); } } /** - * Synchronously processes and flushes a batch of elements from the queue - * to Elasticsearch. Ensures that no more than the maximum queue size is - * processed at a time. + * Processes and synchronously flushes a batch of elements from the queue to Elasticsearch. + *

+ * The method retrieves up to the configured maximum batch size of operations from the queue + * and sends them to Elasticsearch for indexing. On errors, the batch is re-added to the queue + * for a retry at a later time. Additionally, event publishing is triggered for successfully + * processed operations. + * + * @throws IllegalStateException if triggered in a shutdown state. */ private synchronized void flush() { if (queue.isEmpty()) { @@ -94,7 +132,7 @@ private synchronized void flush() { } List batch = new ArrayList<>(); - while (!queue.isEmpty() && batch.size() < queueProperties.getMaxQueueSize()) { + while (!queue.isEmpty() && batch.size() < queueProperties.getMaxBatchSize()) { batch.add(queue.poll()); } @@ -113,9 +151,12 @@ private synchronized void flush() { } /** - * Resets the timer for executing the flush operation. This ensures - * that the flush operation is delayed until the configured time interval - * has elapsed from the last push to the queue. + * Resets and reschedules the flush timer for delayed execution. + *

+ * This method ensures that the flush operation is rescheduled to occur after + * the configured delay interval. If a timer is already active, it is cancelled + * before restarting a new one. This allows the system to adapt to new operations + * being continuously added to the queue. */ private synchronized void resetTimer() { if (scheduler.isShutdown()) { @@ -129,14 +170,30 @@ private synchronized void resetTimer() { atomicDelayer.set(newTask); } + /** + * Checks the queue size and determines whether to flush or reschedule the timer. + *

+ * If the queue size reaches or exceeds the configured maximum batch size, it + * immediately triggers a flush operation. Otherwise, the timer is reset to wait + * for additional operations. + */ private void checkQueue() { - if (queue.size() >= queueProperties.getMaxQueueSize()) { + if (queue.size() >= queueProperties.getMaxBatchSize()) { flush(); } else { resetTimer(); } } + /** + * Publishes events for all operations in the batch that contain a publishable event. + *

+ * This method iterates through the batch, checking each operation wrapper for an + * associated event. If present, the event is published using the configured + * {@code ApplicationEventPublisher}. + * + * @param batch the list of bulk operation wrappers to process for event publishing. + */ private void publishEventsOfBatch(List batch) { batch.stream().filter(operationWrapper -> operationWrapper.getPublishableEvent() != null) .forEach(operationWrapper -> eventPublisher.publishEvent(operationWrapper.getPublishableEvent())); From ca624840c39806a15b1688f913c1c0998aa4f0e4 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 10:33:36 +0100 Subject: [PATCH 28/37] `Move event publishing to finally block in indexing process` Previously, events were published before ensuring all necessary cleanup steps. By moving the publishEventsOfBatch call to the finally block, we ensure it runs reliably after the index operation, regardless of success or failure. This improves the robustness and consistency of the process. --- .../engine/elastic/service/ElasticQueueManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 7f173a01a3d..3b21db467a1 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -140,13 +140,14 @@ private synchronized void flush() { try { log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch.stream().map(BulkOperationWrapper::getOperation).toList()).refresh(Refresh.False).build()); - publishEventsOfBatch(batch); log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); checkQueue(); } catch (Exception e) { queue.addAll(batch); resetTimer(); log.error("Index failed with batch size: {} and id: {}", batch.size(), uuid, e); + } finally { + publishEventsOfBatch(batch); } } From 959ccb43e9f83a4dc18a29875400e4f0a369cf32 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 10:40:02 +0100 Subject: [PATCH 29/37] Refactor error handling in ElasticQueueManager. Moved `publishEventsOfBatch()` outside the `finally` block to ensure it is not called when an exception occurs. This improves handling of failed indexing batches and prevents unintended event publishing. --- .../engine/elastic/service/ElasticQueueManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 3b21db467a1..fb5d9b3e6fd 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -146,9 +146,9 @@ private synchronized void flush() { queue.addAll(batch); resetTimer(); log.error("Index failed with batch size: {} and id: {}", batch.size(), uuid, e); - } finally { - publishEventsOfBatch(batch); + return; } + publishEventsOfBatch(batch); } /** From b3c41cdf34d25c050f8fb6eade9a6f12c9883651 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 11:43:41 +0100 Subject: [PATCH 30/37] Update ElasticSearch refresh policy handling in bulk requests Replaced usage of `RefreshPolicy` with `Refresh` in `ElasticQueueManager` and aligned the default refresh policy to `Refresh.False`. This ensures consistency and improves clarity in refresh policy configuration for bulk operations. --- .../configuration/properties/DataConfigurationProperties.java | 3 ++- .../engine/elastic/service/ElasticQueueManager.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java index 9dc3a2d88fa..0cd46da147c 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java @@ -1,5 +1,6 @@ package com.netgrif.application.engine.configuration.properties; +import co.elastic.clients.elasticsearch._types.Refresh; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ClusterType; import jakarta.annotation.PostConstruct; @@ -704,7 +705,7 @@ public static class QueueProperties { * Determines when changes made by bulk operations will be visible for search. * Default value is {@link RefreshPolicy#NONE}, meaning no immediate refresh. */ - private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; + private Refresh refreshPolicy = Refresh.False; } } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index fb5d9b3e6fd..8fe2f6783f2 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -139,7 +139,7 @@ private synchronized void flush() { String uuid = UUID.randomUUID().toString(); try { log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); - elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch.stream().map(BulkOperationWrapper::getOperation).toList()).refresh(Refresh.False).build()); + elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch.stream().map(BulkOperationWrapper::getOperation).toList()).refresh(queueProperties.getRefreshPolicy()).build()); log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); checkQueue(); } catch (Exception e) { From 991164406343a098c0d9710740e7f87720f680bf Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 14:06:59 +0100 Subject: [PATCH 31/37] Fix batch size condition and refine error handling messages Updated the condition to use `getMaxBatchSize` instead of `getMaxQueueSize` to align with batch configuration logic. Enhanced error logging for failed bulk operations to provide clearer context and describe retry mechanisms. --- .../engine/elastic/service/ElasticQueueManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 8fe2f6783f2..732434899c8 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -108,7 +108,7 @@ public void shutdown() { public void push(BulkOperationWrapper operation) { try { queue.put(operation); - if (queue.size() < queueProperties.getMaxQueueSize()) { + if (queue.size() < queueProperties.getMaxBatchSize()) { resetTimer(); } } catch (InterruptedException e) { @@ -143,9 +143,9 @@ private synchronized void flush() { log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); checkQueue(); } catch (Exception e) { - queue.addAll(batch); - resetTimer(); - log.error("Index failed with batch size: {} and id: {}", batch.size(), uuid, e); + log.error("Bulk operation failed for batch id: {} with {} operations. " + + "Operations will be retried via scheduled indexing or manual reindex.", + uuid, batch.size(), e); return; } publishEventsOfBatch(batch); From 38e03098f6cf4df0e0fd0fc412ffed2e260ec73f Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 14:35:12 +0100 Subject: [PATCH 32/37] Remove unused exception documentation from push method The javadoc for the push method incorrectly mentioned an exception that is no longer thrown. This cleanup improves code readability and consistency by removing outdated or misleading information. --- .../application/engine/elastic/service/ElasticQueueManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 732434899c8..afee75913ef 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -103,7 +103,6 @@ public void shutdown() { * is rescheduled to execute after the appropriate delay interval. * * @param operation the bulk operation to be added to the queue. - * @throws IllegalStateException if the queue is full and cannot accept new operations. */ public void push(BulkOperationWrapper operation) { try { From bb7beedd46d2d754cefcd2a01d68f6fbaa882de6 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 14:35:20 +0100 Subject: [PATCH 33/37] Remove unused exception documentation from push method The javadoc for the push method incorrectly mentioned an exception that is no longer thrown. This cleanup improves code readability and consistency by removing outdated or misleading information. --- .../application/engine/elastic/service/ElasticQueueManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index afee75913ef..1184f4f992f 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -1,7 +1,6 @@ package com.netgrif.application.engine.elastic.service; import co.elastic.clients.elasticsearch.ElasticsearchClient; -import co.elastic.clients.elasticsearch._types.Refresh; import co.elastic.clients.elasticsearch.core.BulkRequest; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; import com.netgrif.application.engine.elastic.domain.BulkOperationWrapper; From cb8548c00fbeee261b199db107a42c70a9ec0f4b Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 14:36:52 +0100 Subject: [PATCH 34/37] Fix incorrect JavaDoc comment formatting in ElasticQueueManager The JavaDoc for the `flush` method had an unnecessary annotation about an exception that is not thrown. This update corrects the formatting and removes misleading information about a nonexistent exception. --- .../engine/elastic/service/ElasticQueueManager.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 1184f4f992f..e7633f071e6 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -121,9 +121,7 @@ public void push(BulkOperationWrapper operation) { * and sends them to Elasticsearch for indexing. On errors, the batch is re-added to the queue * for a retry at a later time. Additionally, event publishing is triggered for successfully * processed operations. - * - * @throws IllegalStateException if triggered in a shutdown state. - */ + **/ private synchronized void flush() { if (queue.isEmpty()) { return; From b4819296d76fab2c8f336cb96e06cfac79a996a7 Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 15:29:47 +0100 Subject: [PATCH 35/37] Fix incorrect JavaDoc comment formatting in ElasticQueueManager The JavaDoc for the `flush` method had an unnecessary annotation about an exception that is not thrown. This update corrects the formatting and removes misleading information about a nonexistent exception. --- .../engine/elastic/service/ElasticQueueManager.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index e7633f071e6..1667e59c860 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -117,11 +117,14 @@ public void push(BulkOperationWrapper operation) { /** * Processes and synchronously flushes a batch of elements from the queue to Elasticsearch. *

- * The method retrieves up to the configured maximum batch size of operations from the queue - * and sends them to Elasticsearch for indexing. On errors, the batch is re-added to the queue - * for a retry at a later time. Additionally, event publishing is triggered for successfully - * processed operations. - **/ + * Retrieves a batch of operations up to the configured maximum size from the queue + * and sends them to Elasticsearch using the bulk API. If a failure occurs during the + * bulk operation, the failed batch remains unprocessed and will be retried later via + * scheduled execution or manual flush invocation. All successfully processed operations + * trigger their respective publishable events. + *

+ * This method is thread-safe and ensures that only one flush operation executes at any time. + */ private synchronized void flush() { if (queue.isEmpty()) { return; From acbec96dcbf05a31b780f464d998b26ca477e0da Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Fri, 7 Nov 2025 15:42:40 +0100 Subject: [PATCH 36/37] Refactor javadoc formatting in ElasticQueueManager. Removed redundant `@throws` tag in the javadoc to improve clarity and adhere to current documentation standards. No functional changes were made in the code. --- .../engine/elastic/service/ElasticQueueManager.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 1667e59c860..0bce129f6e3 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -72,9 +72,7 @@ public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties e * This method cancels any pending scheduled tasks, flushes all remaining elements in the queue * to Elasticsearch, and shuts down the executor service. It ensures that no queued operations * are lost during the application shutdown process. - * - * @throws InterruptedException if the shutdown is interrupted during termination waiting period. - */ + **/ @PreDestroy public void shutdown() { ScheduledFuture delayer = atomicDelayer.getAndSet(null); From a124e434a767d92b249cdfdb2911dd3dd3e8635b Mon Sep 17 00:00:00 2001 From: renczesstefan Date: Mon, 10 Nov 2025 10:02:28 +0100 Subject: [PATCH 37/37] Enhance error handling for batch operations in ElasticQueueManager Updated exception handling to log failures during batch event publishing. Clarified retry mechanisms in class documentation, specifying scheduled reindexing or manual intervention for failed bulk operations. This improves transparency and resilience in error scenarios. --- .../engine/elastic/service/ElasticQueueManager.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java index 0bce129f6e3..1465823ca49 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -21,7 +21,7 @@ * - Enqueuing bulk operations and managing a bounded queue to ensure memory efficiency. * - Automatically flushing queued operations into Elasticsearch in configurable batches. * - Controlling execution with a scheduled timer to avoid overloading Elasticsearch. - * - Handling exceptions during the bulk operation, retrying in case of failures. + * - Handling exceptions during bulk operations; failed operations are retried via scheduled reindexing or manual endpoints. *

* Key components of this class: * - {@code queue}: A thread-safe queue to hold bulk operations before processing. @@ -145,7 +145,11 @@ private synchronized void flush() { uuid, batch.size(), e); return; } - publishEventsOfBatch(batch); + try { + publishEventsOfBatch(batch); + } catch (Exception e) { + log.error("Event publishing failed for batch id: {}", uuid, e); + } } /**