From bec3964d4e7e6db0f790dfac656a847f56d15f18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Mon, 28 Jul 2025 12:00:06 +0300 Subject: [PATCH 1/2] add update mapping, move rest and jest client persistences to specific modules, add specifications for elastic persistence --- build.sbt | 26 +- .../elastic/client/ElasticClientApi.scala | 622 +++++++++++++++++- .../elastic/client/MappingComparator.scala | 87 +++ .../softnetwork/elastic/client/package.scala | 15 +- jest/persistence/build.sbt | 3 + .../persistence/query}/JestProvider.scala | 3 +- ...asticProcessorStreamWithJestProvider.scala | 5 +- .../elastic/client/jest/JestClientApi.scala | 311 +++++++-- .../persistence/query/ElasticProvider.scala | 4 +- rest/persistence/build.sbt | 3 + .../query}/RestHighLevelClientProvider.scala | 3 +- ...asticProcessorStreamWithRestProvider.scala | 5 +- .../client/rest/RestHighLevelClientApi.scala | 363 ++++++---- .../elastic/client/MockElasticClientApi.scala | 131 ++-- .../person/ElasticPersonTestKit.scala | 15 + .../PersonToElasticProcessorStream.scala | 14 + .../test/resources/mapping/person.mustache | 21 + .../test/resources/mapping/sample.mustache | 16 - .../elastic/client/ElasticClientSpec.scala | 275 ++++++-- .../elastic/client/JestProviders.scala | 2 +- .../client/RestHighLevelProviders.scala | 2 +- .../person/JestClientPersonHandlerSpec.scala | 35 + ...RestHighLevelClientPersonHandlerSpec.scala | 35 + 23 files changed, 1619 insertions(+), 377 deletions(-) create mode 100644 client/src/main/scala/app/softnetwork/elastic/client/MappingComparator.scala create mode 100644 jest/persistence/build.sbt rename jest/{src/main/scala/app/softnetwork/elastic/client/jest => persistence/src/main/scala/app/softnetwork/persistence/query}/JestProvider.scala (76%) rename jest/{src/main/scala/app/softnetwork/elastic => persistence/src/main/scala/app/softnetwork}/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala (63%) create mode 100644 rest/persistence/build.sbt rename rest/{src/main/scala/app/softnetwork/elastic/client/rest => persistence/src/main/scala/app/softnetwork/persistence/query}/RestHighLevelClientProvider.scala (74%) rename rest/{src/main/scala/app/softnetwork/elastic => persistence/src/main/scala/app/softnetwork}/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala (62%) create mode 100644 testkit/src/main/scala/app/softnetwork/persistence/person/ElasticPersonTestKit.scala create mode 100644 testkit/src/main/scala/app/softnetwork/persistence/query/PersonToElasticProcessorStream.scala create mode 100644 testkit/src/test/resources/mapping/person.mustache delete mode 100644 testkit/src/test/resources/mapping/sample.mustache create mode 100644 testkit/src/test/scala/app/softnetwork/persistence/person/JestClientPersonHandlerSpec.scala create mode 100644 testkit/src/test/scala/app/softnetwork/persistence/person/RestHighLevelClientPersonHandlerSpec.scala diff --git a/build.sbt b/build.sbt index 949f96f7..3e66fee0 100644 --- a/build.sbt +++ b/build.sbt @@ -69,6 +69,16 @@ lazy val persistence = project.in(file("persistence")) lazy val rest = project.in(file("rest")) .configs(IntegrationTest) .settings(Defaults.itSettings) + .dependsOn( + client % "compile->compile;test->test;it->it" + ) + +lazy val restPersistence = project.in(file("rest/persistence")) + .configs(IntegrationTest) + .settings(Defaults.itSettings) + .dependsOn( + rest % "compile->compile;test->test;it->it", + ) .dependsOn( persistence % "compile->compile;test->test;it->it" ) @@ -76,6 +86,16 @@ lazy val rest = project.in(file("rest")) lazy val jest = project.in(file("jest")) .configs(IntegrationTest) .settings(Defaults.itSettings) + .dependsOn( + client % "compile->compile;test->test;it->it" + ) + +lazy val jestPersistence = project.in(file("jest/persistence")) + .configs(IntegrationTest) + .settings(Defaults.itSettings) + .dependsOn( + jest % "compile->compile;test->test;it->it", + ) .dependsOn( persistence % "compile->compile;test->test;it->it" ) @@ -88,13 +108,13 @@ lazy val testKit = project.in(file("testkit")) ) .enablePlugins(BuildInfoPlugin) .dependsOn( - rest % "compile->compile;test->test;it->it" + restPersistence % "compile->compile;test->test;it->it" ) .dependsOn( - jest % "compile->compile;test->test;it->it" + jestPersistence % "compile->compile;test->test;it->it" ) lazy val root = project.in(file(".")) .configs(IntegrationTest) .settings(Defaults.itSettings, Publish.noPublishSettings) - .aggregate(sql, client, persistence, rest, jest, testKit) + .aggregate(sql, client, rest, jest, persistence, restPersistence, jestPersistence, testKit) diff --git a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala index 0815fe6c..0ff260be 100644 --- a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala +++ b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala @@ -9,14 +9,17 @@ import akka.stream.scaladsl._ import app.softnetwork.persistence.model.Timestamped import app.softnetwork.serialization._ import app.softnetwork.elastic.sql.SQLQuery +import com.sksamuel.exts.Logging import com.typesafe.config.{Config, ConfigFactory} import org.json4s.{DefaultFormats, Formats} import org.json4s.jackson.JsonMethods._ +import java.util.UUID import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.language.{implicitConversions, postfixOps} import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} /** Created by smanciot on 28/06/2018. */ @@ -42,6 +45,10 @@ trait ElasticClientApi } trait IndicesApi { + + /** Default settings for indices. This is used when creating an index without providing specific + * settings. It includes ngram tokenizer and analyzer, as well as some default limits. + */ val defaultSettings: String = """ |{ @@ -86,22 +93,95 @@ trait IndicesApi { |} """.stripMargin + /** Create an index with the provided name and settings. + * @param index + * - the name of the index to create + * @param settings + * - the settings to apply to the index (default is defaultSettings) + * @return + * true if the index was created successfully, false otherwise + */ def createIndex(index: String, settings: String = defaultSettings): Boolean + /** Delete an index with the provided name. + * @param index + * - the name of the index to delete + * @return + * true if the index was deleted successfully, false otherwise + */ def deleteIndex(index: String): Boolean + /** Close an index with the provided name. + * @param index + * - the name of the index to close + * @return + * true if the index was closed successfully, false otherwise + */ def closeIndex(index: String): Boolean + /** Open an index with the provided name. + * @param index + * - the name of the index to open + * @return + * true if the index was opened successfully, false otherwise + */ def openIndex(index: String): Boolean + + /** Reindex from source index to target index. + * @param sourceIndex + * - the name of the source index + * @param targetIndex + * - the name of the target index + * @param refresh + * - true to refresh the target index after reindexing, false otherwise + * @return + * true if the reindexing was successful, false otherwise + */ + def reindex(sourceIndex: String, targetIndex: String, refresh: Boolean = true): Boolean + + /** Check if an index exists. + * @param index + * - the name of the index to check + * @return + * true if the index exists, false otherwise + */ + def indexExists(index: String): Boolean } trait AliasApi { + + /** Add an alias to the given index. + * @param index + * - the name of the index + * @param alias + * - the name of the alias + * @return + * true if the alias was added successfully, false otherwise + */ def addAlias(index: String, alias: String): Boolean + + /** Remove an alias from the given index. + * @param index + * - the name of the index + * @param alias + * - the name of the alias + * @return + * true if the alias was removed successfully, false otherwise + */ def removeAlias(index: String, alias: String): Boolean } trait SettingsApi { _: IndicesApi => - def toggleRefresh(index: String, enable: Boolean): Unit = { + + /** Toggle the refresh interval of an index. + * @param index + * - the name of the index + * @param enable + * - true to enable the refresh interval, false to disable it + * @return + * true if the settings were updated successfully, false otherwise + */ + def toggleRefresh(index: String, enable: Boolean): Boolean = { updateSettings( index, if (!enable) """{"index" : {"refresh_interval" : -1} }""" @@ -109,29 +189,272 @@ trait SettingsApi { _: IndicesApi => ) } - def setReplicas(index: String, replicas: Int): Unit = { + /** Set the number of replicas for an index. + * @param index + * - the name of the index + * @param replicas + * - the number of replicas to set + * @return + * true if the settings were updated successfully, false otherwise + */ + def setReplicas(index: String, replicas: Int): Boolean = { updateSettings(index, s"""{"index" : {"number_of_replicas" : $replicas} }""") } + /** Update the settings of an index. + * @param index + * - the name of the index + * @param settings + * - the settings to apply to the index (default is defaultSettings) + * @return + * true if the settings were updated successfully, false otherwise + */ def updateSettings(index: String, settings: String = defaultSettings): Boolean - def loadSettings(): String + /** Load the settings of an index. + * @param index + * - the name of the index to load the settings for + * @return + * the settings of the index as a JSON string + */ + def loadSettings(index: String): String } -trait MappingApi { - def setMapping(index: String, indexType: String, mapping: String): Boolean - def getMapping(index: String, indexType: String): String +trait MappingApi extends IndicesApi with RefreshApi with Logging { + + /** Set the mapping of an index. + * @param index + * - the name of the index to set the mapping for + * @param mapping + * - the mapping to set on the index + * @return + * true if the mapping was set successfully, false otherwise + */ + def setMapping(index: String, mapping: String): Boolean + + /** Get the mapping of an index. + * @param index + * - the name of the index to get the mapping for + * @return + * the mapping of the index as a JSON string + */ + def getMapping(index: String): String + + /** Get the mapping properties of an index. + * @param index + * - the name of the index to get the mapping properties for + * @return + * the mapping properties of the index as a JSON string + */ + def getMappingProperties(index: String): String = { + tryOrElse(getMapping(index), "{\"properties\": {}}")(logger) + } + + /** Check if the mapping of an index is different from the provided mapping. + * @param index + * - the name of the index to check + * @param mapping + * - the mapping to compare with the current mapping of the index + * @return + * true if the mapping is different, false otherwise + */ + def shouldUpdateMapping( + index: String, + mapping: String + ): Boolean = { + MappingComparator.isMappingDifferent(this.getMappingProperties(index), mapping) + } + + /** Update the mapping of an index to a new mapping. + * @param index + * - the name of the index to migrate + * @param mapping + * - the new mapping to set on the index + * @param settings + * - the settings to apply to the index (default is defaultSettings) + * @return + * true if the mapping was updated successfully, false otherwise + */ + def updateMapping( + index: String, + mapping: String, + settings: String = defaultSettings + ): Boolean = { + // Check if the index exists + if (!tryOrElse(this.indexExists(index), false)(logger)) { + if (!tryOrElse(this.createIndex(index, settings), false)(logger)) { + logger.error(s"Failed to create index: $index") + return false + } + logger.info(s"Index $index created successfully.") + if (!tryOrElse(this.setMapping(index, mapping), false)(logger)) { + logger.error(s"Failed to set mapping for index: $index") + return false + } + logger.info(s"Mapping for index $index set successfully.") + true + } + // Check if the mapping needs to be updated + else if (shouldUpdateMapping(index, mapping)) { + val tempIndex = index + "_tmp_" + UUID.randomUUID() + var tempCreated = false + var originalDeleted = false + logger.info("--- Starting dynamic mapping migration ---") + logger.info("Target index: " + index) + logger.info("Temporary index: " + tempIndex) + def migrate(): Boolean = { + // Create a temporary index with the new mapping + tempCreated = tryOrElse(this.createIndex(tempIndex, settings), false)(logger) + if (tempCreated) { + logger.info(s"Temporary index $tempIndex created successfully.") + // Set the new mapping on the temporary index + if (!tryOrElse(this.setMapping(tempIndex, mapping), false)(logger)) { + logger.error(s"Failed to set mapping for temporary index: $tempIndex") + return false + } + logger.info(s"Mapping for temporary index $tempIndex set successfully.") + // Reindex from the original index to the temporary index + if (!tryOrElse(this.reindex(index, tempIndex), false)(logger)) { + logger.error( + s"Failed to reindex from original index: $index to temporary index: $tempIndex" + ) + return false + } + logger.info( + s"Reindexing from original index $index to temporary index $tempIndex completed successfully." + ) + // Delete the original index + originalDeleted = this.deleteIndex(index) + if (originalDeleted) { + logger.info(s"Original index $index deleted successfully.") + // Rename the temporary index to the original index name + if (!tryOrElse(this.createIndex(index, settings), false)(logger)) { + logger.error(s"Failed to recreate original index: $index") + return false + } + logger.info(s"Original index $index recreated successfully.") + if (!tryOrElse(this.setMapping(index, mapping), false)(logger)) { + logger.error(s"Failed to set mapping for original index: $index") + return false + } + logger.info(s"Mapping for original index $index set successfully.") + if (!tryOrElse(this.reindex(tempIndex, index), false)(logger)) { + logger.error( + s"Failed to reindex from temporary index: $tempIndex to original index: $index" + ) + return false + } + logger.info( + s"Reindexing from temporary index $tempIndex to original index $index completed successfully." + ) + if (!tryOrElse(this.openIndex(index), false)(logger)) { + logger.error(s"Failed to open original index: $index") + return false + } + logger.info(s"Original index $index opened successfully.") + logger.info("Dynamic mapping migration completed successfully.") + true + } else { + logger.error(s"Failed to delete original index: $index") + false + } + } else { + logger.error(s"Failed to create temporary index: $tempIndex") + false + } + } + val migration = Try(migrate()) match { + case Success(result) => result + case Failure(exception) => + logger.error("Exception during dynamic mapping migration", exception) + false + } + if (!migration) { + logger.error("Error during dynamic mapping migration") + if (originalDeleted) { + // If the original index was deleted, we need to recreate it + if (!tryOrElse(this.createIndex(index, settings), false)(logger)) { + logger.error(s"Failed to recreate original index: $index") + } else { + logger.info(s"Original index $index recreated successfully.") + // Set the original mapping back + if (!tryOrElse(this.setMapping(index, mapping), false)(logger)) { + logger.error(s"Failed to set mapping for original index: $index") + } else { + logger.info(s"Mapping for original index $index set successfully.") + if (!tryOrElse(this.reindex(tempIndex, index), false)(logger)) { + logger.error( + s"Failed to reindex from temporary index $tempIndex to original index $index" + ) + } else { + logger.info( + s"Reindexing from temporary index $tempIndex to original index $index completed successfully." + ) + if (!tryOrElse(this.refresh(index), false)(logger)) { + logger.error(s"Failed to refresh original index: $index") + } else { + logger.info(s"Original index $index refreshed successfully.") + } + } + } + } + } + } + if (tempCreated) { + // Clean up the temporary index if it was created + if (!tryOrElse(this.deleteIndex(tempIndex), false)(logger)) { + logger.error(s"Failed to delete temporary index: $tempIndex") + } else { + logger.info(s"Temporary index $tempIndex deleted successfully.") + } + } else { + logger.error(s"Temporary index $tempIndex was not created, skipping deletion.") + } + migration + } else { + false + } + } } trait RefreshApi { + + /** Refresh the index to make sure all documents are indexed and searchable. + * @param index + * - the name of the index to refresh + * @return + * true if the index was refreshed successfully, false otherwise + */ def refresh(index: String): Boolean } trait FlushApi { + + /** Flush the index to make sure all operations are written to disk. + * @param index + * - the name of the index to flush + * @param force + * - true to force the flush, false otherwise + * @param wait + * - true to wait for the flush to complete, false otherwise + * @return + * true if the index was flushed successfully, false otherwise + */ def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean } trait IndexApi { _: RefreshApi => + + /** Index an entity in the given index. + * @param entity + * - the entity to index + * @param index + * - the name of the index to index the entity in (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * true if the entity was indexed successfully, false otherwise + */ def index[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -146,8 +469,30 @@ trait IndexApi { _: RefreshApi => ) } + /** Index an entity in the given index. + * @param index + * - the name of the index to index the entity in + * @param indexType + * - the type of the index + * @param id + * - the id of the entity to index + * @param source + * - the source of the entity to index in JSON format + * @return + * true if the entity was indexed successfully, false otherwise + */ def index(index: String, indexType: String, id: String, source: String): Boolean + /** Index an entity in the given index asynchronously. + * @param entity + * - the entity to index + * @param index + * - the name of the index to index the entity in (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * a Future that completes with true if the entity was indexed successfully, false otherwise + */ def indexAsync[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -157,6 +502,18 @@ trait IndexApi { _: RefreshApi => indexAsync(index.getOrElse(indexType), indexType, entity.uuid, serialization.write[U](entity)) } + /** Index an entity in the given index asynchronously. + * @param index + * - the name of the index to index the entity in + * @param indexType + * - the type of the index + * @param id + * - the id of the entity to index + * @param source + * - the source of the entity to index in JSON format + * @return + * a Future that completes with true if the entity was indexed successfully, false otherwise + */ def indexAsync(index: String, indexType: String, id: String, source: String)(implicit ec: ExecutionContext ): Future[Boolean] = { @@ -167,6 +524,19 @@ trait IndexApi { _: RefreshApi => } trait UpdateApi { _: RefreshApi => + + /** Update an entity in the given index. + * @param entity + * - the entity to update + * @param index + * - the name of the index to update the entity in (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @param upsert + * - true to upsert the entity if it does not exist, false otherwise + * @return + * true if the entity was updated successfully, false otherwise + */ def update[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -183,14 +553,34 @@ trait UpdateApi { _: RefreshApi => ) } - def update( - index: String, - indexType: String, - id: String, - source: String, - upsert: Boolean - ): Boolean - + /** Update an entity in the given index. + * @param index + * - the name of the index to update the entity in + * @param indexType + * - the type of the index + * @param id + * - the id of the entity to update + * @param source + * - the source of the entity to update in JSON format + * @param upsert + * - true to upsert the entity if it does not exist, false otherwise + * @return + * true if the entity was updated successfully, false otherwise + */ + def update(index: String, indexType: String, id: String, source: String, upsert: Boolean): Boolean + + /** Update an entity in the given index asynchronously. + * @param entity + * - the entity to update + * @param index + * - the name of the index to update the entity in (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @param upsert + * - true to upsert the entity if it does not exist, false otherwise + * @return + * a Future that completes with true if the entity was updated successfully, false otherwise + */ def updateAsync[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -208,6 +598,20 @@ trait UpdateApi { _: RefreshApi => ) } + /** Update an entity in the given index asynchronously. + * @param index + * - the name of the index to update the entity in + * @param indexType + * - the type of the index + * @param id + * - the id of the entity to update + * @param source + * - the source of the entity to update in JSON format + * @param upsert + * - true to upsert the entity if it does not exist, false otherwise + * @return + * a Future that completes with true if the entity was updated successfully, false otherwise + */ def updateAsync(index: String, indexType: String, id: String, source: String, upsert: Boolean)( implicit ec: ExecutionContext ): Future[Boolean] = { @@ -218,6 +622,17 @@ trait UpdateApi { _: RefreshApi => } trait DeleteApi { _: RefreshApi => + + /** Delete an entity from the given index. + * @param entity + * - the entity to delete + * @param index + * - the name of the index to delete the entity from (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * true if the entity was deleted successfully, false otherwise + */ def delete[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -227,8 +642,26 @@ trait DeleteApi { _: RefreshApi => delete(entity.uuid, index.getOrElse(indexType), indexType) } + /** Delete an entity from the given index. + * @param uuid + * - the id of the entity to delete + * @param index + * - the name of the index to delete the entity from + * @return + * true if the entity was deleted successfully, false otherwise + */ def delete(uuid: String, index: String, indexType: String): Boolean + /** Delete an entity from the given index asynchronously. + * @param entity + * - the entity to delete + * @param index + * - the name of the index to delete the entity from (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * a Future that completes with true if the entity was deleted successfully, false otherwise + */ def deleteAsync[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -238,6 +671,16 @@ trait DeleteApi { _: RefreshApi => deleteAsync(entity.uuid, index.getOrElse(indexType), indexType) } + /** Delete an entity from the given index asynchronously. + * @param uuid + * - the id of the entity to delete + * @param index + * - the name of the index to delete the entity from + * @param indexType + * - the type of the index + * @return + * a Future that completes with true if the entity was deleted successfully, false otherwise + */ def deleteAsync(uuid: String, index: String, indexType: String)(implicit ec: ExecutionContext ): Future[Boolean] = { @@ -467,17 +910,37 @@ trait BulkApi { _: RefreshApi with SettingsApi => } trait CountApi { + + /** Count the number of documents matching the given JSON query asynchronously. + * @param query + * - the query to count the documents for + * @return + * the number of documents matching the query, or None if the count could not be determined + */ def countAsync(query: JSONQuery)(implicit ec: ExecutionContext): Future[Option[Double]] = { - Future( + Future { this.count(query) - ) + } } + /** Count the number of documents matching the given JSON query. + * @param query + * - the query to count the documents for + * @return + * the number of documents matching the query, or None if the count could not be determined + */ def count(query: JSONQuery): Option[Double] } trait AggregateApi[T <: AggregateResult] { + + /** Aggregate the results of the given SQL query. + * @param sqlQuery + * - the query to aggregate the results for + * @return + * a sequence of aggregated results + */ def aggregate(sqlQuery: SQLQuery)(implicit ec: ExecutionContext ): Future[_root_.scala.collection.Seq[T]] @@ -486,12 +949,33 @@ trait AggregateApi[T <: AggregateResult] { trait SingleValueAggregateApi extends AggregateApi[SingleValueAggregateResult] trait GetApi { + + /** Get an entity by its id from the given index. + * @param id + * - the id of the entity to get + * @param index + * - the name of the index to get the entity from (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * an Option containing the entity if it was found, None otherwise + */ def get[U <: Timestamped]( id: String, index: Option[String] = None, maybeType: Option[String] = None )(implicit m: Manifest[U], formats: Formats): Option[U] + /** Get an entity by its id from the given index asynchronously. + * @param id + * - the id of the entity to get + * @param index + * - the name of the index to get the entity from (default is the entity type name) + * @param maybeType + * - the type of the entity (default is the entity class name in lowercase) + * @return + * a Future that completes with an Option containing the entity if it was found, None otherwise + */ def getAsync[U <: Timestamped]( id: String, index: Option[String] = None, @@ -503,8 +987,28 @@ trait GetApi { trait SearchApi { + /** Search for entities matching the given JSON query. + * @param jsonQuery + * - the query to search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of entities matching the query + */ def search[U](jsonQuery: JSONQuery)(implicit m: Manifest[U], formats: Formats): List[U] + /** Search for entities matching the given SQL query. + * @param sqlQuery + * - the SQL query to search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of entities matching the query + */ def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = { sqlQuery.search match { case Some(searchRequest) => @@ -517,12 +1021,36 @@ trait SearchApi { } } + /** Search for entities matching the given SQL query asynchronously. + * @param sqlQuery + * - the SQL query to search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a Future that completes with a list of entities matching the query + */ def searchAsync[U]( sqlQuery: SQLQuery )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = Future( this.search[U](sqlQuery) ) + /** Search for entities matching the given JSON query with inner hits. + * @param sqlQuery + * - the SQL query to search for + * @param innerField + * - the field to use for inner hits + * @param m1 + * - the manifest of the type to search for + * @param m2 + * - the manifest of the inner hit type + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of tuples containing the main entity and a list of inner hits + */ def searchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], @@ -540,12 +1068,36 @@ trait SearchApi { } } + /** Search for entities matching the given JSON query with inner hits. + * @param sqlQuery + * - the SQL query to search for + * @param innerField + * - the field to use for inner hits + * @param m1 + * - the manifest of the type to search for + * @param m2 + * - the manifest of the inner hit type + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of tuples containing the main entity and a list of inner hits + */ def searchWithInnerHits[U, I](jsonQuery: JSONQuery, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], formats: Formats ): List[(U, List[I])] + /** Perform a multi-search operation with the given SQL query. + * @param sqlQuery + * - the SQL query to perform the multi-search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of lists of entities matching the queries in the multi-search request + */ def multiSearch[U]( sqlQuery: SQLQuery )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { @@ -566,10 +1118,34 @@ trait SearchApi { } } + /** Perform a multi-search operation with the given JSON queries. + * @param jsonQueries + * - the JSON queries to perform the multi-search for + * @param m + * - the manifest of the type to search for + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of lists of entities matching the queries in the multi-search request + */ def multiSearch[U]( jsonQueries: JSONQueries )(implicit m: Manifest[U], formats: Formats): List[List[U]] + /** Perform a multi-search operation with the given SQL query and inner hits. + * @param sqlQuery + * - the SQL query to perform the multi-search for + * @param innerField + * - the field to use for inner hits + * @param m1 + * - the manifest of the type to search for + * @param m2 + * - the manifest of the inner hit type + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of lists of tuples containing the main entity and a list of inner hits + */ def multiSearchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], @@ -592,6 +1168,20 @@ trait SearchApi { } } + /** Perform a multi-search operation with the given JSON queries and inner hits. + * @param jsonQueries + * - the JSON queries to perform the multi-search for + * @param innerField + * - the field to use for inner hits + * @param m1 + * - the manifest of the type to search for + * @param m2 + * - the manifest of the inner hit type + * @param formats + * - the formats to use for serialization/deserialization + * @return + * a list of lists of tuples containing the main entity and a list of inner hits + */ def multiSearchWithInnerHits[U, I](jsonQueries: JSONQueries, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], diff --git a/client/src/main/scala/app/softnetwork/elastic/client/MappingComparator.scala b/client/src/main/scala/app/softnetwork/elastic/client/MappingComparator.scala new file mode 100644 index 00000000..47a79bc6 --- /dev/null +++ b/client/src/main/scala/app/softnetwork/elastic/client/MappingComparator.scala @@ -0,0 +1,87 @@ +package app.softnetwork.elastic.client + +import com.google.gson._ +import com.typesafe.scalalogging.StrictLogging + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + +object MappingComparator extends StrictLogging { + + private def parseJsonToMap(jsonString: String): Map[String, JsonElement] = { + Try( + new JsonParser() + .parse(jsonString) + .getAsJsonObject + .get("properties") + .getAsJsonObject + .entrySet() + .asScala + .map { entry => + // Convert the mapping properties to a Map[String, JsonElement] + entry.getKey -> entry.getValue + } + .toMap + ) match { + case Success(jsonMap) => jsonMap + case Failure(f) => + logger.error(s"Failed to parse JSON mapping $jsonString: ${f.getMessage}", f) + Map.empty[String, JsonElement] + } + } + + private def compareJsonMappings( + oldMap: Map[String, JsonElement], + newMap: Map[String, JsonElement] + ): Map[String, (Option[JsonElement], Option[JsonElement])] = { + val allKeys = oldMap.keySet union newMap.keySet + allKeys.flatMap { key => + (oldMap.get(key), newMap.get(key)) match { + case (Some(oldVal), Some(newVal)) if oldVal == newVal => + None // equal + case (a, b) => + Some(key -> (a, b)) // mismatch or missing + } + }.toMap + } + + private def hasDifferences( + diff: Map[String, (Option[JsonElement], Option[JsonElement])] + ): Boolean = + diff.nonEmpty + + private def formatDiff(diff: Map[String, (Option[JsonElement], Option[JsonElement])]): String = { + diff + .map { + case (key, (Some(oldV), Some(newV))) => + s"Field [$key] changed:\n old: ${oldV}\n new: ${newV}" + case (key, (Some(oldV), None)) => + s"Field [$key] removed:\n old: ${oldV}" + case (key, (None, Some(newV))) => + s"Field [$key] added:\n new: ${newV}" + case _ => "" + } + .mkString("\n") + } + + /** Compares two JSON mappings and logs the differences. + * + * @param oldMapping + * - the old mapping as a JSON string + * @param newMapping + * - the new mapping as a JSON string + * @return + * true if there are differences, false otherwise + */ + def isMappingDifferent(oldMapping: String, newMapping: String): Boolean = { + val oldMap = parseJsonToMap(oldMapping) + val newMap = parseJsonToMap(newMapping) + val diff = compareJsonMappings(oldMap, newMap) + if (hasDifferences(diff)) { + logger.info(formatDiff(diff)) + true + } else { + false + } + } +} diff --git a/client/src/main/scala/app/softnetwork/elastic/client/package.scala b/client/src/main/scala/app/softnetwork/elastic/client/package.scala index 2085f99a..15818d6e 100644 --- a/client/src/main/scala/app/softnetwork/elastic/client/package.scala +++ b/client/src/main/scala/app/softnetwork/elastic/client/package.scala @@ -5,10 +5,11 @@ import akka.stream.stage.{GraphStage, GraphStageLogic} import app.softnetwork.elastic.client.BulkAction.BulkAction import app.softnetwork.serialization._ import com.google.gson.{Gson, JsonElement, JsonObject} +import com.sksamuel.exts.Logging import com.typesafe.config.{Config, ConfigFactory} -import com.typesafe.scalalogging.StrictLogging import configs.Configs import org.json4s.Formats +import org.slf4j.Logger import scala.collection.immutable.Seq import scala.collection.mutable @@ -31,7 +32,7 @@ package object client { discoveryEnabled: Boolean = false ) - object ElasticConfig extends StrictLogging { + object ElasticConfig extends Logging { def apply(config: Config): ElasticConfig = { Configs[ElasticConfig] .get(config.withFallback(ConfigFactory.load("softnetwork-elastic.conf")), "elastic") @@ -166,4 +167,14 @@ package object client { case class JSONQuery(query: String, indices: Seq[String], types: Seq[String] = Seq.empty) case class JSONQueries(queries: List[JSONQuery]) + + def tryOrElse[T](block: => T, default: => T)(implicit logger: Logger): T = { + try { + block + } catch { + case e: Exception => + logger.error("An error occurred while executing the block", e) + default + } + } } diff --git a/jest/persistence/build.sbt b/jest/persistence/build.sbt new file mode 100644 index 00000000..794caf4a --- /dev/null +++ b/jest/persistence/build.sbt @@ -0,0 +1,3 @@ +organization := "app.softnetwork.elastic" + +name := "elastic-jest-persistence" diff --git a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestProvider.scala b/jest/persistence/src/main/scala/app/softnetwork/persistence/query/JestProvider.scala similarity index 76% rename from jest/src/main/scala/app/softnetwork/elastic/client/jest/JestProvider.scala rename to jest/persistence/src/main/scala/app/softnetwork/persistence/query/JestProvider.scala index 19e12c0a..926b13c5 100644 --- a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestProvider.scala +++ b/jest/persistence/src/main/scala/app/softnetwork/persistence/query/JestProvider.scala @@ -1,5 +1,6 @@ -package app.softnetwork.elastic.client.jest +package app.softnetwork.persistence.query +import app.softnetwork.elastic.client.jest.JestClientApi import app.softnetwork.elastic.persistence.query.ElasticProvider import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.model.Timestamped diff --git a/jest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala b/jest/persistence/src/main/scala/app/softnetwork/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala similarity index 63% rename from jest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala rename to jest/persistence/src/main/scala/app/softnetwork/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala index 21a628c2..f7c862a5 100644 --- a/jest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala +++ b/jest/persistence/src/main/scala/app/softnetwork/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala @@ -1,9 +1,8 @@ -package app.softnetwork.elastic.persistence.query +package app.softnetwork.persistence.query -import app.softnetwork.elastic.client.jest.JestProvider +import app.softnetwork.elastic.persistence.query.State2ElasticProcessorStream import app.softnetwork.persistence.message.CrudEvent import app.softnetwork.persistence.model.Timestamped -import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider} trait State2ElasticProcessorStreamWithJestProvider[T <: Timestamped, E <: CrudEvent] extends State2ElasticProcessorStream[T, E] diff --git a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientApi.scala b/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientApi.scala index e367254d..10ddcc98 100644 --- a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientApi.scala +++ b/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientApi.scala @@ -8,12 +8,14 @@ import app.softnetwork.elastic.sql import app.softnetwork.elastic.sql.{ElasticSearchRequest, SQLQuery} import app.softnetwork.persistence.model.Timestamped import app.softnetwork.serialization._ +import com.google.gson.{Gson, JsonParser} import io.searchbox.action.BulkableAction import io.searchbox.core._ import io.searchbox.core.search.aggregation.RootAggregation import io.searchbox.indices._ import io.searchbox.indices.aliases.{AddAliasMapping, ModifyAliases, RemoveAliasMapping} import io.searchbox.indices.mapping.{GetMapping, PutMapping} +import io.searchbox.indices.reindex.Reindex import io.searchbox.indices.settings.{GetSettings, UpdateSettings} import io.searchbox.params.Parameters import org.json4s.Formats @@ -42,39 +44,121 @@ trait JestClientApi with JestSearchApi with JestBulkApi -trait JestIndicesApi extends IndicesApi with JestClientCompanion { +trait JestIndicesApi extends IndicesApi with JestRefreshApi with JestClientCompanion { override def createIndex(index: String, settings: String = defaultSettings): Boolean = - apply().execute(new CreateIndex.Builder(index).settings(settings).build()).isSucceeded + tryOrElse( + apply() + .execute( + new CreateIndex.Builder(index).settings(settings).build() + ) + .isSucceeded, + false + )(logger) override def deleteIndex(index: String): Boolean = - apply().execute(new DeleteIndex.Builder(index).build()).isSucceeded + tryOrElse( + apply() + .execute( + new DeleteIndex.Builder(index).build() + ) + .isSucceeded, + false + )(logger) override def closeIndex(index: String): Boolean = - apply().execute(new CloseIndex.Builder(index).build()).isSucceeded + tryOrElse( + apply() + .execute( + new CloseIndex.Builder(index).build() + ) + .isSucceeded, + false + )(logger) override def openIndex(index: String): Boolean = - apply().execute(new OpenIndex.Builder(index).build()).isSucceeded + tryOrElse( + apply() + .execute( + new OpenIndex.Builder(index).build() + ) + .isSucceeded, + false + )(logger) + + /** Reindex from source index to target index. + * + * @param sourceIndex + * - the name of the source index + * @param targetIndex + * - the name of the target index + * @param refresh + * - true to refresh the target index after reindexing, false otherwise + * @return + * true if the reindexing was successful, false otherwise + */ + override def reindex(sourceIndex: String, targetIndex: String, refresh: Boolean): Boolean = { + tryOrElse( + { + apply() + .execute( + new Reindex.Builder(s"""{"index": "$sourceIndex"}""", s"""{"index": "$targetIndex"}""") + .build() + ) + .isSucceeded && { + if (refresh) { + this.refresh(targetIndex) + } else { + true + } + } + }, + false + )(logger) + } + + /** Check if an index exists. + * + * @param index + * - the name of the index to check + * @return + * true if the index exists, false otherwise + */ + override def indexExists(index: String): Boolean = + tryOrElse( + apply() + .execute( + new IndicesExists.Builder(index).build() + ) + .isSucceeded, + false + )(logger) } trait JestAliasApi extends AliasApi with JestClientCompanion { override def addAlias(index: String, alias: String): Boolean = { - apply() - .execute( - new ModifyAliases.Builder( - new AddAliasMapping.Builder(index, alias).build() - ).build() - ) - .isSucceeded + tryOrElse( + apply() + .execute( + new ModifyAliases.Builder( + new AddAliasMapping.Builder(index, alias).build() + ).build() + ) + .isSucceeded, + false + )(logger) } override def removeAlias(index: String, alias: String): Boolean = { - apply() - .execute( - new ModifyAliases.Builder( - new RemoveAliasMapping.Builder(index, alias).build() - ).build() - ) - .isSucceeded + tryOrElse( + apply() + .execute( + new ModifyAliases.Builder( + new RemoveAliasMapping.Builder(index, alias).build() + ).build() + ) + .isSucceeded, + false + )(logger) } } @@ -82,35 +166,96 @@ trait JestSettingsApi extends SettingsApi with JestClientCompanion { _: IndicesApi => override def updateSettings(index: String, settings: String = defaultSettings): Boolean = closeIndex(index) && - apply().execute(new UpdateSettings.Builder(settings).addIndex(index).build()).isSucceeded && + tryOrElse( + apply() + .execute( + new UpdateSettings.Builder(settings).addIndex(index).build() + ) + .isSucceeded, + false + )(logger) && openIndex(index) - override def loadSettings(): String = - apply().execute(new GetSettings.Builder().build()).getJsonString + override def loadSettings(index: String): String = + tryOrElse( + apply() + .execute( + new GetSettings.Builder().addIndex(index).build() + ) + .getJsonString, + s"""{"$index": {"settings": {"index": {}}}}""" + )(logger) } trait JestMappingApi extends MappingApi with JestClientCompanion { _: IndicesApi => - override def setMapping(index: String, indexType: String, mapping: String): Boolean = - apply().execute(new PutMapping.Builder(index, indexType, mapping).build()).isSucceeded - - override def getMapping(index: String, indexType: String): String = - apply() - .execute(new GetMapping.Builder().addIndex(index).addType(indexType).build()) - .getJsonString + override def setMapping(index: String, mapping: String): Boolean = + tryOrElse( + apply() + .execute( + new PutMapping.Builder(index, "_doc", mapping).build() + ) + .isSucceeded, + false + )(logger) + + override def getMapping(index: String): String = + tryOrElse( + apply() + .execute( + new GetMapping.Builder().addIndex(index).addType("_doc").build() + ) + .getJsonString, + s""""{$index: {"mappings": {"_doc":{"properties": {}}}}}""" // empty mapping + )(logger) + + /** Get the mapping properties of an index. + * + * @param index + * - the name of the index to get the mapping properties for + * @return + * the mapping properties of the index as a JSON string + */ + override def getMappingProperties(index: String): String = { + tryOrElse( + new Gson().toJson( + new JsonParser() + .parse(getMapping(index)) + .getAsJsonObject + .get(index) + .getAsJsonObject + .get("mappings") + .getAsJsonObject + .get("_doc") + .getAsJsonObject + ), + "{\"properties\": {}}" + )(logger) + } } trait JestRefreshApi extends RefreshApi with JestClientCompanion { override def refresh(index: String): Boolean = - apply().execute(new Refresh.Builder().addIndex(index).build()).isSucceeded + tryOrElse( + apply() + .execute( + new Refresh.Builder().addIndex(index).build() + ) + .isSucceeded, + false + )(logger) } trait JestFlushApi extends FlushApi with JestClientCompanion { - override def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean = apply() - .execute( - new Flush.Builder().addIndex(index).force(force).waitIfOngoing(wait).build() - ) - .isSucceeded + override def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean = + tryOrElse( + apply() + .execute( + new Flush.Builder().addIndex(index).force(force).waitIfOngoing(wait).build() + ) + .isSucceeded, + false + )(logger) } trait JestCountApi extends CountApi with JestClientCompanion { @@ -140,10 +285,17 @@ trait JestCountApi extends CountApi with JestClientCompanion { val count = new Count.Builder().query(query) for (indice <- indices) count.addIndex(indice) for (t <- types) count.addType(t) - val result = apply().execute(count.build()) - if (!result.isSucceeded) - logger.error(result.getErrorMessage) - Option(result.getCount) + Try { + apply().execute(count.build()) + } match { + case Success(result) => + if (!result.isSucceeded) + logger.error(result.getErrorMessage) + Option(result.getCount) + case Failure(f) => + logger.error(f.getMessage, f) + None + } } } @@ -352,13 +504,20 @@ trait JestUpdateApi extends UpdateApi with JestClientCompanion { trait JestDeleteApi extends DeleteApi with JestClientCompanion { _: RefreshApi => override def delete(uuid: String, index: String, indexType: String): Boolean = { - val result = apply().execute( - new Delete.Builder(uuid).index(index).`type`(indexType).build() - ) - if (!result.isSucceeded) { - logger.error(result.getErrorMessage) + Try( + apply() + .execute( + new Delete.Builder(uuid).index(index).`type`(indexType).build() + ) + ) match { + case Success(result) => + if (!result.isSucceeded) + logger.error(result.getErrorMessage) + result.isSucceeded && this.refresh(index) + case Failure(f) => + logger.error(f.getMessage, f) + false } - result.isSucceeded && this.refresh(index) } override def deleteAsync(uuid: String, index: String, _type: String)(implicit @@ -495,15 +654,13 @@ trait JestSearchApi extends SearchApi with JestClientCompanion { m2: Manifest[I], formats: Formats ): List[(U, List[I])] = { - val result = apply().execute(jsonQuery.search) - (if (result.isSucceeded) { - Some(result) - } else { - logger.error(result.getErrorMessage) - None - }) match { - case Some(searchResult) => - Try(searchResult.getJsonObject ~> [U, I] innerField) match { + Try(apply().execute(jsonQuery.search)).toOption match { + case Some(result) => + if (!result.isSucceeded) { + logger.error(result.getErrorMessage) + return List.empty + } + Try(result.getJsonObject ~> [U, I] innerField) match { case Success(s) => s case Failure(f) => logger.error(f.getMessage, f) @@ -516,15 +673,20 @@ trait JestSearchApi extends SearchApi with JestClientCompanion { override def multiSearch[U]( jsonQueries: JSONQueries )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { - val searches: List[Search] = jsonQueries.queries.map(_.search) - val multiSearchResult = apply().execute(new MultiSearch.Builder(searches.asJava).build()) - multiSearchResult.getResponses.asScala - .map(searchResponse => - searchResponse.searchResult.getSourceAsStringList.asScala - .map(source => serialization.read[U](source)) + tryOrElse( + { + val multiSearchResult = + apply().execute(new MultiSearch.Builder(jsonQueries.queries.map(_.search).asJava).build()) + multiSearchResult.getResponses.asScala + .map(searchResponse => + searchResponse.searchResult.getSourceAsStringList.asScala + .map(source => serialization.read[U](source)) + .toList + ) .toList - ) - .toList + }, + List.empty + )(logger) } override def multiSearchWithInnerHits[U, I](jsonQueries: JSONQueries, innerField: String)(implicit @@ -533,14 +695,23 @@ trait JestSearchApi extends SearchApi with JestClientCompanion { formats: Formats ): List[List[(U, List[I])]] = { val multiSearch = new MultiSearch.Builder(jsonQueries.queries.map(_.search).asJava).build() - val multiSearchResult = apply().execute(multiSearch) - if (multiSearchResult.isSucceeded) { - multiSearchResult.getResponses.asScala - .map(searchResponse => searchResponse.searchResult.getJsonObject ~> [U, I] innerField) - .toList - } else { - logger.error(multiSearchResult.getErrorMessage) - List.empty + Try(apply().execute(multiSearch)).toOption match { + case Some(multiSearchResult) => + if (!multiSearchResult.isSucceeded) { + logger.error(multiSearchResult.getErrorMessage) + return List.empty + } + multiSearchResult.getResponses.asScala + .map(searchResponse => { + Try(searchResponse.searchResult.getJsonObject ~> [U, I] innerField) match { + case Success(s) => s + case Failure(f) => + logger.error(f.getMessage, f) + List.empty[(U, List[I])] + } + }) + .toList + case _ => List.empty } } diff --git a/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala b/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala index 6bfadc74..2df9100d 100644 --- a/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala +++ b/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala @@ -44,9 +44,9 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w Try { createIndex(index) addAlias(index, alias) - setMapping(index, _type, loadMapping(mappingPath)) + setMapping(index, loadMapping(mappingPath)) } match { - case Success(_) => logger.info(s"index:$index type:${_type} alias:$alias created") + case Success(_) => logger.info(s"index:$index alias:$alias created") case Failure(f) => logger.error(s"!!!!! index:$index type:${_type} alias:$alias -> ${f.getMessage}", f) } diff --git a/rest/persistence/build.sbt b/rest/persistence/build.sbt new file mode 100644 index 00000000..bafe9599 --- /dev/null +++ b/rest/persistence/build.sbt @@ -0,0 +1,3 @@ +organization := "app.softnetwork.elastic" + +name := "elastic-rest-persistence" diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala b/rest/persistence/src/main/scala/app/softnetwork/persistence/query/RestHighLevelClientProvider.scala similarity index 74% rename from rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala rename to rest/persistence/src/main/scala/app/softnetwork/persistence/query/RestHighLevelClientProvider.scala index 9a5b40f5..879c5879 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala +++ b/rest/persistence/src/main/scala/app/softnetwork/persistence/query/RestHighLevelClientProvider.scala @@ -1,5 +1,6 @@ -package app.softnetwork.elastic.client.rest +package app.softnetwork.persistence.query +import app.softnetwork.elastic.client.rest.RestHighLevelClientApi import app.softnetwork.elastic.persistence.query.ElasticProvider import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.model.Timestamped diff --git a/rest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala b/rest/persistence/src/main/scala/app/softnetwork/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala similarity index 62% rename from rest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala rename to rest/persistence/src/main/scala/app/softnetwork/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala index 0b7df127..c114c063 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala +++ b/rest/persistence/src/main/scala/app/softnetwork/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala @@ -1,9 +1,8 @@ -package app.softnetwork.elastic.persistence.query +package app.softnetwork.persistence.query -import app.softnetwork.elastic.client.rest.RestHighLevelClientProvider +import app.softnetwork.elastic.persistence.query.State2ElasticProcessorStream import app.softnetwork.persistence.message.CrudEvent import app.softnetwork.persistence.model.Timestamped -import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider} trait State2ElasticProcessorStreamWithRestProvider[T <: Timestamped, E <: CrudEvent] extends State2ElasticProcessorStream[T, E] diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala index 38899e44..2a43ffc5 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala +++ b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala @@ -25,9 +25,14 @@ import org.elasticsearch.action.index.{IndexRequest, IndexResponse} import org.elasticsearch.action.search.{MultiSearchRequest, SearchRequest, SearchResponse} import org.elasticsearch.action.update.{UpdateRequest, UpdateResponse} import org.elasticsearch.action.{ActionListener, DocWriteRequest} -import org.elasticsearch.client.RequestOptions +import org.elasticsearch.client.{Request, RequestOptions} import org.elasticsearch.client.core.{CountRequest, CountResponse} -import org.elasticsearch.client.indices.{CreateIndexRequest, GetMappingsRequest, PutMappingRequest} +import org.elasticsearch.client.indices.{ + CreateIndexRequest, + GetIndexRequest, + GetMappingsRequest, + PutMappingRequest +} import org.elasticsearch.common.io.stream.InputStreamStreamInput import org.elasticsearch.common.xcontent.{DeprecationHandler, XContentType} import org.elasticsearch.rest.RestStatus @@ -43,6 +48,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder import org.json4s.Formats import java.io.ByteArrayInputStream +import scala.collection.JavaConverters.mapAsScalaMapConverter import scala.concurrent.{ExecutionContext, Future, Promise} import scala.language.implicitConversions import scala.util.{Failure, Success, Try} @@ -66,59 +72,125 @@ trait RestHighLevelClientApi trait RestHighLevelClientIndicesApi extends IndicesApi with RestHighLevelClientCompanion { override def createIndex(index: String, settings: String): Boolean = { - apply() - .indices() - .create( - new CreateIndexRequest(index) - .settings(settings, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged + tryOrElse( + apply() + .indices() + .create( + new CreateIndexRequest(index) + .settings(settings, XContentType.JSON), + RequestOptions.DEFAULT + ) + .isAcknowledged, + false + )(logger) } override def deleteIndex(index: String): Boolean = { - apply().indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged + tryOrElse( + apply() + .indices() + .delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT) + .isAcknowledged, + false + )(logger) } override def openIndex(index: String): Boolean = { - apply().indices().open(new OpenIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged + tryOrElse( + apply().indices().open(new OpenIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged, + false + )(logger) } override def closeIndex(index: String): Boolean = { - apply().indices().close(new CloseIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged + tryOrElse( + apply().indices().close(new CloseIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged, + false + )(logger) + } + + /** Reindex from source index to target index. + * + * @param sourceIndex + * - the name of the source index + * @param targetIndex + * - the name of the target index + * @param refresh + * - true to refresh the target index after reindexing, false otherwise + * @return + * true if the reindexing was successful, false otherwise + */ + override def reindex(sourceIndex: String, targetIndex: String, refresh: Boolean): Boolean = { + val request = new Request("POST", s"/_reindex?refresh=$refresh") + request.setJsonEntity( + s""" + |{ + | "source": { + | "index": "$sourceIndex" + | }, + | "dest": { + | "index": "$targetIndex" + | } + |} + """.stripMargin + ) + tryOrElse( + apply().getLowLevelClient.performRequest(request).getStatusLine.getStatusCode < 400, + false + )(logger) + } + + /** Check if an index exists. + * + * @param index + * - the name of the index to check + * @return + * true if the index exists, false otherwise + */ + override def indexExists(index: String): Boolean = { + tryOrElse( + apply().indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT), + false + )(logger) } } trait RestHighLevelClientAliasApi extends AliasApi with RestHighLevelClientCompanion { override def addAlias(index: String, alias: String): Boolean = { - apply() - .indices() - .updateAliases( - new IndicesAliasesRequest() - .addAliasAction( - new AliasActions(AliasActions.Type.ADD) - .index(index) - .alias(alias) - ), - RequestOptions.DEFAULT - ) - .isAcknowledged + tryOrElse( + apply() + .indices() + .updateAliases( + new IndicesAliasesRequest() + .addAliasAction( + new AliasActions(AliasActions.Type.ADD) + .index(index) + .alias(alias) + ), + RequestOptions.DEFAULT + ) + .isAcknowledged, + false + )(logger) } override def removeAlias(index: String, alias: String): Boolean = { - apply() - .indices() - .updateAliases( - new IndicesAliasesRequest() - .addAliasAction( - new AliasActions(AliasActions.Type.REMOVE) - .index(index) - .alias(alias) - ), - RequestOptions.DEFAULT - ) - .isAcknowledged + tryOrElse( + apply() + .indices() + .updateAliases( + new IndicesAliasesRequest() + .addAliasAction( + new AliasActions(AliasActions.Type.REMOVE) + .index(index) + .alias(alias) + ), + RequestOptions.DEFAULT + ) + .isAcknowledged, + false + )(logger) } } @@ -126,72 +198,93 @@ trait RestHighLevelClientSettingsApi extends SettingsApi with RestHighLevelClien _: RestHighLevelClientIndicesApi => override def updateSettings(index: String, settings: String): Boolean = { - apply() - .indices() - .putSettings( - new UpdateSettingsRequest(index) - .settings(settings, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged + tryOrElse( + apply() + .indices() + .putSettings( + new UpdateSettingsRequest(index) + .settings(settings, XContentType.JSON), + RequestOptions.DEFAULT + ) + .isAcknowledged, + false + )(logger) } - override def loadSettings(): String = { - apply() - .indices() - .getSettings( - new GetSettingsRequest().indices("*"), - RequestOptions.DEFAULT - ) - .toString + override def loadSettings(index: String): String = { + tryOrElse( + apply() + .indices() + .getSettings( + new GetSettingsRequest().indices(index), + RequestOptions.DEFAULT + ) + .toString, + s"""{"$index": {"settings": {"index": {}}}}""" + )(logger) } } trait RestHighLevelClientMappingApi extends MappingApi with RestHighLevelClientCompanion { - override def setMapping(index: String, indexType: String, mapping: String): Boolean = { - apply() - .indices() - .putMapping( - new PutMappingRequest(index) - .source(mapping, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged + override def setMapping(index: String, mapping: String): Boolean = { + tryOrElse( + apply() + .indices() + .putMapping( + new PutMappingRequest(index) + .source(mapping, XContentType.JSON), + RequestOptions.DEFAULT + ) + .isAcknowledged, + false + )(logger) } - override def getMapping(index: String, indexType: String): String = { - apply() - .indices() - .getMapping( - new GetMappingsRequest().indices(index), - RequestOptions.DEFAULT - ) - .toString + override def getMapping(index: String): String = { + tryOrElse( + apply() + .indices() + .getMapping( + new GetMappingsRequest().indices(index), + RequestOptions.DEFAULT + ) + .mappings() + .asScala + .get(index) + .map(metadata => metadata.source().string()), + None + )(logger).getOrElse(s""""{$index: {"mappings": {}}}""") } } trait RestHighLevelClientRefreshApi extends RefreshApi with RestHighLevelClientCompanion { override def refresh(index: String): Boolean = { - apply() - .indices() - .refresh( - new RefreshRequest(index), - RequestOptions.DEFAULT - ) - .getStatus - .getStatus < 400 + tryOrElse( + apply() + .indices() + .refresh( + new RefreshRequest(index), + RequestOptions.DEFAULT + ) + .getStatus + .getStatus < 400, + false + )(logger) } } trait RestHighLevelClientFlushApi extends FlushApi with RestHighLevelClientCompanion { override def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean = { - apply() - .indices() - .flush( - new FlushRequest(index).force(force).waitIfOngoing(wait), - RequestOptions.DEFAULT - ) - .getStatus == RestStatus.OK + tryOrElse( + apply() + .indices() + .flush( + new FlushRequest(index).force(force).waitIfOngoing(wait), + RequestOptions.DEFAULT + ) + .getStatus == RestStatus.OK, + false + )(logger) } } @@ -214,15 +307,18 @@ trait RestHighLevelClientCountApi extends CountApi with RestHighLevelClientCompa } override def count(query: client.JSONQuery): Option[Double] = { - Option( - apply() - .count( - new CountRequest().indices(query.indices: _*).types(query.types: _*), - RequestOptions.DEFAULT - ) - .getCount - .toDouble - ) + tryOrElse( + Option( + apply() + .count( + new CountRequest().indices(query.indices: _*).types(query.types: _*), + RequestOptions.DEFAULT + ) + .getCount + .toDouble + ), + None + )(logger) } } @@ -342,23 +438,30 @@ trait RestHighLevelClientSingleValueAggregateApi trait RestHighLevelClientIndexApi extends IndexApi with RestHighLevelClientCompanion { _: RestHighLevelClientRefreshApi => - override def index(index: String, _type: String, id: String, source: String): Boolean = { - apply() - .index( - new IndexRequest(index, _type, id) - .source(source, XContentType.JSON), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 + override def index(index: String, indexType: String, id: String, source: String): Boolean = { + tryOrElse( + apply() + .index( + new IndexRequest(index) + .`type`(indexType) + .id(id) + .source(source, XContentType.JSON), + RequestOptions.DEFAULT + ) + .status() + .getStatus < 400, + false + )(logger) } - override def indexAsync(index: String, _type: String, id: String, source: String)(implicit + override def indexAsync(index: String, indexType: String, id: String, source: String)(implicit ec: ExecutionContext ): Future[Boolean] = { val promise: Promise[Boolean] = Promise() apply().indexAsync( - new IndexRequest(index, _type, id) + new IndexRequest(index) + .`type`(indexType) + .id(id) .source(source, XContentType.JSON), RequestOptions.DEFAULT, new ActionListener[IndexResponse] { @@ -376,32 +479,35 @@ trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientCom _: RestHighLevelClientRefreshApi => override def update( index: String, - _type: String, + indexType: String, id: String, source: String, upsert: Boolean ): Boolean = { - apply() - .update( - new UpdateRequest(index, _type, id) - .doc(source, XContentType.JSON) - .docAsUpsert(upsert), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 + tryOrElse( + apply() + .update( + new UpdateRequest(index, indexType, id) + .doc(source, XContentType.JSON) + .docAsUpsert(upsert), + RequestOptions.DEFAULT + ) + .status() + .getStatus < 400, + false + )(logger) } override def updateAsync( index: String, - _type: String, + indexType: String, id: String, source: String, upsert: Boolean )(implicit ec: ExecutionContext): Future[Boolean] = { val promise: Promise[Boolean] = Promise() apply().updateAsync( - new UpdateRequest(index, _type, id) + new UpdateRequest(index, indexType, id) .doc(source, XContentType.JSON) .docAsUpsert(upsert), RequestOptions.DEFAULT, @@ -419,22 +525,25 @@ trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientCom trait RestHighLevelClientDeleteApi extends DeleteApi with RestHighLevelClientCompanion { _: RestHighLevelClientRefreshApi => - override def delete(uuid: String, index: String, _type: String): Boolean = { - apply() - .delete( - new DeleteRequest(index, _type, uuid), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 + override def delete(uuid: String, index: String, indexType: String): Boolean = { + tryOrElse( + apply() + .delete( + new DeleteRequest(index, indexType, uuid), + RequestOptions.DEFAULT + ) + .status() + .getStatus < 400, + false + )(logger) } - override def deleteAsync(uuid: String, index: String, _type: String)(implicit + override def deleteAsync(uuid: String, index: String, indexType: String)(implicit ec: ExecutionContext ): Future[Boolean] = { val promise: Promise[Boolean] = Promise() apply().deleteAsync( - new DeleteRequest(index, _type, uuid), + new DeleteRequest(index, indexType, uuid), RequestOptions.DEFAULT, new ActionListener[DeleteResponse] { override def onResponse(response: DeleteResponse): Unit = diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala index fe288531..4dddb6a0 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala @@ -4,7 +4,6 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl.Flow import app.softnetwork.elastic.sql.SQLQuery -import app.softnetwork.persistence.message.CountResponse import org.json4s.Formats import app.softnetwork.persistence.model.Timestamped import org.slf4j.{Logger, LoggerFactory} @@ -21,17 +20,28 @@ trait MockElasticClientApi extends ElasticClientApi { protected val elasticDocuments: ElasticDocuments = new ElasticDocuments() {} - override def toggleRefresh(index: String, enable: Boolean): Unit = {} + override def toggleRefresh(index: String, enable: Boolean): Boolean = true - override def setReplicas(index: String, replicas: Int): Unit = {} + override def setReplicas(index: String, replicas: Int): Boolean = true override def updateSettings(index: String, settings: String) = true override def addAlias(index: String, alias: String): Boolean = true + /** Remove an alias from the given index. + * + * @param index + * - the name of the index + * @param alias + * - the name of the alias + * @return + * true if the alias was removed successfully, false otherwise + */ + override def removeAlias(index: String, alias: String): Boolean = true + override def createIndex(index: String, settings: String): Boolean = true - override def setMapping(index: String, _type: String, mapping: String): Boolean = true + override def setMapping(index: String, mapping: String): Boolean = true override def deleteIndex(index: String): Boolean = true @@ -39,10 +49,28 @@ trait MockElasticClientApi extends ElasticClientApi { override def openIndex(index: String): Boolean = true - override def countAsync(jsonQuery: JSONQuery)(implicit - ec: ExecutionContext - ): Future[Option[Double]] = - throw new UnsupportedOperationException + /** Reindex from source index to target index. + * + * @param sourceIndex + * - the name of the source index + * @param targetIndex + * - the name of the target index + * @param refresh + * - true to refresh the target index after reindexing, false otherwise + * @return + * true if the reindexing was successful, false otherwise + */ + override def reindex(sourceIndex: String, targetIndex: String, refresh: Boolean = true): Boolean = + true + + /** Check if an index exists. + * + * @param index + * - the name of the index to check + * @return + * true if the index exists, false otherwise + */ + override def indexExists(index: String): Boolean = false override def count(jsonQuery: JSONQuery): Option[Double] = throw new UnsupportedOperationException @@ -54,55 +82,15 @@ trait MockElasticClientApi extends ElasticClientApi { )(implicit m: Manifest[U], formats: Formats): Option[U] = elasticDocuments.get(id).asInstanceOf[Option[U]] - override def getAsync[U <: Timestamped]( - id: String, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[Option[U]] = - Future.successful(elasticDocuments.get(id).asInstanceOf[Option[U]]) - override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = elasticDocuments.getAll.toList.asInstanceOf[List[U]] - override def searchAsync[U]( - sqlQuery: SQLQuery - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = - Future.successful(search(sqlQuery)) - - override def multiSearch[U]( - sqlQuery: SQLQuery - )(implicit m: Manifest[U], formats: Formats): List[List[U]] = - throw new UnsupportedOperationException - override def multiSearch[U]( jsonQueries: JSONQueries )(implicit m: Manifest[U], formats: Formats): List[List[U]] = throw new UnsupportedOperationException - override def index[U <: Timestamped]( - entity: U, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit u: ClassTag[U], formats: Formats): Boolean = { - elasticDocuments.createOrUpdate(entity) - true - } - - override def indexAsync[U <: Timestamped]( - entity: U, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit u: ClassTag[U], ec: ExecutionContext, formats: Formats): Future[Boolean] = { - elasticDocuments.createOrUpdate(entity) - Future.successful(true) - } - - override def index(index: String, _type: String, id: String, source: String): Boolean = - throw new UnsupportedOperationException - - override def indexAsync(index: String, _type: String, id: String, source: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = + override def index(index: String, indexType: String, id: String, source: String): Boolean = throw new UnsupportedOperationException override def update[U <: Timestamped]( @@ -115,19 +103,9 @@ trait MockElasticClientApi extends ElasticClientApi { true } - override def updateAsync[U <: Timestamped]( - entity: U, - index: Option[String] = None, - maybeType: Option[String] = None, - upsert: Boolean = true - )(implicit u: ClassTag[U], ec: ExecutionContext, formats: Formats): Future[Boolean] = { - elasticDocuments.createOrUpdate(entity) - Future.successful(true) - } - override def update( index: String, - _type: String, + indexType: String, id: String, source: String, upsert: Boolean @@ -136,15 +114,7 @@ trait MockElasticClientApi extends ElasticClientApi { false } - override def updateAsync( - index: String, - _type: String, - id: String, - source: String, - upsert: Boolean - )(implicit ec: ExecutionContext): Future[Boolean] = Future.successful(false) - - override def delete(uuid: String, index: String, _type: String): Boolean = { + override def delete(uuid: String, index: String, indexType: String): Boolean = { if (elasticDocuments.get(uuid).isDefined) { elasticDocuments.delete(uuid) true @@ -153,12 +123,6 @@ trait MockElasticClientApi extends ElasticClientApi { } } - override def deleteAsync(uuid: String, index: String, _type: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = { - Future.successful(delete(uuid, index, _type)) - } - override def refresh(index: String): Boolean = true override def flush(index: String, force: Boolean, wait: Boolean): Boolean = true @@ -191,12 +155,6 @@ trait MockElasticClientApi extends ElasticClientApi { formats: Formats ): List[List[(U, List[I])]] = List.empty - override def multiSearchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[List[(U, List[I])]] = List.empty - override def search[U](jsonQuery: JSONQuery)(implicit m: Manifest[U], formats: Formats): List[U] = List.empty @@ -206,13 +164,7 @@ trait MockElasticClientApi extends ElasticClientApi { formats: Formats ): List[(U, List[I])] = List.empty - override def searchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[(U, List[I])] = List.empty - - override def getMapping(index: String, _type: String): String = + override def getMapping(index: String): String = throw new UnsupportedOperationException override def aggregate(sqlQuery: SQLQuery)(implicit @@ -220,7 +172,8 @@ trait MockElasticClientApi extends ElasticClientApi { ): Future[Seq[SingleValueAggregateResult]] = throw new UnsupportedOperationException - override def loadSettings(): String = throw new UnsupportedOperationException + override def loadSettings(index: String): String = + throw new UnsupportedOperationException } trait ElasticDocuments { diff --git a/testkit/src/main/scala/app/softnetwork/persistence/person/ElasticPersonTestKit.scala b/testkit/src/main/scala/app/softnetwork/persistence/person/ElasticPersonTestKit.scala new file mode 100644 index 00000000..4ad9dc53 --- /dev/null +++ b/testkit/src/main/scala/app/softnetwork/persistence/person/ElasticPersonTestKit.scala @@ -0,0 +1,15 @@ +package app.softnetwork.persistence.person + +import app.softnetwork.elastic.scalatest.EmbeddedElasticTestKit +import app.softnetwork.persistence.scalatest.InMemoryPersistenceTestKit + +trait ElasticPersonTestKit + extends PersonTestKit + with InMemoryPersistenceTestKit + with EmbeddedElasticTestKit { + + override def beforeAll(): Unit = { + super.beforeAll() + initAndJoinCluster() + } +} diff --git a/testkit/src/main/scala/app/softnetwork/persistence/query/PersonToElasticProcessorStream.scala b/testkit/src/main/scala/app/softnetwork/persistence/query/PersonToElasticProcessorStream.scala new file mode 100644 index 00000000..d3b6c3f9 --- /dev/null +++ b/testkit/src/main/scala/app/softnetwork/persistence/query/PersonToElasticProcessorStream.scala @@ -0,0 +1,14 @@ +package app.softnetwork.persistence.query + +import app.softnetwork.elastic.client.ElasticClientApi +import app.softnetwork.elastic.persistence.query.{ElasticProvider, State2ElasticProcessorStream} +import app.softnetwork.persistence.person.message.PersonEvent +import app.softnetwork.persistence.person.model.Person +import app.softnetwork.persistence.person.query.PersonToExternalProcessorStream + +trait PersonToElasticProcessorStream + extends State2ElasticProcessorStream[Person, PersonEvent] + with PersonToExternalProcessorStream + with InMemoryJournalProvider + with InMemoryOffsetProvider + with ElasticProvider[Person] { _: ElasticClientApi => } diff --git a/testkit/src/test/resources/mapping/person.mustache b/testkit/src/test/resources/mapping/person.mustache new file mode 100644 index 00000000..21829e1c --- /dev/null +++ b/testkit/src/test/resources/mapping/person.mustache @@ -0,0 +1,21 @@ +{ + "properties": { + "uuid": { + "type": "keyword", + "index": true + }, + "name": { + "type": "text", + "analyzer": "search_analyzer" + }, + "birthDate": { + "type": "keyword" + }, + "createdDate": { + "type": "date" + }, + "lastUpdated": { + "type": "date" + } + } +} \ No newline at end of file diff --git a/testkit/src/test/resources/mapping/sample.mustache b/testkit/src/test/resources/mapping/sample.mustache deleted file mode 100644 index f3e19d45..00000000 --- a/testkit/src/test/resources/mapping/sample.mustache +++ /dev/null @@ -1,16 +0,0 @@ -{ - "{{type}}": { - "properties": { - "uuid": { - "type": "keyword", - "index": true - }, - "createdDate": { - "type": "date" - }, - "lastUpdated": { - "type": "date" - } - } - } -} \ No newline at end of file diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala index bf78f463..30d53761 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala @@ -1,8 +1,5 @@ package app.softnetwork.elastic.client -import java.io.ByteArrayInputStream -import java.util.concurrent.TimeUnit -import java.util.UUID import akka.actor.ActorSystem import app.softnetwork.elastic.sql.SQLQuery import com.fasterxml.jackson.core.JsonParseException @@ -15,19 +12,28 @@ import app.softnetwork.elastic.model._ import app.softnetwork.elastic.persistence.query.ElasticProvider import app.softnetwork.elastic.scalatest.EmbeddedElasticTestKit import app.softnetwork.persistence.person.model.Person +import com.google.gson.JsonParser +import com.sksamuel.exts.Logging import org.json4s.Formats import org.slf4j.{Logger, LoggerFactory} -import java.nio.file.{Files, Paths} +import _root_.java.io.ByteArrayInputStream +import _root_.java.nio.file.{Files, Paths} +import _root_.java.time.format.DateTimeFormatter +import _root_.java.util.concurrent.TimeUnit +import _root_.java.util.UUID import java.time.{LocalDate, LocalDateTime, ZoneOffset} -import java.time.format.DateTimeFormatter import scala.concurrent.{Await, ExecutionContextExecutor} import scala.concurrent.duration.Duration import scala.util.{Failure, Success} /** Created by smanciot on 28/06/2018. */ -trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with Matchers { +trait ElasticClientSpec + extends AnyFlatSpecLike + with EmbeddedElasticTestKit + with Matchers + with Logging { lazy val log: Logger = LoggerFactory getLogger getClass.getName @@ -56,6 +62,20 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with super.afterAll() } + val persons: List[String] = List( + """ { "uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21", "childrenCount": 0} """, + """ { "uuid": "A14", "name": "Moe Szyslak", "birthDate": "1967-11-21", "childrenCount": 0} """, + """ { "uuid": "A16", "name": "Barney Gumble", "birthDate": "1969-05-09", "childrenCount": 0} """ + ) + + private val personsWithUpsert = + persons :+ """ { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09", "children": [{ "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"}, { "parentId": "A16", "name": "Josh Gumble", "birthDate": "2002-05-09"}], "childrenCount": 2 } """ + + val children: List[String] = List( + """ { "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"} """, + """ { "parentId": "A16", "name": "Josh Gumble", "birthDate": "1999-05-09"} """ + ) + "Creating an index and then delete it" should "work fine" in { pClient.createIndex("create_delete") blockUntilIndexExists("create_delete") @@ -66,10 +86,14 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "create_delete" should not(beCreated()) } - "Adding an alias" should "work" in { + "Adding an alias and then removing it" should "work" in { pClient.addAlias("person", "person_alias") doesAliasExists("person_alias") shouldBe true + + pClient.removeAlias("person", "person_alias") + + doesAliasExists("person_alias") shouldBe false } private def settings: Map[String, String] = { @@ -83,11 +107,41 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Toggle refresh" should "work" in { pClient.toggleRefresh("person", enable = false) - - settings.getOrElse("index.refresh_interval", "") shouldBe "-1" + new JsonParser() + .parse(pClient.loadSettings("person")) + .getAsJsonObject + .get("person") + .getAsJsonObject + .get("settings") + .getAsJsonObject + .get("index") + .getAsJsonObject + .get("refresh_interval") + .getAsString shouldBe "-1" + // settings.getOrElse("index.refresh_interval", "") shouldBe "-1" pClient.toggleRefresh("person", enable = true) - settings.getOrElse("index.refresh_interval", "") shouldBe "1s" + // settings.getOrElse("index.refresh_interval", "") shouldBe "1s" + new JsonParser() + .parse(pClient.loadSettings("person")) + .getAsJsonObject + .get("person") + .getAsJsonObject + .get("settings") + .getAsJsonObject + .get("index") + .getAsJsonObject + .get("refresh_interval") + .getAsString shouldBe "1s" + } + + "Opening an index and then closing it" should "work" in { + pClient.openIndex("person") + + isIndexOpened("person") shouldBe true + + pClient.closeIndex("person") + isIndexClosed("person") shouldBe true } "Updating number of replicas" should "work" in { @@ -98,28 +152,172 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with settings.getOrElse("index.number_of_replicas", "") shouldBe "0" } - "Opening an index and then closing it" should "work" in { - pClient.openIndex("person") + "Setting a mapping" should "work" in { + pClient.createIndex("person_mapping") + blockUntilIndexExists("person_mapping") + "person_mapping" should beCreated - isIndexOpened("person") shouldBe true + val mapping = + """{ + | "properties": { + | "birthDate": { + | "type": "date" + | }, + | "uuid": { + | "type": "keyword" + | }, + | "name": { + | "type": "text", + | "analyzer": "ngram_analyzer", + | "search_analyzer": "search_analyzer", + | "fields": { + | "raw": { + | "type": "keyword" + | }, + | "fr": { + | "type": "text", + | "analyzer": "french" + | } + | } + | } + | } + |}""".stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") + pClient.setMapping("person_mapping", mapping) shouldBe true - pClient.closeIndex("person") - isIndexClosed("person") shouldBe true + val properties = pClient.getMappingProperties("person_mapping") + logger.info(s"properties: $properties") + MappingComparator.isMappingDifferent( + properties, + mapping + ) shouldBe false + + implicit val bulkOptions: BulkOptions = BulkOptions("person_mapping", "_doc", 1000) + val indices = pClient.bulk[String](persons.iterator, identity, Some("uuid"), None, None) + refresh(indices) + pClient.flush("person_mapping") + + indices should contain only "person_mapping" + + blockUntilCount(3, "person_mapping") + + "person_mapping" should haveCount(3) + + pClient.search[Person]("select * from person_mapping") match { + case r if r.size == 3 => + r.map(_.uuid) should contain allOf ("A12", "A14", "A16") + case other => fail(other.toString) + } + + pClient.search[Person]("select * from person_mapping where uuid = 'A16'") match { + case r if r.size == 1 => + r.map(_.uuid) should contain only "A16" + case other => fail(other.toString) + } + + pClient.search[Person]("select * from person_mapping where match(name, 'gum')") match { + case r if r.size == 1 => + r.map(_.uuid) should contain only "A16" + case other => fail(other.toString) + } + + pClient.search[Person]( + "select * from person_mapping where uuid <> 'A16' and match(name, 'gum')" + ) match { + case r if r.isEmpty => + case other => fail(other.toString) + } } - val persons: List[String] = List( - """ { "uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21", "childrenCount": 0} """, - """ { "uuid": "A14", "name": "Moe Szyslak", "birthDate": "1967-11-21", "childrenCount": 0} """, - """ { "uuid": "A16", "name": "Barney Gumble", "birthDate": "1969-05-09", "childrenCount": 0} """ - ) + "Updating a mapping" should "work" in { + val mapping = + """{ + | "properties": { + | "name": { + | "type": "keyword" + | }, + | "birthDate": { + | "type": "date" + | }, + | "uuid": { + | "type": "keyword" + | }, + | "childrenCount": { + | "type": "integer" + | } + | } + |} + """.stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") + pClient.updateMapping("person_migration", mapping) shouldBe true + blockUntilIndexExists("person_migration") + "person_migration" should beCreated - private val personsWithUpsert = - persons :+ """ { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09", "children": [{ "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"}, { "parentId": "A16", "name": "Josh Gumble", "birthDate": "2002-05-09"}], "childrenCount": 2 } """ + implicit val bulkOptions: BulkOptions = BulkOptions("person_migration", "_doc", 1000) + val indices = pClient.bulk[String](persons.iterator, identity, Some("uuid"), None, None) + refresh(indices) + pClient.flush("person_migration") - val children: List[String] = List( - """ { "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"} """, - """ { "parentId": "A16", "name": "Josh Gumble", "birthDate": "1999-05-09"} """ - ) + indices should contain only "person_migration" + + blockUntilCount(3, "person_migration") + + "person_migration" should haveCount(3) + + pClient.search[Person]("select * from person_migration where match(name, 'gum')") match { + case r if r.isEmpty => + case other => fail(other.toString) + } + + val newMapping = + """{ + | "properties": { + | "birthDate": { + | "type": "date" + | }, + | "uuid": { + | "type": "keyword" + | }, + | "name": { + | "type": "text", + | "analyzer": "ngram_analyzer", + | "search_analyzer": "search_analyzer", + | "fields": { + | "raw": { + | "type": "keyword" + | }, + | "fr": { + | "type": "text", + | "analyzer": "french" + | } + | } + | }, + | "childrenCount": { + | "type": "integer" + | }, + | "children": { + | "type": "nested", + | "include_in_parent": true, + | "properties": { + | "name": { + | "type": "keyword" + | }, + | "birthDate": { + | "type": "date" + | } + | } + | } + | } + |} + """.stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") + pClient.shouldUpdateMapping("person_migration", newMapping) shouldBe true + pClient.updateMapping("person_migration", newMapping) shouldBe true + + pClient.search[Person]("select * from person_migration where match(name, 'gum')") match { + case r if r.size == 1 => + r.map(_.uuid) should contain only "A16" + case other => fail(other.toString) + } + + } "Bulk index valid json without id key and suffix key" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person1", "person", 2) @@ -155,6 +353,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with implicit val bulkOptions: BulkOptions = BulkOptions("person2", "person", 1000) val indices = pClient.bulk[String](persons.iterator, identity, Some("uuid"), None, None) refresh(indices) + pClient.flush("person2") indices should contain only "person2" @@ -175,19 +374,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with _.sourceField("name") ) should contain allOf ("Homer Simpson", "Moe Szyslak", "Barney Gumble") - // FIXME elastic >= v 6.x no more multiple Parent / Child relationship allowed within the same index -// val childIndices = -// pClient.bulk[String](children.iterator, identity, None, None, None, None, None, Some("parentId"))( -// jclient, -// BulkOptions("person2", "child", 1000), -// system) -// pClient.refresh("person2") -// -// childIndices should contain only "person2" -// -// blockUntilCount(2, "person2", "child") -// -// "person2" should haveCount(5) } "Bulk index valid json with an id key and a suffix key" should "work" in { @@ -445,6 +631,11 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with val result2 = sClient.delete(sample.uuid, Some("sample"), Some("sample")) result2 shouldBe true + /*FIXME sClient.deleteAsync(sample.uuid, Some("sample")) complete () match { + case Success(r) => r shouldBe true + case Failure(f) => fail(f.getMessage) + }*/ + val result3 = sClient.get[Sample](uuid) result3.isEmpty shouldBe true } @@ -473,7 +664,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with | } |} """.stripMargin - bClient.setMapping("binaries", "test", mapping) shouldBe true + bClient.setMapping("binaries", mapping) shouldBe true for (uuid <- Seq("png", "jpg", "pdf")) { val path = Paths.get(Thread.currentThread().getContextClassLoader.getResource(s"avatar.$uuid").getPath) @@ -533,7 +724,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with |} """.stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") logger.info(s"mapping: $mapping") - pClient.setMapping("person10", "_doc", mapping) shouldBe true + pClient.setMapping("person10", mapping) shouldBe true implicit val bulkOptions: BulkOptions = BulkOptions("person10", "_doc", 1000) val indices = @@ -651,7 +842,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with |} """.stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") logger.info(s"mapping: $mapping") - parentClient.setMapping("parent", "_doc", mapping) shouldBe true + parentClient.setMapping("parent", mapping) shouldBe true implicit val bulkOptions: BulkOptions = BulkOptions("parent", "_doc", 1000) val indices = diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/JestProviders.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/JestProviders.scala index a734602f..b366eba5 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/JestProviders.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/JestProviders.scala @@ -1,9 +1,9 @@ package app.softnetwork.elastic.client -import app.softnetwork.elastic.client.jest.JestProvider import app.softnetwork.elastic.model.{Binary, Parent, Sample} import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.person.model.Person +import app.softnetwork.persistence.query.JestProvider import com.typesafe.config.Config import io.searchbox.client.JestClient diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala index d1b3dee9..5efe18be 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala @@ -1,9 +1,9 @@ package app.softnetwork.elastic.client -import app.softnetwork.elastic.client.rest.RestHighLevelClientProvider import app.softnetwork.elastic.model.{Binary, Parent, Sample} import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.person.model.Person +import app.softnetwork.persistence.query.RestHighLevelClientProvider import com.typesafe.config.Config import org.elasticsearch.client.RestHighLevelClient diff --git a/testkit/src/test/scala/app/softnetwork/persistence/person/JestClientPersonHandlerSpec.scala b/testkit/src/test/scala/app/softnetwork/persistence/person/JestClientPersonHandlerSpec.scala new file mode 100644 index 00000000..c768a2b5 --- /dev/null +++ b/testkit/src/test/scala/app/softnetwork/persistence/person/JestClientPersonHandlerSpec.scala @@ -0,0 +1,35 @@ +package app.softnetwork.persistence.person + +import akka.actor.typed.ActorSystem +import app.softnetwork.elastic.client.jest.JestClientApi +import app.softnetwork.elastic.persistence.query.ElasticProvider +import app.softnetwork.persistence.ManifestWrapper +import app.softnetwork.persistence.person.model.Person +import app.softnetwork.persistence.person.query.PersonToExternalProcessorStream +import app.softnetwork.persistence.query.{ + ExternalPersistenceProvider, + PersonToElasticProcessorStream +} +import com.typesafe.config.Config +import org.slf4j.{Logger, LoggerFactory} + +class JestClientPersonHandlerSpec extends ElasticPersonTestKit { + + override def externalPersistenceProvider: ExternalPersistenceProvider[Person] = + new ElasticProvider[Person] with JestClientApi with ManifestWrapper[Person] { + override protected val manifestWrapper: ManifestW = ManifestW() + override lazy val config: Config = JestClientPersonHandlerSpec.this.elasticConfig + } + + override def person2ExternalProcessorStream: ActorSystem[_] => PersonToExternalProcessorStream = + sys => + new PersonToElasticProcessorStream with JestClientApi { + override val forTests: Boolean = true + override protected val manifestWrapper: ManifestW = ManifestW() + override implicit def system: ActorSystem[_] = sys + override def log: Logger = LoggerFactory getLogger getClass.getName + override lazy val config: Config = JestClientPersonHandlerSpec.this.elasticConfig + } + + override def log: Logger = LoggerFactory getLogger getClass.getName +} diff --git a/testkit/src/test/scala/app/softnetwork/persistence/person/RestHighLevelClientPersonHandlerSpec.scala b/testkit/src/test/scala/app/softnetwork/persistence/person/RestHighLevelClientPersonHandlerSpec.scala new file mode 100644 index 00000000..fbce2cf8 --- /dev/null +++ b/testkit/src/test/scala/app/softnetwork/persistence/person/RestHighLevelClientPersonHandlerSpec.scala @@ -0,0 +1,35 @@ +package app.softnetwork.persistence.person + +import akka.actor.typed.ActorSystem +import app.softnetwork.elastic.client.rest.RestHighLevelClientApi +import app.softnetwork.elastic.persistence.query.ElasticProvider +import app.softnetwork.persistence.ManifestWrapper +import app.softnetwork.persistence.person.model.Person +import app.softnetwork.persistence.person.query.PersonToExternalProcessorStream +import app.softnetwork.persistence.query.{ + ExternalPersistenceProvider, + PersonToElasticProcessorStream +} +import com.typesafe.config.Config +import org.slf4j.{Logger, LoggerFactory} + +class RestHighLevelClientPersonHandlerSpec extends ElasticPersonTestKit { + + override def externalPersistenceProvider: ExternalPersistenceProvider[Person] = + new ElasticProvider[Person] with RestHighLevelClientApi with ManifestWrapper[Person] { + override protected val manifestWrapper: ManifestW = ManifestW() + override lazy val config: Config = RestHighLevelClientPersonHandlerSpec.this.elasticConfig + } + + override def person2ExternalProcessorStream: ActorSystem[_] => PersonToExternalProcessorStream = + sys => + new PersonToElasticProcessorStream with RestHighLevelClientApi { + override val forTests: Boolean = true + override protected val manifestWrapper: ManifestW = ManifestW() + override implicit def system: ActorSystem[_] = sys + override def log: Logger = LoggerFactory getLogger getClass.getName + override lazy val config: Config = RestHighLevelClientPersonHandlerSpec.this.elasticConfig + } + + override def log: Logger = LoggerFactory getLogger getClass.getName +} From 84621808f707111c2ea03c9ee7d5b905b3e91942 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Mon, 28 Jul 2025 12:28:40 +0300 Subject: [PATCH 2/2] fix multiple types --- .../elastic/persistence/query/ElasticProvider.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala b/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala index 2df9100d..28fd1dc5 100644 --- a/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala +++ b/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala @@ -64,7 +64,7 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w * whether the operation is successful or not */ override def createDocument(document: T)(implicit t: ClassTag[T]): Boolean = { - Try(index(document, Some(index), Some(_type))) match { + Try(index(document, Some(index), Some("_doc"))) match { case Success(_) => refresh(index) case Failure(f) => logger.error(f.getMessage, f) @@ -85,7 +85,7 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w * whether the operation is successful or not */ override def updateDocument(document: T, upsert: Boolean)(implicit t: ClassTag[T]): Boolean = { - Try(update(document, Some(index), Some(_type), upsert)) match { + Try(update(document, Some(index), Some("_doc"), upsert)) match { case Success(_) => refresh(index) case Failure(f) => logger.error(f.getMessage, f) @@ -102,7 +102,7 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w */ override def deleteDocument(uuid: String): Boolean = { Try( - delete(uuid, index, _type) + delete(uuid, index, "_doc") ) match { case Success(value) => value && refresh(index) case Failure(f) => @@ -127,7 +127,7 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w Try( update( index, - _type, + "_doc", uuid, data, upsert = true @@ -148,7 +148,7 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w * the document retrieved, None otherwise */ override def loadDocument(uuid: String)(implicit m: Manifest[T], formats: Formats): Option[T] = { - Try(get(uuid, Some(index), Some(_type))) match { + Try(get(uuid, Some(index), Some("_doc"))) match { case Success(s) => s case Failure(f) => logger.error(f.getMessage, f)