diff --git a/pom.xml b/pom.xml index 1ff7da9f49f..06357d40181 100644 --- a/pom.xml +++ b/pom.xml @@ -64,9 +64,10 @@ 7.70.0.Final netgrif-oss https://sonarcloud.io + 2.15.0-rc1 - + @@ -90,7 +91,17 @@ - + + jitpack.io + https://jitpack.io + + true + + + false + + + @@ -352,12 +363,11 @@ ${querydsl.version} - - com.github.kenglxn.qrgen - javase - 2.6.0 + com.github.kenglxn + QRGen + 3.0.1 @@ -366,6 +376,20 @@ spring-boot-starter-data-elasticsearch + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + + + + org.glassfish + jakarta.json + 2.0.1 + + + com.google.protobuf @@ -395,13 +419,6 @@ ${drools.version} - - - net.glxn.qrgen - core - 2.0 - - org.apache.commons commons-lang3 @@ -492,38 +509,38 @@ com.fasterxml.jackson jackson-base - 2.15.0-rc1 + ${jackson.version} pom com.fasterxml.jackson.core jackson-core - 2.15.0-rc1 + ${jackson.version} com.fasterxml.jackson.core jackson-databind - 2.15.0-rc1 + ${jackson.version} com.fasterxml.jackson.jaxrs jackson-jaxrs-json-provider - 2.15.0-rc1 + ${jackson.version} com.fasterxml.jackson.core jackson-annotations - 2.15.0-rc1 + ${jackson.version} com.fasterxml.jackson.dataformat jackson-dataformat-xml - 2.15.0-rc1 + ${jackson.version} com.fasterxml.jackson.module jackson-module-jsonSchema - 2.15.0-rc1 + ${jackson.version} io.minio diff --git a/src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java b/src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java index dbd923e5a51..9bbb202adf2 100644 --- a/src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java +++ b/src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java @@ -1,9 +1,21 @@ package com.netgrif.application.engine.configuration; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.netgrif.application.engine.configuration.properties.ElasticsearchProperties; import com.netgrif.application.engine.configuration.properties.UriProperties; +import com.netgrif.application.engine.elastic.serializer.LocalDateTimeJsonDeserializer; +import com.netgrif.application.engine.elastic.serializer.LocalDateTimeJsonSerializer; import com.netgrif.application.engine.workflow.service.CaseEventHandler; +import lombok.RequiredArgsConstructor; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -11,15 +23,12 @@ import org.springframework.data.elasticsearch.core.ElasticsearchOperations; import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; +import java.time.LocalDateTime; + @Configuration +@RequiredArgsConstructor public class ElasticsearchConfiguration { - @Value("${spring.data.elasticsearch.url}") - private String url; - - @Value("${spring.data.elasticsearch.searchport}") - private int port; - @Value("${spring.data.elasticsearch.index.petriNet}") private String petriNetIndex; @@ -32,11 +41,9 @@ public class ElasticsearchConfiguration { @Value("${spring.data.elasticsearch.reindex}") private String cron; - private final UriProperties uriProperties; + private final ElasticsearchProperties elasticsearchProperties; - public ElasticsearchConfiguration(UriProperties uriProperties) { - this.uriProperties = uriProperties; - } + private final UriProperties uriProperties; @Bean public String springElasticsearchReindex() { @@ -65,9 +72,18 @@ public String elasticUriIndex() { @Bean public RestHighLevelClient client() { - - return new RestHighLevelClient( - RestClient.builder(new HttpHost(url, port, "http"))); + RestClientBuilder builder = RestClient.builder(new HttpHost(elasticsearchProperties.getUrl(), elasticsearchProperties.getSearchPort())); + if (hasCredentials()) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials( + elasticsearchProperties.getUsername(), + elasticsearchProperties.getPassword() + ) + ); + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + return new RestHighLevelClient(builder); } @Bean @@ -79,4 +95,22 @@ public ElasticsearchOperations elasticsearchTemplate() { public CaseEventHandler caseEventHandler() { return new CaseEventHandler(); } + + @Bean(name = "elasticCaseObjectMapper") + public ObjectMapper configureMapper() { + ObjectMapper mapper = new ObjectMapper(); + + JavaTimeModule javaTimeModule = new JavaTimeModule(); + javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeJsonSerializer()); + javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeJsonDeserializer()); + + mapper.registerModule(javaTimeModule); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + return mapper; + } + + private boolean hasCredentials() { + return elasticsearchProperties.getUsername() != null && !elasticsearchProperties.getUsername().isBlank() && + elasticsearchProperties.getPassword() != null && !elasticsearchProperties.getPassword().isBlank(); + } } diff --git a/src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java b/src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java index 4f77f278cb8..f3b1269b094 100644 --- a/src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java +++ b/src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java @@ -6,6 +6,8 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.validation.Valid; +import javax.validation.constraints.Min; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -33,6 +35,10 @@ public class ElasticsearchProperties { private String url; + private String username; + + private String password; + private Map index; private boolean analyzerEnabled = false; @@ -53,6 +59,9 @@ public class ElasticsearchProperties { private List defaultSearchFilters = new ArrayList<>(); + @Valid + private BatchProperties batch = new BatchProperties(); + @PostConstruct public void init() { indexSettings.putIfAbsent("max_result_window", 10000000); @@ -72,4 +81,13 @@ public void init() { public Map getClassSpecificSettings(String className) { return classSpecificIndexSettings.getOrDefault(className, new HashMap<>()); } + + @Data + public static class BatchProperties { + @Min(1) + private int caseBatchSize = 5000; + + @Min(1) + private int taskBatchSize = 20000; + } } diff --git a/src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java b/src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java new file mode 100644 index 00000000000..e661a947d83 --- /dev/null +++ b/src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java @@ -0,0 +1,24 @@ +package com.netgrif.application.engine.elastic.domain; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.springframework.data.elasticsearch.annotations.Field; + +import java.util.List; + +import static org.springframework.data.elasticsearch.annotations.FieldType.*; + +@Data +@NoArgsConstructor +@EqualsAndHashCode(callSuper = true) +public class CaseField extends DataField { + + @Field(type = Text) + private List caseValue; + + public CaseField(List value) { + super(value.toString()); + this.caseValue = value; + } +} diff --git a/src/main/java/com/netgrif/application/engine/elastic/domain/ElasticCase.java b/src/main/java/com/netgrif/application/engine/elastic/domain/ElasticCase.java index 1a13ad93484..89dda11f55f 100644 --- a/src/main/java/com/netgrif/application/engine/elastic/domain/ElasticCase.java +++ b/src/main/java/com/netgrif/application/engine/elastic/domain/ElasticCase.java @@ -1,9 +1,5 @@ package com.netgrif.application.engine.elastic.domain; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; -import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import com.netgrif.application.engine.workflow.domain.Case; import com.netgrif.application.engine.workflow.domain.TaskPair; import lombok.AllArgsConstructor; @@ -18,6 +14,7 @@ import java.sql.Timestamp; import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -56,8 +53,6 @@ public class ElasticCase { private String title; - @JsonSerialize(using = LocalDateTimeSerializer.class) - @JsonDeserialize(using = LocalDateTimeDeserializer.class) @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis) private LocalDateTime creationDate; @@ -121,7 +116,7 @@ public ElasticCase(Case useCase) { processId = useCase.getPetriNetId(); visualId = useCase.getVisualId(); title = useCase.getTitle(); - creationDate = useCase.getCreationDate(); + creationDate = useCase.getCreationDate().truncatedTo(ChronoUnit.MILLIS); creationDateSortable = Timestamp.valueOf(useCase.getCreationDate()).getTime(); author = useCase.getAuthor().getId(); authorName = useCase.getAuthor().getFullName(); diff --git a/src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java b/src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java new file mode 100644 index 00000000000..1d7b2e8c333 --- /dev/null +++ b/src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java @@ -0,0 +1,29 @@ +package com.netgrif.application.engine.elastic.serializer; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; + +public class LocalDateTimeJsonDeserializer extends JsonDeserializer { + private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd'T'HH:mm:ss") + .optionalStart() + .appendFraction(ChronoField.MILLI_OF_SECOND, 1, 3, true) + .optionalEnd() + .toFormatter(); + + @Override + public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + String value = p.getValueAsString(); + if (value == null || value.isEmpty()) { + return null; + } + return LocalDateTime.parse(value, FORMATTER); + } +} \ No newline at end of file diff --git a/src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonSerializer.java b/src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonSerializer.java new file mode 100644 index 00000000000..a500adb5268 --- /dev/null +++ b/src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonSerializer.java @@ -0,0 +1,18 @@ +package com.netgrif.application.engine.elastic.serializer; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class LocalDateTimeJsonSerializer extends JsonSerializer { + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"); + + @Override + public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeString(FORMATTER.format(value)); + } +} \ No newline at end of file diff --git a/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java b/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java index 18666dfb5f0..11286761dae 100644 --- a/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java +++ b/src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java @@ -3,6 +3,7 @@ import com.netgrif.application.engine.elastic.domain.BooleanField; import com.netgrif.application.engine.elastic.domain.ButtonField; +import com.netgrif.application.engine.elastic.domain.CaseField; import com.netgrif.application.engine.elastic.domain.DateField; import com.netgrif.application.engine.elastic.domain.FileField; import com.netgrif.application.engine.elastic.domain.I18nField; @@ -77,6 +78,8 @@ protected Optional transformDataField(String fieldId, Case useCase) { return this.transformFileListField(caseField); } else if (netField instanceof com.netgrif.application.engine.petrinet.domain.dataset.UserListField) { return this.transformUserListField(caseField); + } else if (netField instanceof com.netgrif.application.engine.petrinet.domain.dataset.CaseField) { + return this.transformCaseField(caseField); } else if (netField instanceof com.netgrif.application.engine.petrinet.domain.dataset.I18nField) { return this.transformI18nField(caseField, (com.netgrif.application.engine.petrinet.domain.dataset.I18nField) netField); } else { @@ -227,11 +230,11 @@ private StringBuilder buildFullName(String name, String surname) { protected Optional transformDateField(com.netgrif.application.engine.workflow.domain.DataField dateField, com.netgrif.application.engine.petrinet.domain.dataset.DateField netField) { if (dateField.getValue() instanceof LocalDate) { LocalDate date = (LocalDate) dateField.getValue(); - return formatDateField(LocalDateTime.of(date, LocalTime.NOON)); + return formatDateField(LocalDateTime.of(date, LocalTime.MIDNIGHT)); } else if (dateField.getValue() instanceof Date) { // log.warn(String.format("DateFields should have LocalDate values! DateField (%s) with Date value found! Value will be converted for indexation.", netField.getImportId())); LocalDateTime transformed = this.transformDateValueField(dateField); - return formatDateField(LocalDateTime.of(transformed.toLocalDate(), LocalTime.NOON)); + return formatDateField(LocalDateTime.of(transformed.toLocalDate(), LocalTime.MIDNIGHT)); } else { // TODO throw error? log.error(String.format("Unsupported DateField value type (%s)! Skipping indexation...", dateField.getValue().getClass().getCanonicalName())); @@ -283,6 +286,10 @@ protected Optional transformFileListField(com.netgrif.application.eng return Optional.of(new FileField(((FileListFieldValue) fileListField.getValue()).getNamesPaths().toArray(new FileFieldValue[0]))); } + protected Optional transformCaseField(com.netgrif.application.engine.workflow.domain.DataField caseField) { + return Optional.of(new CaseField((List) caseField.getValue())); + } + protected Optional transformOtherFields(com.netgrif.application.engine.workflow.domain.DataField otherField, Field netField) { log.warn("Field of type " + netField.getClass().getCanonicalName() + " is not supported for indexation by default. Indexing the toString() representation of its value..."); return Optional.of(new TextField(otherField.getValue().toString())); diff --git a/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java b/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java index e4ff155a89b..ec41ec1f7f0 100644 --- a/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java +++ b/src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java @@ -3,19 +3,32 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.netgrif.application.engine.configuration.properties.ElasticsearchProperties; +import com.netgrif.application.engine.elastic.domain.ElasticCase; +import com.netgrif.application.engine.elastic.domain.ElasticCaseRepository; +import com.netgrif.application.engine.elastic.domain.ElasticTask; +import com.netgrif.application.engine.elastic.domain.ElasticTaskRepository; import com.netgrif.application.engine.elastic.service.interfaces.IElasticIndexService; +import com.netgrif.application.engine.petrinet.service.PetriNetService; +import com.netgrif.application.engine.workflow.domain.Case; +import com.netgrif.application.engine.workflow.domain.Task; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CloseIndexRequest; import org.elasticsearch.client.indices.CloseIndexResponse; import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.xcontent.XContentType; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.core.io.Resource; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.annotation.Id; import org.springframework.data.elasticsearch.annotations.Setting; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; @@ -24,14 +37,17 @@ import org.springframework.data.elasticsearch.core.SearchScrollHits; import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.util.CloseableIterator; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import java.io.InputStream; import java.lang.reflect.Field; +import java.time.LocalDateTime; import java.util.*; @Slf4j @@ -40,17 +56,56 @@ public class ElasticIndexService implements IElasticIndexService { private static final String PLACEHOLDERS = "petriNetIndex, caseIndex, taskIndex"; - @Autowired - private ApplicationContext context; + private final ApplicationContext context; - @Autowired - private ElasticsearchRestTemplate elasticsearchTemplate; + private final ElasticsearchRestTemplate elasticsearchTemplate; - @Autowired - private ElasticsearchOperations operations; + private final RestHighLevelClient elasticsearchClient; - @Autowired - private ElasticsearchProperties elasticsearchProperties; + private final ElasticsearchOperations operations; + + private final ElasticsearchProperties elasticsearchProperties; + + private final ElasticCaseRepository elasticCaseRepository; + + private final ElasticTaskRepository elasticTaskRepository; + + private final PetriNetService petriNetService; + + private final MongoTemplate mongoTemplate; + + private final ElasticCaseMappingService caseMappingService; + + private final ElasticTaskMappingService taskMappingService; + + private final ObjectMapper objectMapper; + + public ElasticIndexService(ApplicationContext context, + ElasticsearchRestTemplate elasticsearchTemplate, + RestHighLevelClient elasticsearchClient, + ElasticsearchOperations operations, + ElasticsearchProperties elasticsearchProperties, + ElasticCaseRepository elasticCaseRepository, + ElasticTaskRepository elasticTaskRepository, + PetriNetService petriNetService, + MongoTemplate mongoTemplate, + ElasticCaseMappingService caseMappingService, + ElasticTaskMappingService taskMappingService, + @Qualifier("elasticCaseObjectMapper") + ObjectMapper objectMapper) { + this.context = context; + this.elasticsearchTemplate = elasticsearchTemplate; + this.elasticsearchClient = elasticsearchClient; + this.operations = operations; + this.elasticsearchProperties = elasticsearchProperties; + this.elasticCaseRepository = elasticCaseRepository; + this.elasticTaskRepository = elasticTaskRepository; + this.petriNetService = petriNetService; + this.mongoTemplate = mongoTemplate; + this.caseMappingService = caseMappingService; + this.taskMappingService = taskMappingService; + this.objectMapper = objectMapper; + } @Override public boolean indexExists(String indexName) { @@ -69,24 +124,6 @@ public String index(Class clazz, T source, String... placeholders) { .withObject(source).build(), IndexCoordinates.of(indexName)); } - - @Override - public boolean bulkIndex(List list, Class clazz, String... placeholders) { - String indexName = getIndexName(clazz, placeholders); - try { - if (list != null && !list.isEmpty()) { - List indexQueries = new ArrayList<>(); - list.forEach(source -> - indexQueries.add(new IndexQueryBuilder().withId(getIdFromSource(source)).withObject(source).build())); - elasticsearchTemplate.bulkIndex(indexQueries, IndexCoordinates.of(indexName)); - } - } catch (Exception e) { - log.error("bulkIndex:", e); - return false; - } - return true; - } - @Override public boolean createIndex(Class clazz, String... placeholders) { try { @@ -304,6 +341,248 @@ public void clearScrollHits(List scrollIds) { } } + + /** + * Performs bulk indexing of cases and tasks into Elasticsearch. + * + * @param indexAll if true, indexes all cases and tasks, regardless of modification time + * @param after the time after which cases and tasks should be considered for reindexing + * @param caseBatchSize number of cases to process per batch. If null, defaults from Elasticsearch properties + * @param taskBatchSize number of tasks to process per batch. If null, defaults from Elasticsearch properties + */ + @Override + public void bulkIndex(boolean indexAll, LocalDateTime after, Integer caseBatchSize, Integer taskBatchSize) { + log.info("Reindexing stale cases: started reindexing after {}", after); + LocalDateTime now = LocalDateTime.now(); + + if (caseBatchSize == null) { + caseBatchSize = elasticsearchProperties.getBatch().getCaseBatchSize(); + } + if (taskBatchSize == null) { + taskBatchSize = elasticsearchProperties.getBatch().getTaskBatchSize(); + } + + org.springframework.data.mongodb.core.query.Query query; + if (indexAll || after == null) { + query = org.springframework.data.mongodb.core.query.Query.query(Criteria.where("lastModified").lt(now)); + log.info("Reindexing stale cases: force all"); + } else { + query = org.springframework.data.mongodb.core.query.Query.query(Criteria.where("lastModified").lt(now).gt(after.minusMinutes(2))); + } + + long count = mongoTemplate.count(query, Case.class); + if (count > 0) { + reindexQueried(query, count, caseBatchSize, taskBatchSize); + } + log.info("Reindexing stale cases: end"); + } + + /** + * Reindexes queried cases and tasks into Elasticsearch in batches. + * + * @param count total number of cases to reindex + * @param caseBatchSize batch size for cases + * @param taskBatchSize batch size for tasks + */ + private void reindexQueried(org.springframework.data.mongodb.core.query.Query query, long count, int caseBatchSize, int taskBatchSize) { + long numOfPages = ((count / caseBatchSize) + 1); + log.info("Reindexing {} pages", numOfPages); + + query.cursorBatchSize(caseBatchSize); + long page = 1, currentBatchSize = 0; + List caseOperations = new ArrayList<>(); + List caseIds = new ArrayList<>(); + + try (CloseableIterator cursor = mongoTemplate.stream(query, Case.class)) { + while (cursor.hasNext()) { + Case aCase = cursor.next(); + prepareCase(aCase); + ElasticCase elasticCase = caseMappingService.transform(aCase); + ElasticCase existingCase = null; + try { + existingCase = elasticCaseRepository.findByStringId(aCase.getStringId()); + } catch (InvalidDataAccessApiUsageException ignored) { + log.debug("[{}]: Case \"{}\" has duplicates, will reindex.", aCase.getStringId(), aCase.getTitle()); + elasticCaseRepository.deleteAllByStringId(aCase.getStringId()); + } + if (existingCase == null) { + existingCase = elasticCase; + } else { + existingCase.update(elasticCase); + } + prepareCaseBulkOperation(existingCase, caseOperations); + caseIds.add(aCase.getStringId()); + + if (++currentBatchSize == caseBatchSize || !cursor.hasNext()) { + log.info("Reindexing case page {} / {}", page, numOfPages); + executeAndValidate(caseOperations); + bulkIndexTasks(caseIds, taskBatchSize); + caseOperations.clear(); + caseIds.clear(); + currentBatchSize = 0; + page++; + } + } + } + } + + /** + * Reindexes tasks into Elasticsearch in batches corresponding to the provided case IDs. + * + * @param caseIds list of case IDs whose tasks need to be reindexed + * @param taskBatchSize size of the batch for tasks + */ + private void bulkIndexTasks(List caseIds, int taskBatchSize) { + if (caseIds == null || caseIds.isEmpty()) { + return; + } + org.springframework.data.mongodb.core.query.Query query = org.springframework.data.mongodb.core.query.Query.query(Criteria.where("caseId").in(caseIds)).cursorBatchSize(taskBatchSize); + long totalSize = mongoTemplate.count(query, Task.class); + long numOfPages = ((totalSize / taskBatchSize) + 1); + + long page = 1, currentBatchSize = 0; + List taskOperations = new ArrayList<>(); + + try (CloseableIterator cursor = mongoTemplate.stream(query, Task.class)) { + while (cursor.hasNext()) { + Task task = cursor.next(); + ElasticTask elasticTask = taskMappingService.transform(task); + ElasticTask existingTask = null; + try { + existingTask = elasticTaskRepository.findByStringId(task.getStringId()); + } catch (InvalidDataAccessApiUsageException ignored) { + log.debug("[{}]: Task \"{}\" has duplicates, will reindex.", task.getStringId(), task.getTitle()); + elasticTaskRepository.deleteAllByStringId(task.getStringId()); + } + if (existingTask == null) { + existingTask = elasticTask; + } else { + existingTask.update(elasticTask); + } + prepareTaskBulkOperation(existingTask, taskOperations); + + if (++currentBatchSize == taskBatchSize || !cursor.hasNext()) { + log.info("Reindexing task page {} / {}", page, numOfPages); + executeAndValidate(taskOperations); + taskOperations.clear(); + currentBatchSize = 0; + page++; + } + } + } + } + + /** + * Prepares the case object by ensuring necessary dependencies and last modified timestamp are set. + * + * @param useCase case object to prepare + */ + private void prepareCase(Case useCase) { + if (useCase.getPetriNet() == null) { + useCase.setPetriNet(petriNetService.get(useCase.getPetriNetObjectId())); + } + if (useCase.getLastModified() == null) { + useCase.setLastModified(LocalDateTime.now()); + } + } + + /** + * Prepares a bulk operation for indexing or updating a case in Elasticsearch. + * + * @param doc transformed ElasticCase object + * @param operations collection of BulkOperations to add this operation to + */ + private void prepareCaseBulkOperation(ElasticCase doc, List operations) { + try { + String json = objectMapper.writeValueAsString(doc); + UpdateRequest updateRequest = new UpdateRequest() + .id(doc.getId() == null ? doc.getStringId() : doc.getId()) + .doc(json, XContentType.JSON) + .upsert(json, XContentType.JSON) + .index(elasticsearchProperties.getIndex().get("case")); + operations.add(updateRequest); + } catch (Exception e) { + log.error("Failed to prepare bulk operation for case [{}]: {}", doc.getStringId(), e.getMessage()); + } + } + + /** + * Prepares a bulk operation for indexing or updating a task in Elasticsearch. + * + * @param doc transformed ElasticTask object + * @param operations collection of BulkOperations to add this operation to + */ + private void prepareTaskBulkOperation(ElasticTask doc, List operations) { + try { + String json = objectMapper.writeValueAsString(doc); + UpdateRequest updateRequest = new UpdateRequest() + .id(doc.getId() == null ? doc.getStringId() : doc.getId()) + .doc(json, XContentType.JSON) + .upsert(json, XContentType.JSON) + .index(elasticsearchProperties.getIndex().get("task")); + operations.add(updateRequest); + } catch (Exception e) { + log.error("Failed to prepare bulk operation for task [{}]: {}", doc.getStringId(), e.getMessage()); + } + } + + /** + * Executes the bulk operations and validates the results, retrying on partial failures. + * + * @param operations list of bulk operations to execute + */ + private void executeAndValidate(List operations) { + if (operations.isEmpty()) { + return; + } + + BulkRequest request = new BulkRequest(); + operations.forEach(request::add); + + try { + BulkResponse response = elasticsearchClient.bulk(request, RequestOptions.DEFAULT); + checkForBulkUpdateFailure(response); + log.info("Batch indexed successfully with {} ops", operations.size()); + } catch (ElasticsearchException e) { + log.warn("Failed for {} ops to index bulk {}", operations.size(), e.getMessage(), e); + + if (operations.size() == 1) { + log.error("Single operation failed. Skipping. {}", operations.get(0), e); + return; + } + + log.warn("Dividing the requirement."); + + int mid = operations.size() / 2; + List left = operations.subList(0, mid); + List right = operations.subList(mid, operations.size()); + + executeAndValidate(new ArrayList<>(left)); + executeAndValidate(new ArrayList<>(right)); + } catch (Exception e) { + log.error("Failed to index bulk: {}", e.getMessage(), e); + } + } + + /** + * Checks the results of a bulk indexing operation for failures. + * + * @param response the BulkResponse from Elasticsearch + * @throws ElasticsearchException if there are failures in the bulk response + */ + private void checkForBulkUpdateFailure(BulkResponse response) { + Map failedDocuments = new HashMap<>(); + Arrays.stream(response.getItems()).forEach(item -> { + if (item.getFailure() != null) { + failedDocuments.put(item.getId(), item.getFailure().getMessage()); + } + }); + + if (!failedDocuments.isEmpty()) { + throw new ElasticsearchException("Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for details [{}]", failedDocuments); + } + } + private String getIdFromSource(Object source) { if (source == null) { return null; diff --git a/src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java b/src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java index ae21422527c..3b9426c74fc 100644 --- a/src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java +++ b/src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java @@ -1,18 +1,13 @@ package com.netgrif.application.engine.elastic.service; import com.netgrif.application.engine.elastic.domain.ElasticCaseRepository; -import com.netgrif.application.engine.elastic.service.interfaces.IElasticCaseMappingService; -import com.netgrif.application.engine.elastic.service.interfaces.IElasticCaseService; -import com.netgrif.application.engine.elastic.service.interfaces.IElasticTaskMappingService; -import com.netgrif.application.engine.elastic.service.interfaces.IElasticTaskService; +import com.netgrif.application.engine.elastic.service.interfaces.*; import com.netgrif.application.engine.workflow.domain.Case; -import com.netgrif.application.engine.workflow.domain.QCase; import com.netgrif.application.engine.workflow.domain.Task; import com.netgrif.application.engine.workflow.domain.repositories.CaseRepository; import com.netgrif.application.engine.workflow.domain.repositories.TaskRepository; import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService; import com.querydsl.core.types.Predicate; -import com.querydsl.core.types.dsl.BooleanExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,7 +31,6 @@ public class ReindexingTask { private static final Logger log = LoggerFactory.getLogger(ReindexingTask.class); private int pageSize; - private CaseRepository caseRepository; private TaskRepository taskRepository; private ElasticCaseRepository elasticCaseRepository; private IElasticCaseService elasticCaseService; @@ -44,12 +38,12 @@ public class ReindexingTask { private IElasticCaseMappingService caseMappingService; private IElasticTaskMappingService taskMappingService; private IWorkflowService workflowService; + private IElasticIndexService elasticIndexService; private LocalDateTime lastRun; @Autowired public ReindexingTask( - CaseRepository caseRepository, TaskRepository taskRepository, ElasticCaseRepository elasticCaseRepository, @Qualifier("reindexingTaskElasticCaseService") @@ -60,8 +54,8 @@ public ReindexingTask( IElasticTaskMappingService taskMappingService, IWorkflowService workflowService, @Value("${spring.data.elasticsearch.reindexExecutor.size:20}") int pageSize, - @Value("${spring.data.elasticsearch.reindex-from:#{null}}") Duration from) { - this.caseRepository = caseRepository; + @Value("${spring.data.elasticsearch.reindex-from:#{null}}") Duration from, + IElasticIndexService elasticIndexService) { this.taskRepository = taskRepository; this.elasticCaseRepository = elasticCaseRepository; this.elasticCaseService = elasticCaseService; @@ -69,6 +63,7 @@ public ReindexingTask( this.caseMappingService = caseMappingService; this.taskMappingService = taskMappingService; this.workflowService = workflowService; + this.elasticIndexService = elasticIndexService; this.pageSize = pageSize; lastRun = LocalDateTime.now(); @@ -80,27 +75,11 @@ public ReindexingTask( @Scheduled(cron = "#{springElasticsearchReindex}") public void reindex() { log.info("Reindexing stale cases: started reindexing after " + lastRun); - - BooleanExpression predicate = QCase.case$.lastModified.before(LocalDateTime.now()).and(QCase.case$.lastModified.after(lastRun.minusMinutes(2))); - + elasticIndexService.bulkIndex(false, lastRun, null, null); lastRun = LocalDateTime.now(); - long count = caseRepository.count(predicate); - if (count > 0) { - reindexAllPages(predicate, count); - } - log.info("Reindexing stale cases: end"); } - private void reindexAllPages(BooleanExpression predicate, long count) { - long numOfPages = ((count / pageSize) + 1); - log.info("Reindexing " + numOfPages + " pages"); - - for (int page = 0; page < numOfPages; page++) { - reindexPage(predicate, page, numOfPages, false); - } - } - public void forceReindexPage(Predicate predicate, int page, long numOfPages) { reindexPage(predicate, page, numOfPages, true); } diff --git a/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticIndexService.java b/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticIndexService.java index 5660a6db477..f23036457b4 100644 --- a/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticIndexService.java +++ b/src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticIndexService.java @@ -1,10 +1,12 @@ package com.netgrif.application.engine.elastic.service.interfaces; +import com.querydsl.core.types.Predicate; import org.springframework.data.elasticsearch.core.SearchHits; import org.springframework.data.elasticsearch.core.SearchScrollHits; import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.query.Query; +import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,8 +33,6 @@ public interface IElasticIndexService { String index(Class clazz, T source, String... placeholders); - boolean bulkIndex(List list, Class clazz, String... placeholders); - SearchScrollHits scrollFirst(Query query, Class clazz, String... placeholders); SearchScrollHits scroll(String scrollId, Class clazz, String... placeholders); @@ -42,4 +42,6 @@ public interface IElasticIndexService { void applySettings(HashMap settingMap, Class clazz); void clearScrollHits(List scrollIds); + + void bulkIndex(boolean indexAll, LocalDateTime lastRun, Integer caseBatchSize, Integer taskBatchSize); } diff --git a/src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java b/src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java index 9a738c8ab58..c50c37df036 100644 --- a/src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java +++ b/src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java @@ -2,6 +2,8 @@ import com.netgrif.application.engine.auth.domain.LoggedUser; import com.netgrif.application.engine.elastic.service.ReindexingTask; +import com.netgrif.application.engine.elastic.service.interfaces.IElasticIndexService; +import com.netgrif.application.engine.elastic.web.requestbodies.IndexParams; import com.netgrif.application.engine.workflow.service.CaseSearchService; import com.netgrif.application.engine.workflow.service.interfaces.IWorkflowService; import com.netgrif.application.engine.workflow.web.responsebodies.MessageResource; @@ -20,10 +22,7 @@ import org.springframework.http.MediaType; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.core.Authentication; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.Locale; import java.util.Map; @@ -49,6 +48,9 @@ public class ElasticController { @Autowired private ReindexingTask reindexingTask; + @Autowired + private IElasticIndexService indexService; + @Value("${spring.data.elasticsearch.reindexExecutor.size:20}") private int pageSize; @@ -69,11 +71,11 @@ public MessageResource reindex(@RequestBody Map searchBody, Auth if (count == 0) { log.info("No cases to reindex"); } else { - long numOfPages = (long) ((count / pageSize) + 1); - log.info("Reindexing cases: " + numOfPages + " pages"); + long numOfPages = (count / pageSize) + 1; + log.info("Reindexing cases: {} pages", numOfPages); for (int page = 0; page < numOfPages; page++) { - log.info("Indexing page " + (page + 1)); + log.info("Indexing page {}", (page + 1)); Predicate predicate = searchService.buildQuery(searchBody, user, locale); reindexingTask.forceReindexPage(predicate, page, numOfPages); } @@ -85,4 +87,23 @@ public MessageResource reindex(@RequestBody Map searchBody, Auth return MessageResource.errorMessage(e.getMessage()); } } + + @PreAuthorize("@authorizationService.hasAuthority('ADMIN')") + @Operation(summary = "Reindex all or stale cases with bulk index", + description = "Reindex all or stale cases (specified by IndexParams.indexAll param) with bulk index. Caller must have the ADMIN role", + security = {@SecurityRequirement(name = "BasicAuth")}) + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "OK"), + @ApiResponse(responseCode = "403", description = "Caller doesn't fulfill the authorisation requirements"), + }) + @PostMapping(value = "/reindex/bulk", produces = MediaType.APPLICATION_JSON_VALUE) + public MessageResource bulkReindex(IndexParams indexParams) { + try { + indexService.bulkIndex(indexParams.isIndexAll(), null, indexParams.getCaseBatchSize(), indexParams.getTaskBatchSize()); + return MessageResource.successMessage("Success"); + } catch (Exception e) { + log.error("Could not index: ", e); + return MessageResource.errorMessage(e.getMessage()); + } + } } diff --git a/src/main/java/com/netgrif/application/engine/elastic/web/requestbodies/IndexParams.java b/src/main/java/com/netgrif/application/engine/elastic/web/requestbodies/IndexParams.java new file mode 100644 index 00000000000..9e3adab68c6 --- /dev/null +++ b/src/main/java/com/netgrif/application/engine/elastic/web/requestbodies/IndexParams.java @@ -0,0 +1,28 @@ +package com.netgrif.application.engine.elastic.web.requestbodies; + +import lombok.Data; + + +/** + * Represents the parameters to configure the indexing operation. + * This class allows customization of batch sizes for cases and tasks, + * as well as the option to index all data. + */ +@Data +public class IndexParams { + + /** + * Determines whether to index all available data. Default is {@code false}. + */ + private boolean indexAll = false; + + /** + * Specifies the batch size for cases during indexing. Default is {@code 5000}. + */ + private Integer caseBatchSize = 5000; + + /** + * Specifies the batch size for tasks during indexing. Default is {@code 20000}. + */ + private Integer taskBatchSize = 20000; +} diff --git a/src/main/java/com/netgrif/application/engine/workflow/domain/repositories/TaskRepository.java b/src/main/java/com/netgrif/application/engine/workflow/domain/repositories/TaskRepository.java index 841b720f503..9ee7b41ab56 100644 --- a/src/main/java/com/netgrif/application/engine/workflow/domain/repositories/TaskRepository.java +++ b/src/main/java/com/netgrif/application/engine/workflow/domain/repositories/TaskRepository.java @@ -16,6 +16,8 @@ public interface TaskRepository extends MongoRepository, QuerydslP List findAllByCaseId(String id); + List findAllByCaseIdIn(Collection ids); + Page findByCaseIdIn(Pageable pageable, Collection ids); Page findByTransitionIdIn(Pageable pageable, Collection ids); diff --git a/src/main/java/com/netgrif/application/engine/workflow/service/TaskService.java b/src/main/java/com/netgrif/application/engine/workflow/service/TaskService.java index 2a045ffad93..bde3ec1fad7 100644 --- a/src/main/java/com/netgrif/application/engine/workflow/service/TaskService.java +++ b/src/main/java/com/netgrif/application/engine/workflow/service/TaskService.java @@ -36,7 +36,6 @@ import com.netgrif.application.engine.workflow.domain.eventoutcomes.dataoutcomes.SetDataEventOutcome; import com.netgrif.application.engine.workflow.domain.eventoutcomes.taskoutcomes.*; import com.netgrif.application.engine.workflow.domain.repositories.TaskRepository; -import com.netgrif.application.engine.workflow.domain.triggers.AutoTrigger; import com.netgrif.application.engine.workflow.domain.triggers.TimeTrigger; import com.netgrif.application.engine.workflow.domain.triggers.Trigger; import com.netgrif.application.engine.workflow.service.interfaces.IDataService; diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index ea5b8cf5e65..8d996df2fc1 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -41,7 +41,8 @@ spring.data.elasticsearch.drop=false spring.data.elasticsearch.executors.size=500 spring.data.elasticsearch.executors.timeout=5 spring.data.elasticsearch.reindex=0 0 * * * * -spring.data.elasticsearch.reindexExecutor.size=20 +spring.data.elasticsearch.reindexExecutor.caseSize=5100 +spring.data.elasticsearch.reindexExecutor.taskSize=20000 spring.data.elasticsearch.reindexExecutor.timeout=60 # Mail Service diff --git a/src/test/groovy/com/netgrif/application/engine/elastic/DataSearchRequestTest.groovy b/src/test/groovy/com/netgrif/application/engine/elastic/DataSearchRequestTest.groovy index 08017730977..56cd0c04d9b 100644 --- a/src/test/groovy/com/netgrif/application/engine/elastic/DataSearchRequestTest.groovy +++ b/src/test/groovy/com/netgrif/application/engine/elastic/DataSearchRequestTest.groovy @@ -158,7 +158,7 @@ class DataSearchRequestTest { new AbstractMap.SimpleEntry("user.emailValue.keyword" as String, "${testUser1.email}" as String), new AbstractMap.SimpleEntry("user.fullNameValue.keyword" as String, "${testUser1.fullName}" as String), new AbstractMap.SimpleEntry("user.userIdValue" as String, "${testUser1.getId()}" as String), - new AbstractMap.SimpleEntry("date.timestampValue" as String, "${Timestamp.valueOf(LocalDateTime.of(date, LocalTime.NOON)).getTime()}" as String), + new AbstractMap.SimpleEntry("date.timestampValue" as String, "${Timestamp.valueOf(LocalDateTime.of(date, LocalTime.MIDNIGHT)).getTime()}" as String), new AbstractMap.SimpleEntry("datetime.timestampValue" as String, "${Timestamp.valueOf(date.atTime(13, 37)).getTime()}" as String), new AbstractMap.SimpleEntry("enumeration" as String, "Alice" as String), new AbstractMap.SimpleEntry("enumeration" as String, "Alica" as String),