Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
2f11efc
Release/7.0.0-RC9
machacjozef Oct 23, 2025
b40fe86
[NAE-2250] Optimize Elasticsearch reindexing and bulk operations for …
renczesstefan Nov 4, 2025
9b72195
- corrected according to PR
renczesstefan Nov 4, 2025
d7a57d7
Fix empty batch handling and enhance error recovery
renczesstefan Nov 4, 2025
f30179e
Update log message to indicate case is queued for indexing
renczesstefan Nov 4, 2025
653829a
Improve shutdown logic in ElasticQueueManager
renczesstefan Nov 4, 2025
1946903
Simplify ElasticQueueManager shutdown process
renczesstefan Nov 4, 2025
21f6549
Refactor shutdown logic in ElasticQueueManager
renczesstefan Nov 4, 2025
b0da37d
Reduce default thread pool size for scheduled executor
renczesstefan Nov 4, 2025
4fa3988
Add delay configuration to Elasticsearch queue in tests
renczesstefan Nov 5, 2025
7a1fb84
Add validation annotations and RefreshPolicy to queue config
renczesstefan Nov 5, 2025
625fee3
Refactor Elasticsearch integration to use bulk operations
renczesstefan Nov 6, 2025
6c21409
Refactor ElasticQueueManager and clean up unused dependencies.
renczesstefan Nov 6, 2025
4f74195
Optimize queue handling in ElasticQueueManager.
renczesstefan Nov 6, 2025
026a96e
Add Serializable and _class handling to ElasticTask
renczesstefan Nov 6, 2025
da91804
Refactor Elastic domain models and improve queue management.
renczesstefan Nov 6, 2025
8d584a2
Remove generic type parameter from ElasticQueueManager
renczesstefan Nov 6, 2025
3cd8144
Remove unused ElasticsearchConverter import
renczesstefan Nov 6, 2025
8bab0c2
Refactor case deletion logic in CaseEventHandler
renczesstefan Nov 6, 2025
20b69ba
Remove unused removeByPetriNetId method from ElasticCaseService
renczesstefan Nov 6, 2025
d01bb7e
Refactor document mapping in ElasticIndexService.
renczesstefan Nov 6, 2025
64b749f
Refactor ElasticQueueManager and clean up ElasticTask code.
renczesstefan Nov 6, 2025
838a277
Remove unused variable in ElasticQueueManager
renczesstefan Nov 6, 2025
83b7b19
Fix timer reset on index failure in ElasticQueueManager
renczesstefan Nov 6, 2025
f702a3c
Refactor ElasticQueueManager to include event publishing
renczesstefan Nov 6, 2025
e1fd1f2
Synchronize resetTimer to ensure thread safety.
renczesstefan Nov 7, 2025
1e983f0
Refactor ElasticQueueManager for improved queue handling
renczesstefan Nov 7, 2025
ca62484
`Move event publishing to finally block in indexing process`
renczesstefan Nov 7, 2025
959ccb4
Refactor error handling in ElasticQueueManager.
renczesstefan Nov 7, 2025
b3c41cd
Update ElasticSearch refresh policy handling in bulk requests
renczesstefan Nov 7, 2025
9911644
Fix batch size condition and refine error handling messages
renczesstefan Nov 7, 2025
38e0309
Remove unused exception documentation from push method
renczesstefan Nov 7, 2025
bb7beed
Remove unused exception documentation from push method
renczesstefan Nov 7, 2025
cb8548c
Fix incorrect JavaDoc comment formatting in ElasticQueueManager
renczesstefan Nov 7, 2025
b481929
Fix incorrect JavaDoc comment formatting in ElasticQueueManager
renczesstefan Nov 7, 2025
acbec96
Refactor javadoc formatting in ElasticQueueManager.
renczesstefan Nov 7, 2025
d3a3ecf
Merge remote-tracking branch 'origin/release/7.0.0-rev8' into NAE-2250
machacjozef Nov 10, 2025
a124e43
Enhance error handling for batch operations in ElasticQueueManager
renczesstefan Nov 10, 2025
140d2c2
Merge remote-tracking branch 'origin/NAE-2250' into NAE-2250
renczesstefan Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,39 +1,42 @@
package com.netgrif.application.engine.configuration;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties;
import com.netgrif.application.engine.elastic.domain.ElasticCaseRepository;
import com.netgrif.application.engine.elastic.domain.ElasticTaskRepository;
import com.netgrif.application.engine.elastic.service.ElasticCaseService;
import com.netgrif.application.engine.elastic.service.ElasticTaskService;
import com.netgrif.application.engine.elastic.service.executors.Executor;
import com.netgrif.application.engine.elastic.service.interfaces.IElasticCasePrioritySearch;
import com.netgrif.application.engine.elastic.service.interfaces.IElasticCaseService;
import com.netgrif.application.engine.elastic.service.interfaces.IElasticTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import com.netgrif.application.engine.petrinet.service.interfaces.IPetriNetService;
import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate;

