diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java index bcc457e059d..dab7c287a52 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/ElasticServiceConfiguration.java @@ -1,21 +1,27 @@ package com.netgrif.application.engine.configuration; +import co.elastic.clients.elasticsearch.ElasticsearchClient; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; import com.netgrif.application.engine.elastic.domain.ElasticCaseRepository; import com.netgrif.application.engine.elastic.domain.ElasticTaskRepository; import com.netgrif.application.engine.elastic.service.ElasticCaseService; import com.netgrif.application.engine.elastic.service.ElasticTaskService; import com.netgrif.application.engine.elastic.service.executors.Executor; +import com.netgrif.application.engine.elastic.service.interfaces.IElasticCasePrioritySearch; import com.netgrif.application.engine.elastic.service.interfaces.IElasticCaseService; import com.netgrif.application.engine.elastic.service.interfaces.IElasticTaskService; -import org.springframework.beans.factory.annotation.Autowired; +import com.netgrif.application.engine.petrinet.service.interfaces.IPetriNetService; +import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService; +import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate; @Configuration +@RequiredArgsConstructor @ConditionalOnProperty( value = "netgrif.engine.data.elasticsearch.service.configuration-enabled", matchIfMissing = true, @@ -23,17 +29,14 @@ ) public class ElasticServiceConfiguration { - @Autowired - private ElasticCaseRepository caseRepository; - - @Autowired - private ElasticTaskRepository taskRepository; - - @Autowired - private ElasticsearchTemplate elasticsearchTemplate; - - @Autowired - private DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties; + private final ElasticCaseRepository caseRepository; + private final ElasticsearchTemplate elasticsearchTemplate; + private final DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties; + private final IPetriNetService petriNetService; + private final IWorkflowService workflowService; + private final IElasticCasePrioritySearch elasticCasePrioritySearch; + private final ApplicationEventPublisher applicationEventPublisher; + private final ElasticsearchClient elasticsearchClient; @Bean @Primary @@ -54,7 +57,17 @@ public Executor reindexingTaskTaskExecutor() { @Bean @Primary public IElasticCaseService elasticCaseService() { - return new ElasticCaseService(caseRepository, elasticsearchTemplate, executor()); + return new ElasticCaseService( + caseRepository, + elasticsearchTemplate, + executor(), + elasticsearchProperties, + petriNetService, + workflowService, + elasticCasePrioritySearch, + applicationEventPublisher, + elasticsearchClient + ); } @Bean @@ -65,7 +78,17 @@ public IElasticTaskService elasticTaskService() { @Bean public IElasticCaseService reindexingTaskElasticCaseService() { - return new ElasticCaseService(caseRepository, elasticsearchTemplate, reindexingTaskCaseExecutor()); + return new ElasticCaseService( + caseRepository, + elasticsearchTemplate, + reindexingTaskCaseExecutor(), + elasticsearchProperties, + petriNetService, + workflowService, + elasticCasePrioritySearch, + applicationEventPublisher, + elasticsearchClient + ); } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java index 996318b03c6..0cd46da147c 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/configuration/properties/DataConfigurationProperties.java @@ -1,5 +1,6 @@ package com.netgrif.application.engine.configuration.properties; +import co.elastic.clients.elasticsearch._types.Refresh; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ClusterType; import jakarta.annotation.PostConstruct; @@ -15,6 +16,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.io.Resource; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -521,6 +523,16 @@ public static class ElasticsearchProperties { */ @Valid private BatchProperties batch = new BatchProperties(); + + + /** + * Configuration properties for handling queues in Elasticsearch operations. + * These properties specify the behavior of the ElasticQueueManager, + * including the maximum queue size, delay between flush operations, + * and the thread pool size for scheduled executor service tasks. + */ + @Valid + private QueueProperties queue = new QueueProperties(); public static final String PETRI_NET_INDEX = "petriNet"; @@ -638,6 +650,63 @@ public static class BatchProperties { @Min(1) private int taskBatchSize = 20000; } + + + /** + * Configuration properties for handling queues in Elasticsearch operations. + * These properties specify the behavior of the ElasticQueueManager, + * including the maximum queue size, delay between flush operations, + * and the thread pool size for scheduled executor service tasks. + */ + @Data + public static class QueueProperties { + + /** + * The size of the thread pool for the scheduled executor service. + * This determines the number of threads available to schedule and execute tasks. + * Default value: 10. + */ + @Min(1) + private int scheduledExecutorPoolSize = 10; + + /** + * Delay time between flush operations in the queue. + * This value represents the amount of time the scheduler waits before initiating the next flush. + * Default value: 150. + */ + @Min(1) + private int delay = 150; + + /** + * The time unit of the delay for flush operations. + * Default value: {@link TimeUnit#MILLISECONDS}. + */ + private TimeUnit delayUnit = TimeUnit.MILLISECONDS; + + /** + * Maximum number of elements allowed in batch to flush. + * When the batch size reaches this limit, it triggers a flush operation. + * Default value: 400. + */ + @Min(1) + private int maxBatchSize = 400; + + + /** + * Specifies the maximum size of the queue. When the queue reaches this size, + * a flush operation is triggered to process the elements in the queue. + * Default value is 3000, and the minimum allowable value is 400. + */ + @Min(400) + private int maxQueueSize = 3000; + + /** + * Defines the refresh policy for Elasticsearch operations. + * Determines when changes made by bulk operations will be visible for search. + * Default value is {@link RefreshPolicy#NONE}, meaning no immediate refresh. + */ + private Refresh refreshPolicy = Refresh.False; + } } /** diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/domain/BulkOperationWrapper.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/domain/BulkOperationWrapper.java new file mode 100644 index 00000000000..1cce05d15e0 --- /dev/null +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/domain/BulkOperationWrapper.java @@ -0,0 +1,19 @@ +package com.netgrif.application.engine.elastic.domain; + + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +import java.util.EventObject; + +@Data +@Builder +@AllArgsConstructor +public class BulkOperationWrapper { + + private BulkOperation operation; + + private EventObject publishableEvent; +} diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java index 07d69011897..f7aba77b5f7 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseService.java @@ -1,12 +1,15 @@ package com.netgrif.application.engine.elastic.service; +import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.FieldValue; import co.elastic.clients.elasticsearch._types.mapping.FieldType; import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery; import co.elastic.clients.elasticsearch._types.query_dsl.QueryStringQuery; import co.elastic.clients.elasticsearch._types.query_dsl.TermsQueryField; import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; +import com.netgrif.application.engine.elastic.domain.BulkOperationWrapper; import com.netgrif.application.engine.objects.auth.domain.LoggedUser; import com.netgrif.application.engine.objects.elastic.domain.ElasticCase; import com.netgrif.application.engine.elastic.domain.ElasticCaseRepository; @@ -22,13 +25,11 @@ import com.netgrif.application.engine.utils.FullPageRequest; import com.netgrif.application.engine.objects.workflow.domain.Case; import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService; -import lombok.RequiredArgsConstructor; +import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; -import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.*; import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate; import org.springframework.data.elasticsearch.client.elc.NativeQuery; @@ -36,7 +37,7 @@ import org.springframework.data.elasticsearch.core.SearchHitSupport; import org.springframework.data.elasticsearch.core.SearchHits; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.Order; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.stereotype.Service; import java.util.*; @@ -48,7 +49,6 @@ import static org.springframework.data.elasticsearch.client.elc.Queries.termQuery; @Service -@RequiredArgsConstructor public class ElasticCaseService extends ElasticViewPermissionService implements IElasticCaseService { private static final Logger log = LoggerFactory.getLogger(ElasticCaseService.class); @@ -60,69 +60,65 @@ public class ElasticCaseService extends ElasticViewPermissionService implements protected IPetriNetService petriNetService; protected IWorkflowService workflowService; protected IElasticCasePrioritySearch iElasticCasePrioritySearch; - - @Autowired - @Lazy - public void setWorkflowService(IWorkflowService workflowService) { - this.workflowService = workflowService; - } - - @Autowired - @Lazy - public void setPetriNetService(IPetriNetService petriNetService) { - this.petriNetService = petriNetService; - } - - @Autowired protected ApplicationEventPublisher publisher; - - @Autowired - public void setElasticCasePrioritySearch(IElasticCasePrioritySearch iElasticCasePrioritySearch) { + protected ElasticQueueManager caseElasticIndexQueueManager; + protected ElasticQueueManager caseElasticDeleteQueueManager; + + public ElasticCaseService(ElasticCaseRepository repository, + ElasticsearchTemplate template, + Executor executors, + DataConfigurationProperties.ElasticsearchProperties elasticProperties, + @Lazy IPetriNetService petriNetService, + @Lazy IWorkflowService workflowService, + IElasticCasePrioritySearch iElasticCasePrioritySearch, + ApplicationEventPublisher publisher, + ElasticsearchClient elasticsearchClient) { + this.repository = repository; + this.template = template; + this.executors = executors; + this.elasticProperties = elasticProperties; + this.petriNetService = petriNetService; + this.workflowService = workflowService; this.iElasticCasePrioritySearch = iElasticCasePrioritySearch; - } + this.publisher = publisher; + this.caseElasticIndexQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient, publisher); + this.caseElasticDeleteQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient, publisher); - @Autowired - public void setElasticProperties(DataConfigurationProperties.ElasticsearchProperties elasticProperties) { - this.elasticProperties = elasticProperties; } - @Override - public void remove(String caseId) { - executors.execute(caseId, () -> { - repository.deleteAllById(caseId); - log.info("[" + caseId + "]: Case \"" + caseId + "\" deleted"); - }); + @PreDestroy + private void stopQueues() { + caseElasticIndexQueueManager.shutdown(); + caseElasticDeleteQueueManager.shutdown(); + log.info("Queues for cases have been stopped"); } @Override - public void removeByPetriNetId(String processId) { - executors.execute(processId, () -> { - repository.deleteAllByProcessId(processId); - log.info("[" + processId + "]: All cases of Petri Net with id \"" + processId + "\" deleted"); - }); + public void remove(String caseId) { + caseElasticDeleteQueueManager.push(new BulkOperationWrapper( + BulkOperation.of(op -> op.delete(d -> d.index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)).id(caseId))), + null + )); + log.info("[{}]: Case \"{}\" queued for deletion", caseId, caseId); } @Override public void index(ElasticCase useCase) { - executors.execute(useCase.getId(), () -> { - try { - Optional elasticCaseOptional = repository.findById(useCase.getId()); - if (elasticCaseOptional.isEmpty()) { - repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase); - } else { - com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); - elasticCase.update(useCase); - repository.save(elasticCase); - } - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed"); - publisher.publishEvent(new IndexCaseEvent(useCase)); - } catch (InvalidDataAccessApiUsageException ignored) { - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" has duplicates, will be reindexed"); - repository.deleteAllById(useCase.getId()); - repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase); - log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed"); - } - }); + Optional elasticCaseOptional = repository.findById(useCase.getId()); + if (elasticCaseOptional.isEmpty()) { + caseElasticIndexQueueManager.push(BulkOperationWrapper.builder() + .operation(createIndexOperation(useCase)) + .publishableEvent(new IndexCaseEvent(useCase)) + .build()); + } else { + com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get(); + elasticCase.update(useCase); + caseElasticIndexQueueManager.push(BulkOperationWrapper.builder() + .operation(createIndexOperation(elasticCase)) + .publishableEvent(new IndexCaseEvent(elasticCase)) + .build()); + } + log.debug("[{}]: Case \"{}\" queued for indexing", useCase.getId(), useCase.getTitle()); } @Override @@ -522,4 +518,11 @@ protected Pageable resolveUnmappedSortAttributes(Pageable pageable) { pageable.getSort().iterator().forEachRemaining(order -> modifiedOrders.add(new Order(order.getDirection(), order.getProperty()).withUnmappedType("keyword"))); return PageRequest.of(pageable.getPageNumber(), pageable.getPageSize()).withSort(Sort.by(modifiedOrders)); } + + private BulkOperation createIndexOperation(ElasticCase useCase) { + return BulkOperation.of(op -> op.index(i -> i + .index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)) + .id(useCase.getId()) + .document(template.getElasticsearchConverter().mapObject(useCase)))); + } } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java index 05c4fbc53ef..3cb4d15a1c6 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java @@ -452,7 +452,7 @@ private void prepareCaseBulkOperation(ElasticCase doc, List opera .index(elasticsearchProperties.getIndex().get("case")) .id(doc.getId()) .action(a -> a - .doc(doc) + .doc(elasticsearchTemplate.getElasticsearchConverter().mapObject(doc)) .docAsUpsert(true) ) ))); @@ -474,7 +474,7 @@ private void prepareTaskBulkOperation(ElasticTask doc, List opera .index(elasticsearchProperties.getIndex().get("task")) .id(doc.getId()) .action(a -> a - .doc(doc) + .doc(elasticsearchTemplate.getElasticsearchConverter().mapObject(doc)) .docAsUpsert(true) ) )) diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java new file mode 100644 index 00000000000..1465823ca49 --- /dev/null +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/ElasticQueueManager.java @@ -0,0 +1,203 @@ +package com.netgrif.application.engine.elastic.service; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties; +import com.netgrif.application.engine.elastic.domain.BulkOperationWrapper; +import jakarta.annotation.PreDestroy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + + +/** + * Manages a queue of elastic queries and handles their periodic bulk processing in Elasticsearch. + *

+ * This class is responsible for: + * - Enqueuing bulk operations and managing a bounded queue to ensure memory efficiency. + * - Automatically flushing queued operations into Elasticsearch in configurable batches. + * - Controlling execution with a scheduled timer to avoid overloading Elasticsearch. + * - Handling exceptions during bulk operations; failed operations are retried via scheduled reindexing or manual endpoints. + *

+ * Key components of this class: + * - {@code queue}: A thread-safe queue to hold bulk operations before processing. + * - {@code scheduler}: A scheduled executor service that manages timed execution of flush operations. + * - {@code atomicDelayer}: An atomic reference ensuring only one active scheduled task for the flush timer. + * - {@code queueProperties}: Configuration controlling maximum queue size, batch size, and flush delay parameters. + * - {@code elasticsearchClient}: The Elasticsearch client used for direct communication with the cluster. + * - {@code eventPublisher}: Publishes relevant events generated during bulk operation processing. + **/ +public final class ElasticQueueManager { + + private final Logger log = LoggerFactory.getLogger(ElasticQueueManager.class); + + private final BlockingQueue queue; + + private final ScheduledExecutorService scheduler; + + private final AtomicReference> atomicDelayer; + + private final DataConfigurationProperties.ElasticsearchProperties.QueueProperties queueProperties; + + private final ElasticsearchClient elasticsearchClient; + + private final ApplicationEventPublisher eventPublisher; + + /** + * Constructs an ElasticQueueManager instance. + * + * @param elasticsearchProperties the configuration properties for Elasticsearch, including queue parameters + * such as maximum queue size, batch size, and flush delay. + * @param elasticsearchClient the Elasticsearch client used to perform bulk operations. + * @param eventPublisher an event publisher for broadcasting events related to processed operations. + */ + public ElasticQueueManager(DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties, + ElasticsearchClient elasticsearchClient, + ApplicationEventPublisher eventPublisher) { + atomicDelayer = new AtomicReference<>(); + queueProperties = elasticsearchProperties.getQueue(); + queue = new LinkedBlockingDeque<>(queueProperties.getMaxQueueSize()); + scheduler = Executors.newScheduledThreadPool(queueProperties.getScheduledExecutorPoolSize()); + this.elasticsearchClient = elasticsearchClient; + this.eventPublisher = eventPublisher; + } + + /** + * Shuts down the ElasticQueueManager gracefully. + *

+ * This method cancels any pending scheduled tasks, flushes all remaining elements in the queue + * to Elasticsearch, and shuts down the executor service. It ensures that no queued operations + * are lost during the application shutdown process. + **/ + @PreDestroy + public void shutdown() { + ScheduledFuture delayer = atomicDelayer.getAndSet(null); + if (delayer != null) { + delayer.cancel(false); + } + scheduler.shutdown(); + while (!queue.isEmpty()) { + flush(); + } + try { + if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + } + } + + + /** + * Adds a bulk operation to the queue and (re)starts the flush timer if needed. + *

+ * If the queue size is below the configured batch size, the flush operation + * is rescheduled to execute after the appropriate delay interval. + * + * @param operation the bulk operation to be added to the queue. + */ + public void push(BulkOperationWrapper operation) { + try { + queue.put(operation); + if (queue.size() < queueProperties.getMaxBatchSize()) { + resetTimer(); + } + } catch (InterruptedException e) { + log.error("Interrupted while pushing to operation queue", e); + } + } + + /** + * Processes and synchronously flushes a batch of elements from the queue to Elasticsearch. + *

+ * Retrieves a batch of operations up to the configured maximum size from the queue + * and sends them to Elasticsearch using the bulk API. If a failure occurs during the + * bulk operation, the failed batch remains unprocessed and will be retried later via + * scheduled execution or manual flush invocation. All successfully processed operations + * trigger their respective publishable events. + *

+ * This method is thread-safe and ensures that only one flush operation executes at any time. + */ + private synchronized void flush() { + if (queue.isEmpty()) { + return; + } + + List batch = new ArrayList<>(); + while (!queue.isEmpty() && batch.size() < queueProperties.getMaxBatchSize()) { + batch.add(queue.poll()); + } + + String uuid = UUID.randomUUID().toString(); + try { + log.debug("Index started with batch size: {} and id: {}", batch.size(), uuid); + elasticsearchClient.bulk(new BulkRequest.Builder().operations(batch.stream().map(BulkOperationWrapper::getOperation).toList()).refresh(queueProperties.getRefreshPolicy()).build()); + log.debug("Index finished with batch size: {} and id: {}", batch.size(), uuid); + checkQueue(); + } catch (Exception e) { + log.error("Bulk operation failed for batch id: {} with {} operations. " + + "Operations will be retried via scheduled indexing or manual reindex.", + uuid, batch.size(), e); + return; + } + try { + publishEventsOfBatch(batch); + } catch (Exception e) { + log.error("Event publishing failed for batch id: {}", uuid, e); + } + } + + /** + * Resets and reschedules the flush timer for delayed execution. + *

+ * This method ensures that the flush operation is rescheduled to occur after + * the configured delay interval. If a timer is already active, it is cancelled + * before restarting a new one. This allows the system to adapt to new operations + * being continuously added to the queue. + */ + private synchronized void resetTimer() { + if (scheduler.isShutdown()) { + return; + } + ScheduledFuture delayer = atomicDelayer.getAndSet(null); + if (delayer != null) { + delayer.cancel(false); + } + ScheduledFuture newTask = scheduler.schedule(this::flush, queueProperties.getDelay(), queueProperties.getDelayUnit()); + atomicDelayer.set(newTask); + } + + /** + * Checks the queue size and determines whether to flush or reschedule the timer. + *

+ * If the queue size reaches or exceeds the configured maximum batch size, it + * immediately triggers a flush operation. Otherwise, the timer is reset to wait + * for additional operations. + */ + private void checkQueue() { + if (queue.size() >= queueProperties.getMaxBatchSize()) { + flush(); + } else { + resetTimer(); + } + } + + /** + * Publishes events for all operations in the batch that contain a publishable event. + *

+ * This method iterates through the batch, checking each operation wrapper for an + * associated event. If present, the event is published using the configured + * {@code ApplicationEventPublisher}. + * + * @param batch the list of bulk operation wrappers to process for event publishing. + */ + private void publishEventsOfBatch(List batch) { + batch.stream().filter(operationWrapper -> operationWrapper.getPublishableEvent() != null) + .forEach(operationWrapper -> eventPublisher.publishEvent(operationWrapper.getPublishableEvent())); + } +} diff --git a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticCaseService.java b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticCaseService.java index 8750dff1391..bf3da6f70ba 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticCaseService.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticCaseService.java @@ -23,6 +23,4 @@ public interface IElasticCaseService { long count(List requests, LoggedUser user, Locale locale, Boolean isIntersection); void remove(String caseId); - - void removeByPetriNetId(String processId); } diff --git a/application-engine/src/main/java/com/netgrif/application/engine/workflow/service/CaseEventHandler.java b/application-engine/src/main/java/com/netgrif/application/engine/workflow/service/CaseEventHandler.java index 44e39a0a236..70505e4c9a2 100644 --- a/application-engine/src/main/java/com/netgrif/application/engine/workflow/service/CaseEventHandler.java +++ b/application-engine/src/main/java/com/netgrif/application/engine/workflow/service/CaseEventHandler.java @@ -58,17 +58,6 @@ public void onAfterDelete(AfterDeleteEvent event) { } String objectId = ((Document)document.get("_id")).get("shortProcessId") + "-" + ((Document)document.get("_id")).get("objectId").toString(); - if (objectId != null) { - service.remove(objectId); - return; - } - - objectId = document.getObjectId("petriNetObjectId").toString(); - if (objectId != null) { - service.removeByPetriNetId(objectId); - return; - } - - throw new IllegalStateException("Case has been deleted neither by ID nor by process ID!"); + service.remove(objectId); } } diff --git a/application-engine/src/test/resources/application-test.yaml b/application-engine/src/test/resources/application-test.yaml index 78636b7143e..57e200d3ec4 100644 --- a/application-engine/src/test/resources/application-test.yaml +++ b/application-engine/src/test/resources/application-test.yaml @@ -7,6 +7,8 @@ netgrif: mongodb: drop: true elasticsearch: + queue: + delay: 10 drop: true index: petriNet: ${NETGRIF_ENGINE_DATA_DATABASE_NAME:nae}_test_petrinet diff --git a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java index 6b36ce07c48..db7babc19e7 100644 --- a/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java +++ b/nae-object-library/src/main/java/com/netgrif/application/engine/objects/elastic/domain/ElasticTask.java @@ -8,6 +8,8 @@ import com.netgrif.application.engine.objects.workflow.domain.Task; import lombok.*; +import java.io.Serial; +import java.io.Serializable; import java.time.LocalDateTime; import java.util.HashMap; import java.util.HashSet; @@ -17,7 +19,11 @@ @Data @NoArgsConstructor @AllArgsConstructor -public abstract class ElasticTask { +public abstract class ElasticTask implements Serializable { + + @Serial + private static final long serialVersionUID = 8399390623172906801L; + private String id; diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java index 661cd3868ca..cfb36a8446d 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticCase.java @@ -13,7 +13,7 @@ import static org.springframework.data.elasticsearch.annotations.FieldType.*; @NoArgsConstructor -@Document(indexName = "#{@elasticCaseIndex}") +@Document(indexName = "#{@elasticCaseIndex}", createIndex = false) public class ElasticCase extends com.netgrif.application.engine.objects.elastic.domain.ElasticCase { public ElasticCase(Case useCase) { diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticPetriNet.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticPetriNet.java index 5794c8a52dc..33481270767 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticPetriNet.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticPetriNet.java @@ -17,7 +17,7 @@ import static org.springframework.data.elasticsearch.annotations.FieldType.Keyword; @NoArgsConstructor -@Document(indexName = "#{@elasticPetriNetIndex}") +@Document(indexName = "#{@elasticPetriNetIndex}", createIndex = false) public class ElasticPetriNet extends com.netgrif.application.engine.objects.elastic.domain.ElasticPetriNet { public ElasticPetriNet(PetriNet net) { diff --git a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java index cb988a814d5..3752616217e 100644 --- a/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java +++ b/nae-spring-core-adapter/src/main/java/com/netgrif/application/engine/adapter/spring/elastic/domain/ElasticTask.java @@ -16,7 +16,7 @@ import static org.springframework.data.elasticsearch.annotations.FieldType.Keyword; @NoArgsConstructor -@Document(indexName = "#{@elasticTaskIndex}") +@Document(indexName = "#{@elasticTaskIndex}", createIndex = false) public class ElasticTask extends com.netgrif.application.engine.objects.elastic.domain.ElasticTask { public ElasticTask(Task task) {