From 1d42c739557ef6776517307a58a71c2c8126608d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sat, 26 Jul 2025 17:33:03 +0300 Subject: [PATCH 1/2] update index mapping, add specific module for client persistence --- build.sbt | 14 +- .../elastic/client/ElasticClientApi.scala | 627 +++++++++++++++++- .../elastic/client/MappingComparator.scala | 87 +++ .../softnetwork/elastic/client/package.scala | 14 +- java/persistence/build.sbt | 3 + .../query}/ElasticsearchClientProvider.scala | 4 +- ...asticProcessorStreamWithJavaProvider.scala | 1 - .../client/java/ElasticsearchClientApi.scala | 382 +++++++---- .../java/ElasticsearchClientCompanion.scala | 7 - testkit/build.sbt | 28 +- .../elastic/client/MockElasticClientApi.scala | 41 +- .../person/ElasticPersonTestKit.scala | 15 + .../PersonToElasticProcessorStream.scala | 14 + .../test/resources/mapping/person.mustache | 21 + .../test/resources/mapping/sample.mustache | 16 - .../elastic/client/ElasticClientSpec.scala | 235 +++++-- .../client/ElasticsearchProviders.scala | 2 +- ...ElasticsearchClientPersonHandlerSpec.scala | 39 ++ 18 files changed, 1318 insertions(+), 232 deletions(-) create mode 100644 client/src/main/scala/app/softnetwork/elastic/client/MappingComparator.scala create mode 100644 java/persistence/build.sbt rename java/{src/main/scala/app/softnetwork/elastic/client/java => persistence/src/main/scala/app/softnetwork/elastic/persistence/query}/ElasticsearchClientProvider.scala (68%) rename java/{ => persistence}/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala (86%) create mode 100644 testkit/src/main/scala/app/softnetwork/persistence/person/ElasticPersonTestKit.scala create mode 100644 testkit/src/main/scala/app/softnetwork/persistence/query/PersonToElasticProcessorStream.scala create mode 100644 testkit/src/test/resources/mapping/person.mustache delete mode 100644 testkit/src/test/resources/mapping/sample.mustache create mode 
100644 testkit/src/test/scala/app/softnetwork/persistence/person/ElasticsearchClientPersonHandlerSpec.scala diff --git a/build.sbt b/build.sbt index 5d332cc9..bf070da4 100644 --- a/build.sbt +++ b/build.sbt @@ -106,6 +106,16 @@ lazy val java = project.in(file("java")) Defaults.itSettings, moduleSettings ) + .dependsOn( + client % "compile->compile;test->test;it->it" + ) + +lazy val javaPersistence = project.in(file("java/persistence")) + .configs(IntegrationTest) + .settings(Defaults.itSettings) + .dependsOn( + java % "compile->compile;test->test;it->it", + ) .dependsOn( persistence % "compile->compile;test->test;it->it" ) @@ -119,7 +129,7 @@ lazy val testKit = project.in(file("testkit")) ) .enablePlugins(BuildInfoPlugin) .dependsOn( - java % "compile->compile;test->test;it->it" + javaPersistence % "compile->compile;test->test;it->it" ) lazy val root = project.in(file(".")) @@ -129,4 +139,4 @@ lazy val root = project.in(file(".")) Publish.noPublishSettings, crossScalaVersions := Nil ) - .aggregate(sql, client, persistence, java, testKit) + .aggregate(sql, client, persistence, java, javaPersistence, testKit) diff --git a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala index 8e5ed58d..b46c76d5 100644 --- a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala +++ b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala @@ -9,14 +9,18 @@ import akka.stream.scaladsl._ import app.softnetwork.persistence.model.Timestamped import app.softnetwork.serialization._ import app.softnetwork.elastic.sql.SQLQuery +import com.google.gson.JsonParser import com.typesafe.config.{Config, ConfigFactory} +import com.typesafe.scalalogging.StrictLogging import org.json4s.{DefaultFormats, Formats} import org.json4s.jackson.JsonMethods._ +import java.util.UUID import scala.concurrent.{Await, ExecutionContext, Future} import 
scala.concurrent.duration.Duration import scala.language.{implicitConversions, postfixOps} import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} /** Created by smanciot on 28/06/2018. */ @@ -42,6 +46,10 @@ trait ElasticClientApi } trait IndicesApi { + + /** Default settings for indices. This is used when creating an index without providing specific + * settings. It includes ngram tokenizer and analyzer, as well as some default limits. + */ val defaultSettings: String = """ |{ @@ -86,22 +94,95 @@ trait IndicesApi { |} """.stripMargin + /** Create an index with the provided name and settings. + * @param index + * - the name of the index to create + * @param settings + * - the settings to apply to the index (default is defaultSettings) + * @return + * true if the index was created successfully, false otherwise + */ def createIndex(index: String, settings: String = defaultSettings): Boolean + /** Delete an index with the provided name. + * @param index + * - the name of the index to delete + * @return + * true if the index was deleted successfully, false otherwise + */ def deleteIndex(index: String): Boolean + /** Close an index with the provided name. + * @param index + * - the name of the index to close + * @return + * true if the index was closed successfully, false otherwise + */ def closeIndex(index: String): Boolean + /** Open an index with the provided name. + * @param index + * - the name of the index to open + * @return + * true if the index was opened successfully, false otherwise + */ def openIndex(index: String): Boolean + + /** Reindex from source index to target index. 
+ * @param sourceIndex + * - the name of the source index + * @param targetIndex + * - the name of the target index + * @param refresh + * - true to refresh the target index after reindexing, false otherwise + * @return + * true if the reindexing was successful, false otherwise + */ + def reindex(sourceIndex: String, targetIndex: String, refresh: Boolean = true): Boolean + + /** Check if an index exists. + * @param index + * - the name of the index to check + * @return + * true if the index exists, false otherwise + */ + def indexExists(index: String): Boolean } trait AliasApi { + + /** Add an alias to the given index. + * @param index + * - the name of the index + * @param alias + * - the name of the alias + * @return + * true if the alias was added successfully, false otherwise + */ def addAlias(index: String, alias: String): Boolean + + /** Remove an alias from the given index. + * @param index + * - the name of the index + * @param alias + * - the name of the alias + * @return + * true if the alias was removed successfully, false otherwise + */ def removeAlias(index: String, alias: String): Boolean } trait SettingsApi { _: IndicesApi => - def toggleRefresh(index: String, enable: Boolean): Unit = { + + /** Toggle the refresh interval of an index. + * @param index + * - the name of the index + * @param enable + * - true to enable the refresh interval, false to disable it + * @return + * true if the settings were updated successfully, false otherwise + */ + def toggleRefresh(index: String, enable: Boolean): Boolean = { updateSettings( index, if (!enable) """{"index" : {"refresh_interval" : -1} }""" @@ -109,37 +190,292 @@ trait SettingsApi { _: IndicesApi => ) } - def setReplicas(index: String, replicas: Int): Unit = { + /** Set the number of replicas for an index. 
+ * @param index + * - the name of the index + * @param replicas + * - the number of replicas to set + * @return + * true if the settings were updated successfully, false otherwise + */ + def setReplicas(index: String, replicas: Int): Boolean = { updateSettings(index, s"""{"index" : {"number_of_replicas" : $replicas} }""") } + /** Update the settings of an index. + * @param index + * - the name of the index + * @param settings + * - the settings to apply to the index (default is defaultSettings) + * @return + * true if the settings were updated successfully, false otherwise + */ def updateSettings(index: String, settings: String = defaultSettings): Boolean - def loadSettings(): String + /** Load the settings of an index. + * @param index + * - the name of the index to load the settings for + * @return + * the settings of the index as a JSON string + */ + def loadSettings(index: String): String } -trait MappingApi { - @deprecated("Use setMapping(index: String, mapping: String) instead", "0.7.29") +trait MappingApi extends IndicesApi with RefreshApi with StrictLogging { + @deprecated("Use setMapping(index: String, mapping: String) instead", "7.17.29") def setMapping(index: String, indexType: String, mapping: String): Boolean = { this.setMapping(index, mapping) } + + /** Set the mapping of an index. + * @param index + * - the name of the index to set the mapping for + * @param mapping + * - the mapping to set on the index + * @return + * true if the mapping was set successfully, false otherwise + */ def setMapping(index: String, mapping: String): Boolean - @deprecated("Use getMapping(index: String) instead", "0.7.29") + @deprecated("Use getMapping(index: String) instead", "7.17.29") def getMapping(index: String, indexType: String): String = { this.getMapping(index) } + + /** Get the mapping of an index. 
+ * @param index + * - the name of the index to get the mapping for + * @return + * the mapping of the index as a JSON string + */ def getMapping(index: String): String + + /** Get the mapping properties of an index. + * @param index + * - the name of the index to get the mapping properties for + * @return + * the mapping properties of the index as a JSON string + */ + def getMappingProperties(index: String): String = { + val mapping = tryOrElse(getMapping(index), "{\"mappings\": {\"properties\": {}}}")(logger) + Try( + new JsonParser() + .parse(mapping) + .getAsJsonObject + .get("mappings") + .toString + ) match { + case Success(properties) => properties + case Failure(exception) => + logger.error(s"Failed to parse mapping properties for index $index and $mapping", exception) + "{\"properties\": {}}" // Return an empty properties object in case of failure + } + } + + /** Check if the mapping of an index is different from the provided mapping. + * @param index + * - the name of the index to check + * @param mapping + * - the mapping to compare with the current mapping of the index + * @return + * true if the mapping is different, false otherwise + */ + def shouldUpdateMapping( + index: String, + mapping: String + ): Boolean = { + MappingComparator.isMappingDifferent(this.getMappingProperties(index), mapping) + } + + /** Update the mapping of an index to a new mapping. 
+ * @param index + * - the name of the index to migrate + * @param mapping + * - the new mapping to set on the index + * @param settings + * - the settings to apply to the index (default is defaultSettings) + * @return + * true if the mapping was updated successfully, false otherwise + */ + def updateMapping( + index: String, + mapping: String, + settings: String = defaultSettings + ): Boolean = { + // Check if the index exists + if (!tryOrElse(this.indexExists(index), false)(logger)) { + if (!tryOrElse(this.createIndex(index, settings), false)(logger)) { + logger.error(s"Failed to create index: $index") + return false + } + logger.info(s"Index $index created successfully.") + if (!tryOrElse(this.setMapping(index, mapping), false)(logger)) { + logger.error(s"Failed to set mapping for index: $index") + return false + } + logger.info(s"Mapping for index $index set successfully.") + true + } + // Check if the mapping needs to be updated + else if (shouldUpdateMapping(index, mapping)) { + val tempIndex = index + "_tmp_" + UUID.randomUUID() + var tempCreated = false + var originalDeleted = false + logger.info("--- Starting dynamic mapping migration ---") + logger.info("Target index: " + index) + logger.info("Temporary index: " + tempIndex) + def migrate(): Boolean = { + // Create a temporary index with the new mapping + tempCreated = tryOrElse(this.createIndex(tempIndex, settings), false)(logger) + if (tempCreated) { + logger.info(s"Temporary index $tempIndex created successfully.") + // Set the new mapping on the temporary index + if (!tryOrElse(this.setMapping(tempIndex, mapping), false)(logger)) { + logger.error(s"Failed to set mapping for temporary index: $tempIndex") + return false + } + logger.info(s"Mapping for temporary index $tempIndex set successfully.") + // Reindex from the original index to the temporary index + if (!tryOrElse(this.reindex(index, tempIndex), false)(logger)) { + logger.error( + s"Failed to reindex from original index: $index to temporary 
index: $tempIndex" + ) + return false + } + logger.info( + s"Reindexing from original index $index to temporary index $tempIndex completed successfully." + ) + // Delete the original index + originalDeleted = this.deleteIndex(index) + if (originalDeleted) { + logger.info(s"Original index $index deleted successfully.") + // Rename the temporary index to the original index name + if (!tryOrElse(this.createIndex(index, settings), false)(logger)) { + logger.error(s"Failed to recreate original index: $index") + return false + } + logger.info(s"Original index $index recreated successfully.") + if (!tryOrElse(this.setMapping(index, mapping), false)(logger)) { + logger.error(s"Failed to set mapping for original index: $index") + return false + } + logger.info(s"Mapping for original index $index set successfully.") + if (!tryOrElse(this.reindex(tempIndex, index), false)(logger)) { + logger.error( + s"Failed to reindex from temporary index: $tempIndex to original index: $index" + ) + return false + } + logger.info( + s"Reindexing from temporary index $tempIndex to original index $index completed successfully." 
+            )
+            if (!tryOrElse(this.openIndex(index), false)(logger)) {
+              logger.error(s"Failed to open original index: $index")
+              return false
+            }
+            logger.info(s"Original index $index opened successfully.")
+            logger.info("Dynamic mapping migration completed successfully.")
+            true
+          } else {
+            logger.error(s"Failed to delete original index: $index")
+            false
+          }
+        } else {
+          logger.error(s"Failed to create temporary index: $tempIndex")
+          false
+        }
+      }
+      val migration = Try(migrate()) match {
+        case Success(result) => result
+        case Failure(exception) =>
+          logger.error("Exception during dynamic mapping migration", exception)
+          false
+      }
+      // Set to true once a failed migration has been fully rolled back, i.e. once every document
+      // held by the temporary index has been copied back into the target index. The temporary
+      // index must not be dropped while it holds the only copy of the documents.
+      var restored = false
+      if (!migration) {
+        logger.error("Error during dynamic mapping migration")
+        if (originalDeleted) {
+          // The original index was deleted, so it has to exist again before reindexing into it.
+          // migrate() may already have recreated it before failing, in which case it must not be
+          // created a second time (the creation would fail and abort the rollback).
+          if (
+            !tryOrElse(this.indexExists(index), false)(logger) &&
+            !tryOrElse(this.createIndex(index, settings), false)(logger)
+          ) {
+            logger.error(s"Failed to recreate original index: $index")
+          } else {
+            logger.info(s"Original index $index recreated successfully.")
+            // Apply the requested mapping: it is the one the temporary index was created with,
+            // and the previous mapping was not kept
+            if (!tryOrElse(this.setMapping(index, mapping), false)(logger)) {
+              logger.error(s"Failed to set mapping for original index: $index")
+            } else {
+              logger.info(s"Mapping for original index $index set successfully.")
+              if (!tryOrElse(this.reindex(tempIndex, index), false)(logger)) {
+                logger.error(
+                  s"Failed to reindex from temporary index $tempIndex to original index $index"
+                )
+              } else {
+                // The documents are back in the target index: the temporary index is disposable
+                restored = true
+                logger.info(
+                  s"Reindexing from temporary index $tempIndex to original index $index completed successfully."
+                )
+                if (!tryOrElse(this.refresh(index), false)(logger)) {
+                  logger.error(s"Failed to refresh original index: $index")
+                } else {
+                  logger.info(s"Original index $index refreshed successfully.")
+                }
+              }
+            }
+          }
+        }
+      }
+      if (!tempCreated) {
+        logger.error(s"Temporary index $tempIndex was not created, skipping deletion.")
+      } else if (
+        migration || restored ||
+        (!originalDeleted && tryOrElse(this.indexExists(index), false)(logger))
+      ) {
+        // Clean up the temporary index: the target index exists and holds the documents
+        if (!tryOrElse(this.deleteIndex(tempIndex), false)(logger)) {
+          logger.error(s"Failed to delete temporary index: $tempIndex")
+        } else {
+          logger.info(s"Temporary index $tempIndex deleted successfully.")
+        }
+      } else {
+        // Deleting the temporary index here would destroy the only remaining copy of the
+        // documents, so it is kept for a manual recovery
+        logger.error(
+          s"Temporary index $tempIndex has been kept because index $index could not be restored from it"
+        )
+      }
+      migration
+    } else {
+      false
+    }
+  }
 }
 trait RefreshApi {
+
+  /** Refresh the index to make sure all documents are indexed and searchable.
+    * @param index
+    *   - the name of the index to refresh
+    * @return
+    *   true if the index was refreshed successfully, false otherwise
+    */
   def refresh(index: String): Boolean
 }
 trait FlushApi {
+
+  /** Flush the index to make sure all operations are written to disk.
+    * @param index
+    *   - the name of the index to flush
+    * @param force
+    *   - true to force the flush, false otherwise
+    * @param wait
+    *   - true to wait for the flush to complete, false otherwise
+    * @return
+    *   true if the index was flushed successfully, false otherwise
+    */
   def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean
 }
 trait IndexApi { _: RefreshApi =>
+
+  /** Index an entity in the given index.
+ * @param entity + * - the entity to index + * @param index + * - the name of the index to index the entity in (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * true if the entity was indexed successfully, false otherwise + */ def index[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -153,13 +489,33 @@ trait IndexApi { _: RefreshApi => ) } - @deprecated("Use index(index: String, id: String, source: String) instead", "0.7.29") + @deprecated("Use index(index: String, id: String, source: String) instead", "7.17.29") def index(index: String, indexType: String, id: String, source: String): Boolean = { this.index(index, id, source) } + /** Index an entity in the given index. + * @param index + * - the name of the index to index the entity in + * @param id + * - the id of the entity to index + * @param source + * - the source of the entity to index in JSON format + * @return + * true if the entity was indexed successfully, false otherwise + */ def index(index: String, id: String, source: String): Boolean + /** Index an entity in the given index asynchronously. 
+ * @param entity + * - the entity to index + * @param index + * - the name of the index to index the entity in (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * a Future that completes with true if the entity was indexed successfully, false otherwise + */ def indexAsync[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -169,13 +525,23 @@ trait IndexApi { _: RefreshApi => indexAsync(index.getOrElse(indexType), entity.uuid, serialization.write[U](entity)) } - @deprecated("Use indexAsync(index: String, id: String, source: String) instead", "0.7.29") + @deprecated("Use indexAsync(index: String, id: String, source: String) instead", "7.17.29") def indexAsync(index: String, indexType: String, id: String, source: String)(implicit ec: ExecutionContext ): Future[Boolean] = { this.indexAsync(index, id, source) } + /** Index an entity in the given index asynchronously. + * @param index + * - the name of the index to index the entity in + * @param id + * - the id of the entity to index + * @param source + * - the source of the entity to index in JSON format + * @return + * a Future that completes with true if the entity was indexed successfully, false otherwise + */ def indexAsync(index: String, id: String, source: String)(implicit ec: ExecutionContext ): Future[Boolean] = { @@ -186,6 +552,19 @@ trait IndexApi { _: RefreshApi => } trait UpdateApi { _: RefreshApi => + + /** Update an entity in the given index. 
+ * @param entity + * - the entity to update + * @param index + * - the name of the index to update the entity in (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @param upsert + * - true to upsert the entity if it does not exist, false otherwise + * @return + * true if the entity was updated successfully, false otherwise + */ def update[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -203,7 +582,7 @@ trait UpdateApi { _: RefreshApi => @deprecated( "Use update(index: String, id: String, source: String, upsert: Boolean) instead", - "0.7.29" + "7.17.29" ) def update( index: String, @@ -215,8 +594,32 @@ trait UpdateApi { _: RefreshApi => this.update(index, id, source, upsert) } + /** Update an entity in the given index. + * @param index + * - the name of the index to update the entity in + * @param id + * - the id of the entity to update + * @param source + * - the source of the entity to update in JSON format + * @param upsert + * - true to upsert the entity if it does not exist, false otherwise + * @return + * true if the entity was updated successfully, false otherwise + */ def update(index: String, id: String, source: String, upsert: Boolean): Boolean + /** Update an entity in the given index asynchronously. 
+ * @param entity + * - the entity to update + * @param index + * - the name of the index to update the entity in (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @param upsert + * - true to upsert the entity if it does not exist, false otherwise + * @return + * a Future that completes with true if the entity was updated successfully, false otherwise + */ def updateAsync[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -235,12 +638,24 @@ trait UpdateApi { _: RefreshApi => @deprecated( "Use updateAsync(index: String, id: String, source: String, upsert: Boolean) instead", - "0.7.29" + "7.17.29" ) def updateAsync(index: String, indexType: String, id: String, source: String, upsert: Boolean)( implicit ec: ExecutionContext ): Future[Boolean] = this.updateAsync(index, id, source, upsert) + /** Update an entity in the given index asynchronously. + * @param index + * - the name of the index to update the entity in + * @param id + * - the id of the entity to update + * @param source + * - the source of the entity to update in JSON format + * @param upsert + * - true to upsert the entity if it does not exist, false otherwise + * @return + * a Future that completes with true if the entity was updated successfully, false otherwise + */ def updateAsync(index: String, id: String, source: String, upsert: Boolean)(implicit ec: ExecutionContext ): Future[Boolean] = { @@ -251,6 +666,17 @@ trait UpdateApi { _: RefreshApi => } trait DeleteApi { _: RefreshApi => + + /** Delete an entity from the given index. 
+ * @param entity + * - the entity to delete + * @param index + * - the name of the index to delete the entity from (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * true if the entity was deleted successfully, false otherwise + */ def delete[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -260,13 +686,31 @@ trait DeleteApi { _: RefreshApi => delete(entity.uuid, index.getOrElse(indexType)) } - @deprecated("Use delete(uuid: String, index: String) instead", "0.7.29") + @deprecated("Use delete(uuid: String, index: String) instead", "7.17.29") def delete(uuid: String, index: String, indexType: String): Boolean = { this.delete(uuid, index) } + /** Delete an entity from the given index. + * @param uuid + * - the id of the entity to delete + * @param index + * - the name of the index to delete the entity from + * @return + * true if the entity was deleted successfully, false otherwise + */ def delete(uuid: String, index: String): Boolean + /** Delete an entity from the given index asynchronously. 
+ * @param entity + * - the entity to delete + * @param index + * - the name of the index to delete the entity from (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * a Future that completes with true if the entity was deleted successfully, false otherwise + */ def deleteAsync[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -276,13 +720,21 @@ trait DeleteApi { _: RefreshApi => deleteAsync(entity.uuid, index.getOrElse(indexType)) } - @deprecated("Use deleteAsync(uuid: String, index: String) instead", "0.7.29") + @deprecated("Use deleteAsync(uuid: String, index: String) instead", "7.17.29") def deleteAsync(uuid: String, index: String, indexType: String)(implicit ec: ExecutionContext ): Future[Boolean] = { this.deleteAsync(uuid, index) } + /** Delete an entity from the given index asynchronously. + * @param uuid + * - the id of the entity to delete + * @param index + * - the name of the index to delete the entity from + * @return + * a Future that completes with true if the entity was deleted successfully, false otherwise + */ def deleteAsync(uuid: String, index: String)(implicit ec: ExecutionContext ): Future[Boolean] = { @@ -419,7 +871,7 @@ trait BulkApi { _: RefreshApi with SettingsApi => val settings = b.add(BulkSettings[A](bulkOptions.disableRefresh)(this, toBulkElasticAction)) val group = b.add(Flow[A].named("group").grouped(bulkOptions.maxBulkSize).map { items => -// logger.info(s"Preparing to write batch of ${items.size}...") + // logger.info(s"Preparing to write batch of ${items.size}...") items }) @@ -512,17 +964,37 @@ trait BulkApi { _: RefreshApi with SettingsApi => } trait CountApi { + + /** Count the number of documents matching the given JSON query asynchronously. 
+ * @param query + * - the query to count the documents for + * @return + * the number of documents matching the query, or None if the count could not be determined + */ def countAsync(query: JSONQuery)(implicit ec: ExecutionContext): Future[Option[Double]] = { Future { this.count(query) } } + /** Count the number of documents matching the given JSON query. + * @param query + * - the query to count the documents for + * @return + * the number of documents matching the query, or None if the count could not be determined + */ def count(query: JSONQuery): Option[Double] } trait AggregateApi[T <: AggregateResult] { + + /** Aggregate the results of the given SQL query. + * @param sqlQuery + * - the query to aggregate the results for + * @return + * a sequence of aggregated results + */ def aggregate(sqlQuery: SQLQuery)(implicit ec: ExecutionContext ): Future[_root_.scala.collection.Seq[T]] @@ -531,12 +1003,33 @@ trait AggregateApi[T <: AggregateResult] { trait SingleValueAggregateApi extends AggregateApi[SingleValueAggregateResult] trait GetApi { + + /** Get an entity by its id from the given index. + * @param id + * - the id of the entity to get + * @param index + * - the name of the index to get the entity from (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * an Option containing the entity if it was found, None otherwise + */ def get[U <: Timestamped]( id: String, index: Option[String] = None, maybeType: Option[String] = None )(implicit m: Manifest[U], formats: Formats): Option[U] + /** Get an entity by its id from the given index asynchronously. 
+ * @param id + * - the id of the entity to get + * @param index + * - the name of the index to get the entity from (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * a Future that completes with an Option containing the entity if it was found, None otherwise + */ def getAsync[U <: Timestamped]( id: String, index: Option[String] = None, @@ -550,13 +1043,33 @@ trait GetApi { trait SearchApi { + /** Search for entities matching the given JSON query. + * @param jsonQuery + * - the query to search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of entities matching the query + */ def search[U](jsonQuery: JSONQuery)(implicit m: Manifest[U], formats: Formats): List[U] + /** Search for entities matching the given SQL query. + * @param sqlQuery + * - the SQL query to search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of entities matching the query + */ def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = { sqlQuery.search match { case Some(searchRequest) => val indices = collection.immutable.Seq(searchRequest.sources: _*) - search[U](JSONQuery(searchRequest.query, indices))(m, formats) + search[U](JSONQuery(searchRequest.query, indices)) case None => throw new IllegalArgumentException( s"SQL query ${sqlQuery.query} does not contain a valid search request" @@ -564,12 +1077,36 @@ trait SearchApi { } } + /** Search for entities matching the given SQL query asynchronously. 
+ * @param sqlQuery + * - the SQL query to search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a Future that completes with a list of entities matching the query + */ def searchAsync[U]( sqlQuery: SQLQuery )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = Future( this.search[U](sqlQuery) ) + /** Search for entities matching the given SQL query with inner hits. + * @param sqlQuery + * - the SQL query to search for + * @param innerField + * - the field to use for inner hits + * @param m1 + * - the manifest of the type to search for + * @param m2 + * - the manifest of the inner hit type + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of tuples containing the main entity and a list of inner hits + */ def searchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], @@ -587,12 +1124,36 @@ trait SearchApi { } } + /** Search for entities matching the given JSON query with inner hits. + * @param jsonQuery + * - the JSON query to search for + * @param innerField + * - the field to use for inner hits + * @param m1 + * - the manifest of the type to search for + * @param m2 + * - the manifest of the inner hit type + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of tuples containing the main entity and a list of inner hits + */ def searchWithInnerHits[U, I](jsonQuery: JSONQuery, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], formats: Formats ): List[(U, List[I])] + /** Perform a multi-search operation with the given SQL query. 
+ * @param sqlQuery + * - the SQL query to perform the multi-search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of lists of entities matching the queries in the multi-search request + */ def multiSearch[U]( sqlQuery: SQLQuery )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { @@ -613,10 +1174,34 @@ trait SearchApi { } } + /** Perform a multi-search operation with the given JSON queries. + * @param jsonQueries + * - the JSON queries to perform the multi-search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of lists of entities matching the queries in the multi-search request + */ def multiSearch[U]( jsonQueries: JSONQueries )(implicit m: Manifest[U], formats: Formats): List[List[U]] + /** Perform a multi-search operation with the given SQL query and inner hits. + * @param sqlQuery + * - the SQL query to perform the multi-search for + * @param innerField + * - the field to use for inner hits + * @param m1 + * - the manifest of the type to search for + * @param m2 + * - the manifest of the inner hit type + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of lists of tuples containing the main entity and a list of inner hits + */ def multiSearchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], @@ -639,6 +1224,20 @@ trait SearchApi { } } + /** Perform a multi-search operation with the given JSON queries and inner hits. 
+ * @param jsonQueries + * - the JSON queries to perform the multi-search for + * @param innerField + * - the field to use for inner hits + * @param m1 + * - the manifest of the type to search for + * @param m2 + * - the manifest of the inner hit type + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of lists of tuples containing the main entity and a list of inner hits + */ def multiSearchWithInnerHits[U, I](jsonQueries: JSONQueries, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], diff --git a/client/src/main/scala/app/softnetwork/elastic/client/MappingComparator.scala b/client/src/main/scala/app/softnetwork/elastic/client/MappingComparator.scala new file mode 100644 index 00000000..4495ef34 --- /dev/null +++ b/client/src/main/scala/app/softnetwork/elastic/client/MappingComparator.scala @@ -0,0 +1,87 @@ +package app.softnetwork.elastic.client + +import com.google.gson._ +import com.typesafe.scalalogging.StrictLogging + +import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Success, Try} + +object MappingComparator extends StrictLogging { + + private def parseJsonToMap(jsonString: String): Map[String, JsonElement] = { + Try( + new JsonParser() + .parse(jsonString) + .getAsJsonObject + .get("properties") + .getAsJsonObject + .entrySet() + .asScala + .map { entry => + // Convert the mapping properties to a Map[String, JsonElement] + entry.getKey -> entry.getValue + } + .toMap + ) match { + case Success(jsonMap) => jsonMap + case Failure(f) => + logger.error(s"Failed to parse JSON mapping $jsonString: ${f.getMessage}", f) + Map.empty[String, JsonElement] + } + } + + private def compareJsonMappings( + oldMap: Map[String, JsonElement], + newMap: Map[String, JsonElement] + ): Map[String, (Option[JsonElement], Option[JsonElement])] = { + val allKeys = oldMap.keySet union newMap.keySet + allKeys.flatMap { key => + (oldMap.get(key), newMap.get(key)) match { + case (Some(oldVal), Some(newVal)) 
if oldVal == newVal => + None // equal + case (a, b) => + Some(key -> (a, b)) // mismatch or missing + } + }.toMap + } + + private def hasDifferences( + diff: Map[String, (Option[JsonElement], Option[JsonElement])] + ): Boolean = + diff.nonEmpty + + private def formatDiff(diff: Map[String, (Option[JsonElement], Option[JsonElement])]): String = { + diff + .map { + case (key, (Some(oldV), Some(newV))) => + s"Field [$key] changed:\n old: $oldV\n new: $newV" + case (key, (Some(oldV), None)) => + s"Field [$key] removed:\n old: $oldV" + case (key, (None, Some(newV))) => + s"Field [$key] added:\n new: $newV" + case _ => "" + } + .mkString("\n") + } + + /** Compares two JSON mappings and logs the differences. + * + * @param oldMapping + * - the old mapping as a JSON string + * @param newMapping + * - the new mapping as a JSON string + * @return + * true if there are differences, false otherwise + */ + def isMappingDifferent(oldMapping: String, newMapping: String): Boolean = { + val oldMap = parseJsonToMap(oldMapping) + val newMap = parseJsonToMap(newMapping) + val diff = compareJsonMappings(oldMap, newMap) + if (hasDifferences(diff)) { + logger.info(formatDiff(diff)) + true + } else { + false + } + } +} diff --git a/client/src/main/scala/app/softnetwork/elastic/client/package.scala b/client/src/main/scala/app/softnetwork/elastic/client/package.scala index 7782357d..30753177 100644 --- a/client/src/main/scala/app/softnetwork/elastic/client/package.scala +++ b/client/src/main/scala/app/softnetwork/elastic/client/package.scala @@ -6,7 +6,7 @@ import app.softnetwork.elastic.client.BulkAction.BulkAction import app.softnetwork.serialization._ import com.google.gson.{Gson, JsonElement, JsonObject} import com.typesafe.config.{Config, ConfigFactory} -import com.typesafe.scalalogging.StrictLogging +import com.typesafe.scalalogging.{Logger, StrictLogging} import configs.ConfigReader import org.json4s.Formats @@ -117,7 +117,7 @@ package object client { def docAsUpsert(doc: String): 
String = s"""{"doc":$doc,"doc_as_upsert":true}""" implicit class InnerHits(searchResult: JsonObject) { - import scala.collection.JavaConverters._ + import scala.jdk.CollectionConverters._ def ~>[M, I]( innerField: String )(implicit formats: Formats, m: Manifest[M], i: Manifest[I]): List[(M, List[I])] = { @@ -166,4 +166,14 @@ package object client { case class JSONQuery(query: String, indices: Seq[String], types: Seq[String] = Seq.empty) case class JSONQueries(queries: List[JSONQuery]) + + def tryOrElse[T](block: => T, default: => T)(implicit logger: Logger): T = { + try { + block + } catch { + case e: Exception => + logger.error("An error occurred while executing the block", e) + default + } + } } diff --git a/java/persistence/build.sbt b/java/persistence/build.sbt new file mode 100644 index 00000000..07392682 --- /dev/null +++ b/java/persistence/build.sbt @@ -0,0 +1,3 @@ +organization := "app.softnetwork.elastic" + +name := "elastic-java-persistence" diff --git a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientProvider.scala b/java/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticsearchClientProvider.scala similarity index 68% rename from java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientProvider.scala rename to java/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticsearchClientProvider.scala index 2b25e2d4..d83d9408 100644 --- a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientProvider.scala +++ b/java/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticsearchClientProvider.scala @@ -1,6 +1,6 @@ -package app.softnetwork.elastic.client.java +package app.softnetwork.elastic.persistence.query -import app.softnetwork.elastic.persistence.query.ElasticProvider +import app.softnetwork.elastic.client.java.ElasticsearchClientApi import app.softnetwork.persistence.ManifestWrapper import 
app.softnetwork.persistence.model.Timestamped diff --git a/java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala b/java/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala similarity index 86% rename from java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala rename to java/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala index c9cd532c..eaf53713 100644 --- a/java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala +++ b/java/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala @@ -1,6 +1,5 @@ package app.softnetwork.elastic.persistence.query -import app.softnetwork.elastic.client.java.ElasticsearchClientProvider import app.softnetwork.persistence.message.CrudEvent import app.softnetwork.persistence.model.Timestamped import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider} diff --git a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala index 3a3785b6..729e7fc6 100644 --- a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala @@ -18,12 +18,14 @@ import co.elastic.clients.elasticsearch.core.bulk.{ } import co.elastic.clients.elasticsearch.core.msearch.{MultisearchHeader, RequestItem} import co.elastic.clients.elasticsearch.core._ +import co.elastic.clients.elasticsearch.core.reindex.{Destination, Source} import co.elastic.clients.elasticsearch.core.search.SearchRequestBody import 
co.elastic.clients.elasticsearch.indices.update_aliases.{Action, AddAction, RemoveAction} -import co.elastic.clients.elasticsearch.indices._ +import co.elastic.clients.elasticsearch.indices.{ExistsRequest => IndexExistsRequest, _} +import co.elastic.clients.json.jackson.JacksonJsonpMapper import com.google.gson.{Gson, JsonParser} -import _root_.java.io.StringReader +import _root_.java.io.{StringReader, StringWriter} import _root_.java.util.{Map => JMap} import scala.jdk.CollectionConverters._ import org.json4s.Formats @@ -51,60 +53,116 @@ trait ElasticsearchClientApi trait ElasticsearchClientIndicesApi extends IndicesApi with ElasticsearchClientCompanion { override def createIndex(index: String, settings: String): Boolean = { - apply() - .indices() - .create( - new CreateIndexRequest.Builder() - .index(index) - .settings(new IndexSettings.Builder().withJson(new StringReader(settings)).build()) - .build() - ) - .acknowledged() + tryOrElse( + apply() + .indices() + .create( + new CreateIndexRequest.Builder() + .index(index) + .settings(new IndexSettings.Builder().withJson(new StringReader(settings)).build()) + .build() + ) + .acknowledged(), + false + )(logger) } override def deleteIndex(index: String): Boolean = { - apply().indices().delete(new DeleteIndexRequest.Builder().index(index).build()).acknowledged() + tryOrElse( + apply() + .indices() + .delete(new DeleteIndexRequest.Builder().index(index).build()) + .acknowledged(), + false + )(logger) } override def openIndex(index: String): Boolean = { - apply().indices().open(new OpenRequest.Builder().index(index).build()).acknowledged() + tryOrElse( + apply().indices().open(new OpenRequest.Builder().index(index).build()).acknowledged(), + false + )(logger) } override def closeIndex(index: String): Boolean = { - apply().indices().close(new CloseIndexRequest.Builder().index(index).build()).acknowledged() + tryOrElse( + apply().indices().close(new CloseIndexRequest.Builder().index(index).build()).acknowledged(), + false 
+ )(logger) + } + + override def reindex( + sourceIndex: String, + targetIndex: String, + refresh: Boolean = true + ): Boolean = { + val failures = apply() + .reindex( + new ReindexRequest.Builder() + .source(new Source.Builder().index(sourceIndex).build()) + .dest(new Destination.Builder().index(targetIndex).build()) + .refresh(refresh) + .build() + ) + .failures() + .asScala + .map(_.cause().reason()) + if (failures.nonEmpty) { + logger.error( + s"Reindexing from $sourceIndex to $targetIndex failed with errors: ${failures.take(100).mkString(", ")}" + ) + } + failures.isEmpty } + override def indexExists(index: String): Boolean = { + tryOrElse( + apply() + .indices() + .exists( + new IndexExistsRequest.Builder().index(index).build() + ) + .value(), + false + )(logger) + } } trait ElasticsearchClientAliasApi extends AliasApi with ElasticsearchClientCompanion { override def addAlias(index: String, alias: String): Boolean = { - apply() - .indices() - .updateAliases( - new UpdateAliasesRequest.Builder() - .actions( - new Action.Builder() - .add(new AddAction.Builder().index(index).alias(alias).build()) - .build() - ) - .build() - ) - .acknowledged() + tryOrElse( + apply() + .indices() + .updateAliases( + new UpdateAliasesRequest.Builder() + .actions( + new Action.Builder() + .add(new AddAction.Builder().index(index).alias(alias).build()) + .build() + ) + .build() + ) + .acknowledged(), + false + )(logger) } override def removeAlias(index: String, alias: String): Boolean = { - apply() - .indices() - .updateAliases( - new UpdateAliasesRequest.Builder() - .actions( - new Action.Builder() - .remove(new RemoveAction.Builder().index(index).alias(alias).build()) - .build() - ) - .build() - ) - .acknowledged() + tryOrElse( + apply() + .indices() + .updateAliases( + new UpdateAliasesRequest.Builder() + .actions( + new Action.Builder() + .remove(new RemoveAction.Builder().index(index).alias(alias).build()) + .build() + ) + .build() + ) + .acknowledged(), + false + )(logger) } } 
@@ -112,83 +170,128 @@ trait ElasticsearchClientSettingsApi extends SettingsApi with ElasticsearchClien _: ElasticsearchClientIndicesApi => override def updateSettings(index: String, settings: String): Boolean = { - apply() - .indices() - .putSettings( - new PutIndicesSettingsRequest.Builder() - .index(index) - .settings(new IndexSettings.Builder().withJson(new StringReader(settings)).build()) - .build() - ) - .acknowledged() + tryOrElse( + apply() + .indices() + .putSettings( + new PutIndicesSettingsRequest.Builder() + .index(index) + .settings(new IndexSettings.Builder().withJson(new StringReader(settings)).build()) + .build() + ) + .acknowledged(), + false + )(logger) } - override def loadSettings(): String = { - val settings = apply() - .indices() - .getSettings( - new GetIndicesSettingsRequest.Builder().index("*").build() - ) - extractSource(settings).getOrElse("") + override def loadSettings(index: String): String = { + tryOrElse( + Option( + apply() + .indices() + .getSettings( + new GetIndicesSettingsRequest.Builder().index(index).build() + ) + .get(index) + ).map { value => + val mapper = new JacksonJsonpMapper() + val writer = new StringWriter() + val generator = mapper.jsonProvider().createGenerator(writer) + mapper.serialize(value.settings().index(), generator) + generator.close() + writer.toString + }, + None + )(logger).getOrElse("{}") } } -trait ElasticsearchClientMappingApi extends MappingApi with ElasticsearchClientCompanion { +trait ElasticsearchClientMappingApi + extends MappingApi + with ElasticsearchClientIndicesApi + with ElasticsearchClientRefreshApi + with ElasticsearchClientCompanion { override def setMapping(index: String, mapping: String): Boolean = { - apply() - .indices() - .putMapping( - new PutMappingRequest.Builder().index(index).withJson(new StringReader(mapping)).build() - ) - .acknowledged() + tryOrElse( + apply() + .indices() + .putMapping( + new PutMappingRequest.Builder().index(index).withJson(new StringReader(mapping)).build() 
+ ) + .acknowledged(), + false + )(logger) } override def getMapping(index: String): String = { - val mapping = apply() - .indices() - .getMapping( - new GetMappingRequest.Builder().index(index).build() - ) - extractSource(mapping).getOrElse("") + tryOrElse( + { + Option( + apply() + .indices() + .getMapping( + new GetMappingRequest.Builder().index(index).build() + ) + .get(index) + ).map { value => + val mapper = new JacksonJsonpMapper() + val writer = new StringWriter() + val generator = mapper.jsonProvider().createGenerator(writer) + mapper.serialize(value, generator) + generator.close() + writer.toString + } + }, + None + )(logger).getOrElse("""{"mappings": {}}""") // valid JSON fallback, same shape as the serialized mapping record } } trait ElasticsearchClientRefreshApi extends RefreshApi with ElasticsearchClientCompanion { override def refresh(index: String): Boolean = { - apply() - .indices() - .refresh( - new RefreshRequest.Builder().index(index).build() - ) - .shards() - .failed() - .intValue() == 0 + tryOrElse( + apply() + .indices() + .refresh( + new RefreshRequest.Builder().index(index).build() + ) + .shards() + .failed() + .intValue() == 0, + false + )(logger) } } trait ElasticsearchClientFlushApi extends FlushApi with ElasticsearchClientCompanion { override def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean = { - apply() - .indices() - .flush( - new FlushRequest.Builder().index(index).force(force).waitIfOngoing(wait).build() - ) - .shards() - .failed() - .intValue() == 0 + tryOrElse( + apply() + .indices() + .flush( + new FlushRequest.Builder().index(index).force(force).waitIfOngoing(wait).build() + ) + .shards() + .failed() + .intValue() == 0, + false + )(logger) } } trait ElasticsearchClientCountApi extends CountApi with ElasticsearchClientCompanion { override def count(query: client.JSONQuery): Option[Double] = { - Option( - apply() - .count( - new CountRequest.Builder().index(query.indices.asJava).build() - ) - .count() - .toDouble - ) + tryOrElse( + Option( + apply() +
.count( + new CountRequest.Builder().index(query.indices.asJava).build() + ) + .count() + .toDouble + ), + None + )(logger) } override def countAsync(query: client.JSONQuery)(implicit @@ -336,17 +439,20 @@ trait ElasticsearchClientSingleValueAggregateApi trait ElasticsearchClientIndexApi extends IndexApi with ElasticsearchClientCompanion { _: ElasticsearchClientRefreshApi => override def index(index: String, id: String, source: String): Boolean = { - apply() - .index( - new IndexRequest.Builder() - .index(index) - .id(id) - .withJson(new StringReader(source)) - .build() - ) - .shards() - .failed() - .intValue() == 0 + tryOrElse( + apply() + .index( + new IndexRequest.Builder() + .index(index) + .id(id) + .withJson(new StringReader(source)) + .build() + ) + .shards() + .failed() + .intValue() == 0, + false + )(logger) } override def indexAsync(index: String, id: String, source: String)(implicit @@ -379,19 +485,22 @@ trait ElasticsearchClientUpdateApi extends UpdateApi with ElasticsearchClientCom source: String, upsert: Boolean ): Boolean = { - apply() - .update( - new UpdateRequest.Builder[JMap[String, Object], JMap[String, Object]]() - .index(index) - .id(id) - .doc(mapper.readValue(source, classOf[JMap[String, Object]])) - .docAsUpsert(upsert) - .build(), - classOf[JMap[String, Object]] - ) - .shards() - .failed() - .intValue() == 0 + tryOrElse( + apply() + .update( + new UpdateRequest.Builder[JMap[String, Object], JMap[String, Object]]() + .index(index) + .id(id) + .doc(mapper.readValue(source, classOf[JMap[String, Object]])) + .docAsUpsert(upsert) + .build(), + classOf[JMap[String, Object]] + ) + .shards() + .failed() + .intValue() == 0, + false + )(logger) } override def updateAsync(index: String, id: String, source: String, upsert: Boolean)(implicit @@ -422,13 +531,16 @@ trait ElasticsearchClientDeleteApi extends DeleteApi with ElasticsearchClientCom _: ElasticsearchClientRefreshApi => override def delete(uuid: String, index: String): Boolean = { - apply() - 
.delete( - new DeleteRequest.Builder().index(index).id(uuid).build() - ) - .shards() - .failed() - .intValue() == 0 + tryOrElse( + apply() + .delete( + new DeleteRequest.Builder().index(index).id(uuid).build() + ) + .shards() + .failed() + .intValue() == 0, + false + )(logger) } override def deleteAsync(uuid: String, index: String)(implicit @@ -677,19 +789,19 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom .map(_.hits().hits().asScala.toList) .getOrElse(Nil) val innerObjects = innerHits.flatMap { innerHit => - extractSource(innerHit) match { - case Some(innerSource) => - logger.whenDebugEnabled(s"Processing inner hit: $innerSource") - val json = new JsonParser().parse(innerSource).getAsJsonObject - val gson = new Gson() - Try(serialization.read[I](gson.toJson(json.get("_source")))) match { - case Success(innerObject) => Some(innerObject) - case Failure(f) => - logger.error(s"Failed to deserialize inner hit: $innerSource", f) - None - } - case None => - logger.warn("Could not extract inner hit source from string representation") + val mapper = new JacksonJsonpMapper() + val writer = new StringWriter() + val generator = mapper.jsonProvider().createGenerator(writer) + mapper.serialize(innerHit, generator) + generator.close() + val innerSource = writer.toString + logger.whenDebugEnabled(s"Processing inner hit: $innerSource") + val json = new JsonParser().parse(innerSource).getAsJsonObject + val gson = new Gson() + Try(serialization.read[I](gson.toJson(json.get("_source")))) match { + case Success(innerObject) => Some(innerObject) + case Failure(f) => + logger.error(s"Failed to deserialize inner hit: $innerSource", f) None } } diff --git a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala index 080bb3d0..956ff0fa 100644 --- 
a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala @@ -76,11 +76,4 @@ trait ElasticsearchClientCompanion extends StrictLogging { promise.future } - protected def extractSource(value: AnyRef): Option[String] = { - val s = value.toString - val idx = s.indexOf(':') - if (idx >= 0 && idx + 1 < s.length) Some(s.substring(idx + 1).trim) - else None - } - } diff --git a/testkit/build.sbt b/testkit/build.sbt index a649ad56..a428b25e 100644 --- a/testkit/build.sbt +++ b/testkit/build.sbt @@ -20,4 +20,30 @@ val elastic = Seq( ) libraryDependencies ++= elastic :+ - "app.softnetwork.persistence" %% "persistence-core-testkit" % Versions.genericPersistence \ No newline at end of file + "app.softnetwork.persistence" %% "persistence-core-testkit" % Versions.genericPersistence + +val testJavaOptions = { + val heapSize = sys.env.getOrElse("HEAP_SIZE", "1g") + val extraTestJavaArgs = Seq("-XX:+IgnoreUnrecognizedVMOptions", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", + "--add-opens=java.base/java.io=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", + "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", + "--add-opens=java.base/sun.security.action=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED").mkString(" ") + s"-Xmx$heapSize -Xss4m -XX:ReservedCodeCacheSize=128m -Dfile.encoding=UTF-8 $extraTestJavaArgs" + .split(" ").toSeq +} + +Test / javaOptions ++= testJavaOptions + +// 
Required by the Test container framework +Test / fork := true diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala index 156aa550..cdb90e30 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala @@ -20,14 +20,25 @@ trait MockElasticClientApi extends ElasticClientApi { protected val elasticDocuments: ElasticDocuments = new ElasticDocuments() {} - override def toggleRefresh(index: String, enable: Boolean): Unit = {} + override def toggleRefresh(index: String, enable: Boolean): Boolean = true - override def setReplicas(index: String, replicas: Int): Unit = {} + override def setReplicas(index: String, replicas: Int): Boolean = true override def updateSettings(index: String, settings: String) = true override def addAlias(index: String, alias: String): Boolean = true + /** Remove an alias from the given index. + * + * @param index + * - the name of the index + * @param alias + * - the name of the alias + * @return + * true if the alias was removed successfully, false otherwise + */ + override def removeAlias(index: String, alias: String): Boolean = true + override def createIndex(index: String, settings: String): Boolean = true override def setMapping(index: String, mapping: String): Boolean = true @@ -38,6 +49,29 @@ trait MockElasticClientApi extends ElasticClientApi { override def openIndex(index: String): Boolean = true + /** Reindex from source index to target index. 
+ * + * @param sourceIndex + * - the name of the source index + * @param targetIndex + * - the name of the target index + * @param refresh + * - true to refresh the target index after reindexing, false otherwise + * @return + * true if the reindexing was successful, false otherwise + */ + override def reindex(sourceIndex: String, targetIndex: String, refresh: Boolean = true): Boolean = + true + + /** Check if an index exists. + * + * @param index + * - the name of the index to check + * @return + * true if the index exists, false otherwise + */ + override def indexExists(index: String): Boolean = false + override def count(jsonQuery: JSONQuery): Option[Double] = throw new UnsupportedOperationException @@ -137,7 +171,8 @@ trait MockElasticClientApi extends ElasticClientApi { ): Future[Seq[SingleValueAggregateResult]] = throw new UnsupportedOperationException - override def loadSettings(): String = throw new UnsupportedOperationException + override def loadSettings(index: String): String = + throw new UnsupportedOperationException } trait ElasticDocuments { diff --git a/testkit/src/main/scala/app/softnetwork/persistence/person/ElasticPersonTestKit.scala b/testkit/src/main/scala/app/softnetwork/persistence/person/ElasticPersonTestKit.scala new file mode 100644 index 00000000..cc0f00ed --- /dev/null +++ b/testkit/src/main/scala/app/softnetwork/persistence/person/ElasticPersonTestKit.scala @@ -0,0 +1,15 @@ +package app.softnetwork.persistence.person + +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit +import app.softnetwork.persistence.scalatest.InMemoryPersistenceTestKit + +trait ElasticPersonTestKit + extends PersonTestKit + with InMemoryPersistenceTestKit + with ElasticDockerTestKit { + + override def beforeAll(): Unit = { + super.beforeAll() + initAndJoinCluster() + } +} diff --git a/testkit/src/main/scala/app/softnetwork/persistence/query/PersonToElasticProcessorStream.scala 
b/testkit/src/main/scala/app/softnetwork/persistence/query/PersonToElasticProcessorStream.scala new file mode 100644 index 00000000..d3b6c3f9 --- /dev/null +++ b/testkit/src/main/scala/app/softnetwork/persistence/query/PersonToElasticProcessorStream.scala @@ -0,0 +1,14 @@ +package app.softnetwork.persistence.query + +import app.softnetwork.elastic.client.ElasticClientApi +import app.softnetwork.elastic.persistence.query.{ElasticProvider, State2ElasticProcessorStream} +import app.softnetwork.persistence.person.message.PersonEvent +import app.softnetwork.persistence.person.model.Person +import app.softnetwork.persistence.person.query.PersonToExternalProcessorStream + +trait PersonToElasticProcessorStream + extends State2ElasticProcessorStream[Person, PersonEvent] + with PersonToExternalProcessorStream + with InMemoryJournalProvider + with InMemoryOffsetProvider + with ElasticProvider[Person] { _: ElasticClientApi => } diff --git a/testkit/src/test/resources/mapping/person.mustache b/testkit/src/test/resources/mapping/person.mustache new file mode 100644 index 00000000..21829e1c --- /dev/null +++ b/testkit/src/test/resources/mapping/person.mustache @@ -0,0 +1,21 @@ +{ + "properties": { + "uuid": { + "type": "keyword", + "index": true + }, + "name": { + "type": "text", + "analyzer": "search_analyzer" + }, + "birthDate": { + "type": "keyword" + }, + "createdDate": { + "type": "date" + }, + "lastUpdated": { + "type": "date" + } + } +} \ No newline at end of file diff --git a/testkit/src/test/resources/mapping/sample.mustache b/testkit/src/test/resources/mapping/sample.mustache deleted file mode 100644 index f3e19d45..00000000 --- a/testkit/src/test/resources/mapping/sample.mustache +++ /dev/null @@ -1,16 +0,0 @@ -{ - "{{type}}": { - "properties": { - "uuid": { - "type": "keyword", - "index": true - }, - "createdDate": { - "type": "date" - }, - "lastUpdated": { - "type": "date" - } - } - } -} \ No newline at end of file diff --git 
a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala index 9b359dfb..02c0083b 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala @@ -63,10 +63,24 @@ trait ElasticClientSpec super.afterAll() } + val persons: List[String] = List( + """ { "uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21", "childrenCount": 0} """, + """ { "uuid": "A14", "name": "Moe Szyslak", "birthDate": "1967-11-21", "childrenCount": 0} """, + """ { "uuid": "A16", "name": "Barney Gumble", "birthDate": "1969-05-09", "childrenCount": 0} """ + ) + + private val personsWithUpsert = + persons :+ """ { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09", "children": [{ "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"}, { "parentId": "A16", "name": "Josh Gumble", "birthDate": "2002-05-09"}], "childrenCount": 2 } """ + + val children: List[String] = List( + """ { "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"} """, + """ { "parentId": "A16", "name": "Josh Gumble", "birthDate": "1999-05-09"} """ + ) + "Creating an index and then delete it" should "work fine" in { pClient.createIndex("create_delete") blockUntilIndexExists("create_delete") - "create_delete" should beCreated + "create_delete" should beCreated() pClient.deleteIndex("create_delete") blockUntilIndexNotExists("create_delete") @@ -97,13 +111,7 @@ trait ElasticClientSpec "Toggle refresh" should "work" in { pClient.toggleRefresh("person", enable = false) new JsonParser() - .parse(pClient.loadSettings()) - .getAsJsonObject - .get("person") - .getAsJsonObject - .get("settings") - .getAsJsonObject - .get("index") + .parse(pClient.loadSettings("person")) .getAsJsonObject .get("refresh_interval") .getAsString shouldBe "-1" @@ -112,13 +120,7 @@ trait 
ElasticClientSpec pClient.toggleRefresh("person", enable = true) // settings.getOrElse("index.refresh_interval", "") shouldBe "1s" new JsonParser() - .parse(pClient.loadSettings()) - .getAsJsonObject - .get("person") - .getAsJsonObject - .get("settings") - .getAsJsonObject - .get("index") + .parse(pClient.loadSettings("person")) .getAsJsonObject .get("refresh_interval") .getAsString shouldBe "1s" @@ -141,19 +143,172 @@ trait ElasticClientSpec settings.getOrElse("index.number_of_replicas", "") shouldBe "0" } - val persons: List[String] = List( - """ { "uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21", "childrenCount": 0} """, - """ { "uuid": "A14", "name": "Moe Szyslak", "birthDate": "1967-11-21", "childrenCount": 0} """, - """ { "uuid": "A16", "name": "Barney Gumble", "birthDate": "1969-05-09", "childrenCount": 0} """ - ) + "Setting a mapping" should "work" in { + pClient.createIndex("person_mapping") + blockUntilIndexExists("person_mapping") + "person_mapping" should beCreated() - private val personsWithUpsert = - persons :+ """ { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09", "children": [{ "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"}, { "parentId": "A16", "name": "Josh Gumble", "birthDate": "2002-05-09"}], "childrenCount": 2 } """ + val mapping = + """{ + | "properties": { + | "birthDate": { + | "type": "date" + | }, + | "uuid": { + | "type": "keyword" + | }, + | "name": { + | "type": "text", + | "analyzer": "ngram_analyzer", + | "search_analyzer": "search_analyzer", + | "fields": { + | "raw": { + | "type": "keyword" + | }, + | "fr": { + | "type": "text", + | "analyzer": "french" + | } + | } + | } + | } + |}""".stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") + pClient.setMapping("person_mapping", mapping) shouldBe true - val children: List[String] = List( - """ { "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"} """, - """ { "parentId": "A16", "name": "Josh Gumble", 
"birthDate": "1999-05-09"} """ - ) + val properties = pClient.getMappingProperties("person_mapping") + logger.info(s"properties: $properties") + MappingComparator.isMappingDifferent( + properties, + mapping + ) shouldBe false + + implicit val bulkOptions: BulkOptions = BulkOptions("person_mapping", "person", 1000) + val indices = pClient.bulk[String](persons.iterator, identity, Some("uuid"), None, None) + refresh(indices) + pClient.flush("person_mapping") + + indices should contain only "person_mapping" + + blockUntilCount(3, "person_mapping") + + "person_mapping" should haveCount(3) + + pClient.search[Person]("select * from person_mapping") match { + case r if r.size == 3 => + r.map(_.uuid) should contain allOf ("A12", "A14", "A16") + case other => fail(other.toString) + } + + pClient.search[Person]("select * from person_mapping where uuid = 'A16'") match { + case r if r.size == 1 => + r.map(_.uuid) should contain only "A16" + case other => fail(other.toString) + } + + pClient.search[Person]("select * from person_mapping where match(name, 'gum')") match { + case r if r.size == 1 => + r.map(_.uuid) should contain only "A16" + case other => fail(other.toString) + } + + pClient.search[Person]( + "select * from person_mapping where uuid <> 'A16' and match(name, 'gum')" + ) match { + case r if r.isEmpty => + case other => fail(other.toString) + } + } + + "Updating a mapping" should "work" in { + val mapping = + """{ + | "properties": { + | "name": { + | "type": "keyword" + | }, + | "birthDate": { + | "type": "date" + | }, + | "uuid": { + | "type": "keyword" + | }, + | "childrenCount": { + | "type": "integer" + | } + | } + |} + """.stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") + pClient.updateMapping("person_migration", mapping) shouldBe true + blockUntilIndexExists("person_migration") + "person_migration" should beCreated() + + implicit val bulkOptions: BulkOptions = BulkOptions("person_migration", "person", 1000) + val indices = 
pClient.bulk[String](persons.iterator, identity, Some("uuid"), None, None) + refresh(indices) + pClient.flush("person_migration") + + indices should contain only "person_migration" + + blockUntilCount(3, "person_migration") + + "person_migration" should haveCount(3) + + pClient.search[Person]("select * from person_migration where name like '%gum%'") match { + case r if r.isEmpty => + case other => fail(other.toString) + } + + val newMapping = + """{ + | "properties": { + | "birthDate": { + | "type": "date" + | }, + | "uuid": { + | "type": "keyword" + | }, + | "name": { + | "type": "text", + | "analyzer": "ngram_analyzer", + | "search_analyzer": "search_analyzer", + | "fields": { + | "raw": { + | "type": "keyword" + | }, + | "fr": { + | "type": "text", + | "analyzer": "french" + | } + | } + | }, + | "childrenCount": { + | "type": "integer" + | }, + | "children": { + | "type": "nested", + | "include_in_parent": true, + | "properties": { + | "name": { + | "type": "keyword" + | }, + | "birthDate": { + | "type": "date" + | } + | } + | } + | } + |} + """.stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") + pClient.shouldUpdateMapping("person_migration", newMapping) shouldBe true + pClient.updateMapping("person_migration", newMapping) shouldBe true + + pClient.search[Person]("select * from person_migration where name like '%gum%'") match { + case r if r.size == 1 => + r.map(_.uuid) should contain only "A16" + case other => fail(other.toString) + } + + } "Bulk index valid json without id key and suffix key" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person1", "person", 2) @@ -382,7 +537,7 @@ trait ElasticClientSpec r2.size should ===(1) r2.map(_.uuid) should contain("A16") - pClient.searchAsync[Person](SQLQuery("select * from person7 where _id=\"A16\"")) onComplete { + pClient.searchAsync[Person]("select * from person7 where _id=\"A16\"") onComplete { case Success(r) => r.size should ===(1) r.map(_.uuid) should contain("A16") @@ -520,7 
+675,7 @@ trait ElasticClientSpec bClient.setMapping("binaries", mapping) shouldBe true val mappings = bClient.getMapping("binaries") logger.info(s"mappings: $mappings") - assert("{\"binaries\":{\"mappings\":" + mapping + "}}" == mappings) + assert("{\"mappings\":" + mapping + "}" == mappings) for (uuid <- Seq("png", "jpg", "pdf")) { val path = Paths.get(Thread.currentThread().getContextClassLoader.getResource(s"avatar.$uuid").getPath) @@ -613,43 +768,27 @@ trait ElasticClientSpec } // test count aggregation - pClient - .aggregate( - "select count(p.uuid) as c from person10 p" - ) - .complete() match { + pClient.aggregate("select count(p.uuid) as c from person10 p").complete() match { case Success(s) => s.headOption.flatMap(_.asDoubleOption).getOrElse(0d) should ===(3d) case Failure(f) => fail(f.getMessage) } // test max aggregation on date field - pClient - .aggregate( - "select max(p.birthDate) as c from person10 p" - ) - .complete() match { + pClient.aggregate("select max(p.birthDate) as c from person10 p").complete() match { case Success(s) => s.headOption.flatMap(_.asStringOption).getOrElse("") should ===("1969-05-09T00:00:00.000Z") case Failure(f) => fail(f.getMessage) } // test min aggregation on date field - pClient - .aggregate( - "select min(p.birthDate) as c from person10 p" - ) - .complete() match { + pClient.aggregate("select min(p.birthDate) as c from person10 p").complete() match { case Success(s) => s.headOption.flatMap(_.asStringOption).getOrElse("") should ===("1967-11-21T00:00:00.000Z") case Failure(f) => fail(f.getMessage) } // test avg aggregation on date field - pClient - .aggregate( - "select avg(p.birthDate) as c from person10 p" - ) - .complete() match { + pClient.aggregate("select avg(p.birthDate) as c from person10 p").complete() match { case Success(s) => s.headOption.flatMap(_.asStringOption).getOrElse("") should ===("1968-05-17T08:00:00.000Z") case Failure(f) => fail(f.getMessage) diff --git 
a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala index 261f286b..754a3416 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala @@ -1,7 +1,7 @@ package app.softnetwork.elastic.client -import app.softnetwork.elastic.client.java.ElasticsearchClientProvider import app.softnetwork.elastic.model.{Binary, Parent, Sample} +import app.softnetwork.elastic.persistence.query.ElasticsearchClientProvider import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.person.model.Person import co.elastic.clients.elasticsearch.ElasticsearchClient diff --git a/testkit/src/test/scala/app/softnetwork/persistence/person/ElasticsearchClientPersonHandlerSpec.scala b/testkit/src/test/scala/app/softnetwork/persistence/person/ElasticsearchClientPersonHandlerSpec.scala new file mode 100644 index 00000000..60aab28f --- /dev/null +++ b/testkit/src/test/scala/app/softnetwork/persistence/person/ElasticsearchClientPersonHandlerSpec.scala @@ -0,0 +1,39 @@ +package app.softnetwork.persistence.person + +import akka.actor.typed.ActorSystem +import app.softnetwork.elastic.client.java.ElasticsearchClientApi +import app.softnetwork.elastic.persistence.query.ElasticProvider +import app.softnetwork.persistence.ManifestWrapper +import app.softnetwork.persistence.person.model.Person +import app.softnetwork.persistence.person.query.PersonToExternalProcessorStream +import app.softnetwork.persistence.query.{ + ExternalPersistenceProvider, + PersonToElasticProcessorStream +} +import com.typesafe.config.Config +import org.slf4j.{Logger, LoggerFactory} + +import scala.concurrent.ExecutionContextExecutor + +class ElasticsearchClientPersonHandlerSpec extends ElasticPersonTestKit { + + implicit val ec: ExecutionContextExecutor = 
typedSystem().executionContext + + override def externalPersistenceProvider: ExternalPersistenceProvider[Person] = + new ElasticProvider[Person] with ElasticsearchClientApi with ManifestWrapper[Person] { + override protected val manifestWrapper: ManifestW = ManifestW() + override lazy val config: Config = ElasticsearchClientPersonHandlerSpec.this.elasticConfig + } + + override def person2ExternalProcessorStream: ActorSystem[_] => PersonToExternalProcessorStream = + sys => + new PersonToElasticProcessorStream with ElasticsearchClientApi { + override val forTests: Boolean = true + override protected val manifestWrapper: ManifestW = ManifestW() + override implicit def system: ActorSystem[_] = sys + override def log: Logger = LoggerFactory getLogger getClass.getName + override lazy val config: Config = ElasticsearchClientPersonHandlerSpec.this.elasticConfig + } + + override def log: Logger = LoggerFactory getLogger getClass.getName +} From c02061aca0d8ec5889c8d7a3173c7999db7e3259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sat, 26 Jul 2025 17:40:53 +0300 Subject: [PATCH 2/2] update for codecov --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 0d7b3099..3408040e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -48,7 +48,7 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 with: - files: sql/target/scala-2.13/coverage-report/cobertura.xml,client/testkit/target/scala-2.13/coverage-report/cobertura.xml,persistence/target/scala-2.13/coverage-report/cobertura.xml,java/testkit/target/scala-2.13/coverage-report/cobertura.xml,teskit/target/scala-2.13/coverage-report/cobertura.xml + files: 
sql/target/scala-2.13/coverage-report/cobertura.xml,client/testkit/target/scala-2.13/coverage-report/cobertura.xml,persistence/target/scala-2.13/coverage-report/cobertura.xml,java/target/scala-2.13/coverage-report/cobertura.xml,java/persistence/target/scala-2.13/coverage-report/cobertura.xml,testkit/target/scala-2.13/coverage-report/cobertura.xml flags: unittests fail_ci_if_error: false verbose: true