@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(
value = "netgrif.engine.data.elasticsearch.service.configuration-enabled",
matchIfMissing = true,
havingValue = "true"
)
public class ElasticServiceConfiguration {

@Autowired
private ElasticCaseRepository caseRepository;

@Autowired
private ElasticTaskRepository taskRepository;

@Autowired
private ElasticsearchTemplate elasticsearchTemplate;

@Autowired
private DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties;
private final ElasticCaseRepository caseRepository;
private final ElasticsearchTemplate elasticsearchTemplate;
private final DataConfigurationProperties.ElasticsearchProperties elasticsearchProperties;
private final IPetriNetService petriNetService;
private final IWorkflowService workflowService;
private final IElasticCasePrioritySearch elasticCasePrioritySearch;
private final ApplicationEventPublisher applicationEventPublisher;
private final ElasticsearchClient elasticsearchClient;

@Bean
@Primary
Expand All @@ -54,7 +57,17 @@ public Executor reindexingTaskTaskExecutor() {
@Bean
@Primary
public IElasticCaseService elasticCaseService() {
return new ElasticCaseService(caseRepository, elasticsearchTemplate, executor());
return new ElasticCaseService(
caseRepository,
elasticsearchTemplate,
executor(),
elasticsearchProperties,
petriNetService,
workflowService,
elasticCasePrioritySearch,
applicationEventPublisher,
elasticsearchClient
);
}

@Bean
Expand All @@ -65,7 +78,17 @@ public IElasticTaskService elasticTaskService() {

@Bean
public IElasticCaseService reindexingTaskElasticCaseService() {
return new ElasticCaseService(caseRepository, elasticsearchTemplate, reindexingTaskCaseExecutor());
return new ElasticCaseService(
caseRepository,
elasticsearchTemplate,
reindexingTaskCaseExecutor(),
elasticsearchProperties,
petriNetService,
workflowService,
elasticCasePrioritySearch,
applicationEventPublisher,
elasticsearchClient
);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netgrif.application.engine.configuration.properties;

import co.elastic.clients.elasticsearch._types.Refresh;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterType;
import jakarta.annotation.PostConstruct;
Expand All @@ -15,6 +16,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

Expand Down Expand Up @@ -521,6 +523,16 @@ public static class ElasticsearchProperties {
*/
@Valid
private BatchProperties batch = new BatchProperties();


/**
* Configuration properties for handling queues in Elasticsearch operations.
* These properties specify the behavior of the ElasticQueueManager,
* including the maximum queue size, delay between flush operations,
* and the thread pool size for scheduled executor service tasks.
*/
@Valid
private QueueProperties queue = new QueueProperties();
Comment thread
renczesstefan marked this conversation as resolved.

public static final String PETRI_NET_INDEX = "petriNet";

Expand Down Expand Up @@ -638,6 +650,63 @@ public static class BatchProperties {
@Min(1)
private int taskBatchSize = 20000;
}


/**
* Configuration properties for handling queues in Elasticsearch operations.
* These properties specify the behavior of the ElasticQueueManager,
* including the maximum queue size, delay between flush operations,
* and the thread pool size for scheduled executor service tasks.
*/
@Data
public static class QueueProperties {

/**
* The size of the thread pool for the scheduled executor service.
* This determines the number of threads available to schedule and execute tasks.
* Default value: 10.
*/
@Min(1)
private int scheduledExecutorPoolSize = 10;

/**
* Delay time between flush operations in the queue.
* This value represents the amount of time the scheduler waits before initiating the next flush.
* Default value: 150.
*/
@Min(1)
private int delay = 150;

/**
* The time unit of the delay for flush operations.
* Default value: {@link TimeUnit#MILLISECONDS}.
*/
private TimeUnit delayUnit = TimeUnit.MILLISECONDS;

/**
* Maximum number of elements allowed in batch to flush.
* When the batch size reaches this limit, it triggers a flush operation.
* Default value: 400.
*/
@Min(1)
private int maxBatchSize = 400;


/**
* Specifies the maximum size of the queue. When the queue reaches this size,
* a flush operation is triggered to process the elements in the queue.
* Default value is 3000, and the minimum allowable value is 400.
*/
@Min(400)
private int maxQueueSize = 3000;

/**
* Defines the refresh policy for Elasticsearch operations.
* Determines when changes made by bulk operations will be visible for search.
* Default value is {@link RefreshPolicy#NONE}, meaning no immediate refresh.
*/
private Refresh refreshPolicy = Refresh.False;
}
Comment thread
renczesstefan marked this conversation as resolved.
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.netgrif.application.engine.elastic.domain;


import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.EventObject;

@Data
@Builder
@AllArgsConstructor
public class BulkOperationWrapper {

private BulkOperation operation;

private EventObject publishableEvent;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.netgrif.application.engine.elastic.service;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.mapping.FieldType;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryStringQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.TermsQueryField;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties;
import com.netgrif.application.engine.elastic.domain.BulkOperationWrapper;
import com.netgrif.application.engine.objects.auth.domain.LoggedUser;
import com.netgrif.application.engine.objects.elastic.domain.ElasticCase;
import com.netgrif.application.engine.elastic.domain.ElasticCaseRepository;
Expand All @@ -22,21 +25,19 @@
import com.netgrif.application.engine.utils.FullPageRequest;
import com.netgrif.application.engine.objects.workflow.domain.Case;
import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService;
import lombok.RequiredArgsConstructor;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.*;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.client.elc.NativeQueryBuilder;
import org.springframework.data.elasticsearch.core.SearchHitSupport;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.Order;
import org.springframework.data.elasticsearch.core.query.*;
Comment thread
renczesstefan marked this conversation as resolved.
import org.springframework.stereotype.Service;

import java.util.*;
Expand All @@ -48,7 +49,6 @@
import static org.springframework.data.elasticsearch.client.elc.Queries.termQuery;

@Service
@RequiredArgsConstructor
public class ElasticCaseService extends ElasticViewPermissionService implements IElasticCaseService {

private static final Logger log = LoggerFactory.getLogger(ElasticCaseService.class);
Expand All @@ -60,69 +60,65 @@ public class ElasticCaseService extends ElasticViewPermissionService implements
protected IPetriNetService petriNetService;
protected IWorkflowService workflowService;
protected IElasticCasePrioritySearch iElasticCasePrioritySearch;

@Autowired
@Lazy
public void setWorkflowService(IWorkflowService workflowService) {
this.workflowService = workflowService;
}

@Autowired
@Lazy
public void setPetriNetService(IPetriNetService petriNetService) {
this.petriNetService = petriNetService;
}

@Autowired
protected ApplicationEventPublisher publisher;

@Autowired
public void setElasticCasePrioritySearch(IElasticCasePrioritySearch iElasticCasePrioritySearch) {
protected ElasticQueueManager caseElasticIndexQueueManager;
protected ElasticQueueManager caseElasticDeleteQueueManager;

public ElasticCaseService(ElasticCaseRepository repository,
ElasticsearchTemplate template,
Executor executors,
DataConfigurationProperties.ElasticsearchProperties elasticProperties,
@Lazy IPetriNetService petriNetService,
@Lazy IWorkflowService workflowService,
IElasticCasePrioritySearch iElasticCasePrioritySearch,
ApplicationEventPublisher publisher,
ElasticsearchClient elasticsearchClient) {
this.repository = repository;
this.template = template;
this.executors = executors;
this.elasticProperties = elasticProperties;
this.petriNetService = petriNetService;
this.workflowService = workflowService;
this.iElasticCasePrioritySearch = iElasticCasePrioritySearch;
}
this.publisher = publisher;
this.caseElasticIndexQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient, publisher);
this.caseElasticDeleteQueueManager = new ElasticQueueManager(elasticProperties, elasticsearchClient, publisher);

@Autowired
public void setElasticProperties(DataConfigurationProperties.ElasticsearchProperties elasticProperties) {
this.elasticProperties = elasticProperties;
}

@Override
public void remove(String caseId) {
executors.execute(caseId, () -> {
repository.deleteAllById(caseId);
log.info("[" + caseId + "]: Case \"" + caseId + "\" deleted");
});
@PreDestroy
private void stopQueues() {
caseElasticIndexQueueManager.shutdown();
caseElasticDeleteQueueManager.shutdown();
log.info("Queues for cases have been stopped");
}

@Override
public void removeByPetriNetId(String processId) {
executors.execute(processId, () -> {
repository.deleteAllByProcessId(processId);
log.info("[" + processId + "]: All cases of Petri Net with id \"" + processId + "\" deleted");
});
public void remove(String caseId) {
caseElasticDeleteQueueManager.push(new BulkOperationWrapper(
BulkOperation.of(op -> op.delete(d -> d.index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)).id(caseId))),
null
));
log.info("[{}]: Case \"{}\" queued for deletion", caseId, caseId);
}

@Override
public void index(ElasticCase useCase) {
executors.execute(useCase.getId(), () -> {
try {
Optional<com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase> elasticCaseOptional = repository.findById(useCase.getId());
if (elasticCaseOptional.isEmpty()) {
repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase);
} else {
com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get();
elasticCase.update(useCase);
repository.save(elasticCase);
}
log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed");
publisher.publishEvent(new IndexCaseEvent(useCase));
} catch (InvalidDataAccessApiUsageException ignored) {
log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" has duplicates, will be reindexed");
repository.deleteAllById(useCase.getId());
repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase);
log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed");
}
});
Optional<com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase> elasticCaseOptional = repository.findById(useCase.getId());
if (elasticCaseOptional.isEmpty()) {
caseElasticIndexQueueManager.push(BulkOperationWrapper.builder()
.operation(createIndexOperation(useCase))
.publishableEvent(new IndexCaseEvent(useCase))
.build());
} else {
com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get();
elasticCase.update(useCase);
caseElasticIndexQueueManager.push(BulkOperationWrapper.builder()
.operation(createIndexOperation(elasticCase))
.publishableEvent(new IndexCaseEvent(elasticCase))
.build());
}
log.debug("[{}]: Case \"{}\" queued for indexing", useCase.getId(), useCase.getTitle());
}

@Override
Expand Down Expand Up @@ -522,4 +518,11 @@ protected Pageable resolveUnmappedSortAttributes(Pageable pageable) {
pageable.getSort().iterator().forEachRemaining(order -> modifiedOrders.add(new Order(order.getDirection(), order.getProperty()).withUnmappedType("keyword")));
return PageRequest.of(pageable.getPageNumber(), pageable.getPageSize()).withSort(Sort.by(modifiedOrders));
}

private BulkOperation createIndexOperation(ElasticCase useCase) {
return BulkOperation.of(op -> op.index(i -> i
.index(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX))
.id(useCase.getId())
.document(template.getElasticsearchConverter().mapObject(useCase))));
}
}
Loading
Loading