From 05c98cf4f823498e9ad01f48c56e6b279a9b51b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Wed, 16 Jul 2025 12:39:38 +0200 Subject: [PATCH 1/4] update dependencies --- project/Versions.scala | 6 +++--- rest/build.sbt | 2 +- sql/build.sbt | 2 +- testkit/build.sbt | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/project/Versions.scala b/project/Versions.scala index 053f4293..f6eb991e 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -20,9 +20,9 @@ object Versions { val log4s = "1.8.2" - val elasticSearch = "7.17.29" + val elasticSearch = "8.18.2" - val elastic4s = "7.17.4" + val elastic4s = "8.18.2" val jest = "6.3.1" @@ -34,5 +34,5 @@ object Versions { val gson = "2.8.0" - val rest = "7.17.29" // rest high level client + val rest = "8.18.2" // rest high level client } diff --git a/rest/build.sbt b/rest/build.sbt index 834128e5..ea4e707f 100644 --- a/rest/build.sbt +++ b/rest/build.sbt @@ -12,7 +12,7 @@ val jacksonExclusions = Seq( val rest = Seq( "org.elasticsearch" % "elasticsearch" % Versions.elasticSearch exclude ("org.apache.logging.log4j", "log4j-api"), - "org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % Versions.rest exclude ("org.elasticsearch", "elasticsearch"), + "co.elastic.clients" % "elasticsearch-java" % Versions.rest exclude ("org.elasticsearch", "elasticsearch"), "org.elasticsearch.client" % "elasticsearch-rest-client" % Versions.rest ).map(_.excludeAll(jacksonExclusions: _*)) diff --git a/sql/build.sbt b/sql/build.sbt index 99199fd2..960f9484 100644 --- a/sql/build.sbt +++ b/sql/build.sbt @@ -16,7 +16,7 @@ val jackson = Seq( ) val elastic4s = Seq( - "com.sksamuel.elastic4s" %% "elastic4s-core" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), + "nl.gn0s1s" %% "elastic4s-core" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), ) val scalatest = Seq( diff --git a/testkit/build.sbt b/testkit/build.sbt index f9bdd649..a649ad56 100644 --- a/testkit/build.sbt +++ b/testkit/build.sbt @@ -10,9 +10,9 @@ val jacksonExclusions = Seq( ) val elastic = Seq( - "com.sksamuel.elastic4s" %% "elastic4s-core" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), + "nl.gn0s1s" %% "elastic4s-core" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), "org.elasticsearch" % "elasticsearch" % Versions.elasticSearch exclude ("org.apache.logging.log4j", "log4j-api") exclude("org.slf4j", "slf4j-api") excludeAll(jacksonExclusions:_*), - "com.sksamuel.elastic4s" %% "elastic4s-testkit" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), + "nl.gn0s1s" %% "elastic4s-testkit" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), "org.apache.logging.log4j" % "log4j-api" % Versions.log4j, // "org.apache.logging.log4j" % "log4j-slf4j-impl" % Versions.log4j, "org.apache.logging.log4j" % "log4j-core" % Versions.log4j, From 935702616c4ff4b80cbd9fcabe707343fac85561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Fri, 18 Jul 2025 06:39:29 +0200 Subject: [PATCH 2/4] replace high level rest client with elasticsearch java client --- build.sbt | 14 +- .../elastic/client/ElasticClientApi.scala | 79 +- {rest => java}/build.sbt | 2 +- .../client/java/ElasticsearchClientApi.scala | 691 ++++++++++++++ .../java/ElasticsearchClientCompanion.scala | 27 +- .../java/ElasticsearchClientProvider.scala | 6 +- ...asticProcessorStreamWithRestProvider.scala | 4 +- project/Versions.scala | 8 +- .../client/rest/RestHighLevelClientApi.scala | 882 ------------------ .../elastic/sql/ElasticSearchRequest.scala | 2 +- .../elastic/sql/SQLImplicits.scala | 6 - .../softnetwork/elastic/sql/SQLQuery.scala | 2 +- .../softnetwork/elastic/sql/SQLWhere.scala | 3 +- .../elastic/sql/SQLCriteriaSpec.scala | 2 +- .../elastic/sql/SQLQuerySpec.scala | 1 + .../elastic/client/MockElasticClientApi.scala | 55 -- .../scalatest/ElasticDockerTestKit.scala | 25 +- .../elastic/scalatest/ElasticTestKit.scala | 6 +- .../elastic/client/ElasticClientSpec.scala | 8 +- ...ec.scala => ElasticsearchClientSpec.scala} | 4 +- ...ers.scala => ElasticsearchProviders.scala} | 18 +- 21 files changed, 828 insertions(+), 1017 deletions(-) rename {rest => java}/build.sbt (96%) create mode 100644 java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala rename rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientCompanion.scala => java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala (59%) rename rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala => java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientProvider.scala (64%) rename {rest => java}/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala (75%) delete mode 100644 rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala rename testkit/src/test/scala/app/softnetwork/elastic/client/{RestHighLevelClientSpec.scala => ElasticsearchClientSpec.scala} (83%) rename testkit/src/test/scala/app/softnetwork/elastic/client/{RestHighLevelProviders.scala => ElasticsearchProviders.scala} (60%) diff --git a/build.sbt b/build.sbt index 16f5f1fa..f76bf457 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ ThisBuild / organization := "app.softnetwork" name := "elastic" -ThisBuild / version := "7.17.29" +ThisBuild / version := Versions.elasticSearch ThisBuild / scalaVersion := "2.12.18" @@ -66,7 +66,7 @@ lazy val persistence = project.in(file("persistence")) client % "compile->compile;test->test;it->it" ) -lazy val rest = project.in(file("rest")) +lazy val java = project.in(file("java")) .configs(IntegrationTest) .settings(Defaults.itSettings) .dependsOn( @@ -75,12 +75,16 @@ lazy val rest = project.in(file("rest")) lazy val testKit = project.in(file("testkit")) .configs(IntegrationTest) - .settings(Defaults.itSettings) + .settings( + Defaults.itSettings, + app.softnetwork.Info.infoSettings + ) + .enablePlugins(BuildInfoPlugin) .dependsOn( - rest % "compile->compile;test->test;it->it" + java % "compile->compile;test->test;it->it" ) lazy val root = project.in(file(".")) .configs(IntegrationTest) .settings(Defaults.itSettings, Publish.noPublishSettings) - .aggregate(sql, client, persistence, rest, testKit) + .aggregate(sql, client, persistence, java, testKit) diff --git a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala index ce5e7672..9bd5c05d 100644 --- a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala +++ b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala @@ -178,7 +178,7 @@ trait IndexApi { _: RefreshApi => def indexAsync(index: String, id: String, source: String)(implicit ec: ExecutionContext - ): Future[Boolean] + ): Future[Boolean] = Future(this.index(index, id, source)) } trait UpdateApi { _: RefreshApi => @@ -235,13 +235,11 @@ trait UpdateApi { _: RefreshApi => ) def updateAsync(index: String, indexType: String, id: String, source: String, upsert: Boolean)( implicit ec: ExecutionContext - ): Future[Boolean] = { - this.updateAsync(index, id, source, upsert) - } + ): Future[Boolean] = this.updateAsync(index, id, source, upsert) def updateAsync(index: String, id: String, source: String, upsert: Boolean)(implicit ec: ExecutionContext - ): Future[Boolean] + ): Future[Boolean] = Future(this.update(index, id, source, upsert)) } trait DeleteApi { _: RefreshApi => @@ -279,7 +277,7 @@ trait DeleteApi { _: RefreshApi => def deleteAsync(uuid: String, index: String)(implicit ec: ExecutionContext - ): Future[Boolean] + ): Future[Boolean] = Future(this.delete(uuid, index)) } @@ -330,9 +328,9 @@ trait BulkApi { _: RefreshApi with SettingsApi => * | balance | | bulk | * | |------->| | * +----------+ +----------+ - * | | - * | | - * | | + * | | + * | | + * | | * +---------+ | | * | |<-----------' | * | merge | | @@ -502,7 +500,11 @@ trait BulkApi { _: RefreshApi with SettingsApi => } trait CountApi { - def countAsync(query: JSONQuery)(implicit ec: ExecutionContext): Future[Option[Double]] + def countAsync(query: JSONQuery)(implicit ec: ExecutionContext): Future[Option[Double]] = { + Future( + this.count(query) + ) + } def count(query: JSONQuery): Option[Double] @@ -527,7 +529,9 @@ trait GetApi { id: String, index: Option[String] = None, maybeType: Option[String] = None - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[Option[U]] + )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[Option[U]] = { + Future(this.get[U](id, index, maybeType)) + } } trait SearchApi { @@ -538,13 +542,26 @@ trait SearchApi { def searchAsync[U]( sqlQuery: SQLQuery - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] + )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = Future( + this.search[U](sqlQuery) + ) def searchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], formats: Formats - ): List[(U, List[I])] + ): List[(U, List[I])] = { + sqlQuery.search match { + case Some(searchRequest) => + val indices = collection.immutable.Seq(searchRequest.sources: _*) + val jsonQuery = JSONQuery(searchRequest.query, indices) + searchWithInnerHits(jsonQuery, innerField) + case None => + throw new IllegalArgumentException( + s"SQL query ${sqlQuery.query} does not contain a valid search request" + ) + } + } def searchWithInnerHits[U, I](jsonQuery: JSONQuery, innerField: String)(implicit m1: Manifest[U], @@ -554,7 +571,23 @@ trait SearchApi { def multiSearch[U]( sqlQuery: SQLQuery - )(implicit m: Manifest[U], formats: Formats): List[List[U]] + )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { + sqlQuery.multiSearch match { + case Some(multiSearchRequest) => + val jsonQueries: JSONQueries = JSONQueries( + collection.immutable + .Seq(multiSearchRequest.requests.map { searchRequest => + JSONQuery(searchRequest.query, collection.immutable.Seq(searchRequest.sources: _*)) + }: _*) + .toList + ) + multiSearch[U](jsonQueries) + case None => + throw new IllegalArgumentException( + s"SQL query ${sqlQuery.query} does not contain a valid search request" + ) + } + } def multiSearch[U]( jsonQueries: JSONQueries @@ -564,7 +597,23 @@ trait SearchApi { m1: Manifest[U], m2: Manifest[I], formats: Formats - ): List[List[(U, List[I])]] + ): List[List[(U, List[I])]] = { + sqlQuery.multiSearch match { + case Some(multiSearchRequest) => + val jsonQueries: JSONQueries = JSONQueries( + collection.immutable + .Seq(multiSearchRequest.requests.map { searchRequest => + JSONQuery(searchRequest.query, collection.immutable.Seq(searchRequest.sources: _*)) + }: _*) + .toList + ) + multiSearchWithInnerHits[U, I](jsonQueries, innerField) + case None => + throw new IllegalArgumentException( + s"SQL query ${sqlQuery.query} does not contain a valid search request" + ) + } + } def multiSearchWithInnerHits[U, I](jsonQueries: JSONQueries, innerField: String)(implicit m1: Manifest[U], diff --git a/rest/build.sbt b/java/build.sbt similarity index 96% rename from rest/build.sbt rename to java/build.sbt index ea4e707f..885b083d 100644 --- a/rest/build.sbt +++ b/java/build.sbt @@ -1,6 +1,6 @@ organization := "app.softnetwork.elastic" -name := "elastic-rest-client" +name := "elastic-java-client" val jacksonExclusions = Seq( ExclusionRule(organization = "com.fasterxml.jackson.core"), diff --git a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala new file mode 100644 index 00000000..59425aae --- /dev/null +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala @@ -0,0 +1,691 @@ +package app.softnetwork.elastic.client.java + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Flow +import app.softnetwork.elastic.client._ +import app.softnetwork.elastic.sql.SQLQuery +import app.softnetwork.elastic.{client, sql} +import app.softnetwork.persistence.model.Timestamped +import app.softnetwork.serialization.serialization +import co.elastic.clients.elasticsearch.core.bulk.{ + BulkOperation, + BulkResponseItem, + DeleteOperation, + IndexOperation, + UpdateAction, + UpdateOperation +} +import co.elastic.clients.elasticsearch.core.msearch.{ + MultisearchBody, + MultisearchHeader, + RequestItem +} +import co.elastic.clients.elasticsearch.core.{ + BulkRequest, + BulkResponse, + CountRequest, + DeleteRequest, + GetRequest, + IndexRequest, + MsearchRequest, + SearchRequest, + UpdateRequest +} +import co.elastic.clients.elasticsearch.indices.update_aliases.{Action, AddAction, RemoveAction} +import co.elastic.clients.elasticsearch.indices.{ + CloseIndexRequest, + CreateIndexRequest, + DeleteIndexRequest, + FlushRequest, + GetIndicesSettingsRequest, + GetMappingRequest, + IndexSettings, + OpenRequest, + PutIndicesSettingsRequest, + PutMappingRequest, + RefreshRequest, + UpdateAliasesRequest +} +import com.google.gson.JsonParser + +import _root_.java.io.StringReader +import _root_.java.util.{Map => JMap} + +import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, seqAsJavaListConverter} +import org.json4s.Formats + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.language.implicitConversions +import scala.util.{Failure, Success, Try} + +trait ElasticsearchClientApi + extends ElasticClientApi + with ElasticsearchClientIndicesApi + with ElasticsearchClientAliasApi + with ElasticsearchClientSettingsApi + with ElasticsearchClientMappingApi + with ElasticsearchClientRefreshApi + with ElasticsearchClientFlushApi + with ElasticsearchClientCountApi + with ElasticsearchClientSingleValueAggregateApi + with ElasticsearchClientIndexApi + with ElasticsearchClientUpdateApi + with ElasticsearchClientDeleteApi + with ElasticsearchClientGetApi + with ElasticsearchClientSearchApi + with ElasticsearchClientBulkApi + +trait ElasticsearchClientIndicesApi extends IndicesApi with ElasticsearchClientCompanion { + override def createIndex(index: String, settings: String): Boolean = { + apply() + .indices() + .create( + new CreateIndexRequest.Builder() + .index(index) + .settings(new IndexSettings.Builder().withJson(new StringReader(settings)).build()) + .build() + ) + .acknowledged() + } + + override def deleteIndex(index: String): Boolean = { + apply().indices().delete(new DeleteIndexRequest.Builder().index(index).build()).acknowledged() + } + + override def openIndex(index: String): Boolean = { + apply().indices().open(new OpenRequest.Builder().index(index).build()).acknowledged() + } + + override def closeIndex(index: String): Boolean = { + apply().indices().close(new CloseIndexRequest.Builder().index(index).build()).acknowledged() + } + +} + +trait ElasticsearchClientAliasApi extends AliasApi with ElasticsearchClientCompanion { + override def addAlias(index: String, alias: String): Boolean = { + apply() + .indices() + .updateAliases( + new UpdateAliasesRequest.Builder() + .actions( + new Action.Builder() + .add(new AddAction.Builder().index(index).alias(alias).build()) + .build() + ) + .build() + ) + .acknowledged() + } + + override def removeAlias(index: String, alias: String): Boolean = { + apply() + .indices() + .updateAliases( + new UpdateAliasesRequest.Builder() + .actions( + new Action.Builder() + .remove(new RemoveAction.Builder().index(index).alias(alias).build()) + .build() + ) + .build() + ) + .acknowledged() + } +} + +trait ElasticsearchClientSettingsApi extends SettingsApi with ElasticsearchClientCompanion { + _: ElasticsearchClientIndicesApi => + + override def updateSettings(index: String, settings: String): Boolean = { + apply() + .indices() + .putSettings( + new PutIndicesSettingsRequest.Builder() + .index(index) + .settings(new IndexSettings.Builder().withJson(new StringReader(settings)).build()) + .build() + ) + .acknowledged() + } + + override def loadSettings(): String = { + apply() + .indices() + .getSettings( + new GetIndicesSettingsRequest.Builder().index("*").build() + ) + .toString + } +} + +trait ElasticsearchClientMappingApi extends MappingApi with ElasticsearchClientCompanion { + override def setMapping(index: String, mapping: String): Boolean = { + apply() + .indices() + .putMapping( + new PutMappingRequest.Builder().index(index).withJson(new StringReader(mapping)).build() + ) + .acknowledged() + } + + override def getMapping(index: String): String = { + apply() + .indices() + .getMapping( + new GetMappingRequest.Builder().index(index).build() + ) + .toString + } +} + +trait ElasticsearchClientRefreshApi extends RefreshApi with ElasticsearchClientCompanion { + override def refresh(index: String): Boolean = { + apply() + .indices() + .refresh( + new RefreshRequest.Builder().index(index).build() + ) + .shards() + .failed() + .intValue() == 0 + } +} + +trait ElasticsearchClientFlushApi extends FlushApi with ElasticsearchClientCompanion { + override def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean = { + apply() + .indices() + .flush( + new FlushRequest.Builder().index(index).force(force).waitIfOngoing(wait).build() + ) + .shards() + .failed() + .intValue() == 0 + } +} + +trait ElasticsearchClientCountApi extends CountApi with ElasticsearchClientCompanion { + override def count(query: client.JSONQuery): Option[Double] = { + Option( + apply() + .count( + new CountRequest.Builder().index(query.indices.asJava).build() + ) + .count() + .toDouble + ) + } +} + +trait ElasticsearchClientSingleValueAggregateApi + extends SingleValueAggregateApi + with ElasticsearchClientCountApi { + override def aggregate( + sqlQuery: SQLQuery + )(implicit ec: ExecutionContext): Future[Seq[SingleValueAggregateResult]] = { + val futures = for (aggregation <- sqlQuery.aggregations) yield { + val promise: Promise[SingleValueAggregateResult] = Promise() + val field = aggregation.field + val sourceField = aggregation.sourceField + val aggType = aggregation.aggType + val aggName = aggregation.aggName + val query = aggregation.query.getOrElse("") + val sources = aggregation.sources + sourceField match { + case "_id" if aggType.sql == "count" => + countAsync( + JSONQuery( + query, + collection.immutable.Seq(sources: _*), + collection.immutable.Seq.empty[String] + ) + ).onComplete { + case Success(result) => + promise.success( + SingleValueAggregateResult( + field, + aggType, + result.getOrElse(0d), + None + ) + ) + case Failure(f) => + logger.error(f.getMessage, f.fillInStackTrace()) + promise.success(SingleValueAggregateResult(field, aggType, 0d, Some(f.getMessage))) + } + promise.future + case _ => + val jsonQuery = JSONQuery( + query, + collection.immutable.Seq(sources: _*), + collection.immutable.Seq.empty[String] + ) + import jsonQuery._ + // Create a parser for the query + Try( + apply().search( + new SearchRequest.Builder() + .index(indices.asJava) + .withJson( + new StringReader(jsonQuery.query) + ) + .build() + ) + ) match { + case Success(response) => + val agg = aggName.split("\\.").last + + val itAgg = aggName.split("\\.").iterator + + var root = + if (aggregation.nested) { + response.aggregations().get(itAgg.next()).nested().aggregations() + } else { + response.aggregations() + } + + if (aggregation.filtered) { + root = root.get(itAgg.next()).filter().aggregations() + } + + promise.success( + SingleValueAggregateResult( + field, + aggType, + aggType match { + case sql.Count => + if (aggregation.distinct) { + root.get(agg).cardinality().value() + } else { + root.get(agg).valueCount().value() + } + case sql.Sum => + root.get(agg).sum().value() + case sql.Avg => + root.get(agg).avg().value() + case sql.Min => + root.get(agg).min().value() + case sql.Max => + root.get(agg).max().value() + case _ => 0d + }, + None + ) + ) + case Failure(exception) => + logger.error(s"Failed to execute search for aggregation: $aggName", exception) + promise.success( + SingleValueAggregateResult( + field, + aggType, + 0d, + Some(exception.getMessage) + ) + ) + } + promise.future + } + } + Future.sequence(futures) + } +} + +trait ElasticsearchClientIndexApi extends IndexApi with ElasticsearchClientCompanion { + _: ElasticsearchClientRefreshApi => + override def index(index: String, id: String, source: String): Boolean = { + apply() + .index( + new IndexRequest.Builder() + .index(index) + .id(id) + .withJson(new StringReader(source)) + .build() + ) + .shards() + .failed() + .intValue() == 0 + } + +} + +trait ElasticsearchClientUpdateApi extends UpdateApi with ElasticsearchClientCompanion { + _: ElasticsearchClientRefreshApi => + override def update( + index: String, + id: String, + source: String, + upsert: Boolean + ): Boolean = { + apply() + .update( + new UpdateRequest.Builder[JMap[String, Object], JMap[String, Object]]() + .index(index) + .id(id) + .doc(mapper.readValue(source, classOf[JMap[String, Object]])) + .docAsUpsert(upsert) + .build(), + classOf[JMap[String, Object]] + ) + .shards() + .failed() + .intValue() == 0 + } + +} + +trait ElasticsearchClientDeleteApi extends DeleteApi with ElasticsearchClientCompanion { + _: ElasticsearchClientRefreshApi => + + override def delete(uuid: String, index: String): Boolean = { + apply() + .delete( + new DeleteRequest.Builder().index(index).id(uuid).build() + ) + .shards() + .failed() + .intValue() == 0 + } + +} + +trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion { + + def get[U <: Timestamped]( + id: String, + index: Option[String] = None, + maybeType: Option[String] = None + )(implicit m: Manifest[U], formats: Formats): Option[U] = { + Try( + apply().get( + new GetRequest.Builder() + .index( + index.getOrElse( + maybeType.getOrElse( + m.runtimeClass.getSimpleName.toLowerCase + ) + ) + ) + .id(id) + .build(), + classOf[JMap[String, Object]] + ) + ) match { + case Success(response) => + if (response.found()) { + val source = mapper.writeValueAsString(response.source()) + logger.info(s"Deserializing response $source for id: $id, index: ${index + .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}") + // Deserialize the source string to the expected type + // Note: This assumes that the source is a valid JSON representation of U + // and that the serialization library is capable of handling it. + Try(serialization.read[U](source)) match { + case Success(value) => Some(value) + case Failure(f) => + logger.error( + s"Failed to deserialize response $source for id: $id, index: ${index + .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}", + f + ) + None + } + } else { + None + } + case Failure(f) => + logger.error( + s"Failed to get document with id: $id, index: ${index + .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}", + f + ) + None + } + } + +} + +trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCompanion { + override def search[U]( + jsonQuery: JSONQuery + )(implicit m: Manifest[U], formats: Formats): List[U] = { + import jsonQuery._ + logger.info(s"Searching with query: $query on indices: ${indices.mkString(", ")}") + val response = apply().search( + new SearchRequest.Builder() + .index(indices.asJava) + .withJson( + new StringReader(query) + ) + .build(), + classOf[JMap[String, Object]] + ) + if (response.hits().total().value() > 0) { + response + .hits() + .hits() + .asScala + .map { hit => + val source = mapper.writeValueAsString(hit.source()) + logger.info(s"Deserializing hit: $source") + serialization.read[U](source) + } + .toList + } else { + List.empty[U] + } + } + + override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = { + sqlQuery.search match { + case Some(searchRequest) => + val indices = collection.immutable.Seq(searchRequest.sources: _*) + search[U](JSONQuery(searchRequest.query, indices)) + case None => + throw new IllegalArgumentException( + s"SQL query ${sqlQuery.query} does not contain a valid search request" + ) + } + } + + override def searchWithInnerHits[U, I](jsonQuery: JSONQuery, innerField: String)(implicit + m1: Manifest[U], + m2: Manifest[I], + formats: Formats + ): List[(U, List[I])] = { + import jsonQuery._ + val response = apply().search( + new SearchRequest.Builder() + .index(indices.asJava) + .withJson( + new StringReader(query) + ) + .build(), + classOf[JMap[String, Object]] + ) + Try(new JsonParser().parse(response.toString).getAsJsonObject ~> [U, I] innerField) match { + case Success(s) => s + case Failure(f) => + logger.error(f.getMessage, f) + List.empty + } + } + + override def multiSearch[U]( + jsonQueries: JSONQueries + )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { + import jsonQueries._ + + val items = queries.map { query => + new RequestItem.Builder() + .header(new MultisearchHeader.Builder().index(query.indices.asJava).build()) + .body(new MultisearchBody.Builder().withJson(new StringReader(query.query)).build()) + .build() + } + + val request = new MsearchRequest.Builder().searches(items.asJava).build() + val responses = apply().msearch(request, classOf[JMap[String, Object]]) + + responses.responses().asScala.toList.map { + case response if response.isFailure => + logger.error(s"Error in multi search: ${response.failure().error().reason()}") + List.empty[U] + + case response => + response + .result() + .hits() + .hits() + .asScala + .toList + .map(hit => serialization.read[U](mapper.writeValueAsString(hit.source()))) + } + } + + override def multiSearchWithInnerHits[U, I](jsonQueries: JSONQueries, innerField: String)(implicit + m1: Manifest[U], + m2: Manifest[I], + formats: Formats + ): List[List[(U, List[I])]] = { + import jsonQueries._ + val items = queries.map { query => + new RequestItem.Builder() + .header(new MultisearchHeader.Builder().index(query.indices.asJava).build()) + .body(new MultisearchBody.Builder().withJson(new StringReader(query.query)).build()) + .build() + } + + val request = new MsearchRequest.Builder().searches(items.asJava).build() + val responses = apply().msearch(request, classOf[JMap[String, Object]]) + + responses.responses().asScala.toList.map { + case response if response.isFailure => + logger.error(s"Error in multi search: ${response.failure().error().reason()}") + List.empty[(U, List[I])] + + case response => + Try( + new JsonParser().parse(response.result().toString).getAsJsonObject ~> [U, I] innerField + ) match { + case Success(s) => s + case Failure(f) => + logger.error(f.getMessage, f) + List.empty + } + } + } + +} + +trait ElasticsearchClientBulkApi + extends ElasticsearchClientRefreshApi + with ElasticsearchClientSettingsApi + with ElasticsearchClientIndicesApi + with BulkApi { + override type A = BulkOperation + override type R = BulkResponse + + override def toBulkAction(bulkItem: BulkItem): A = { + import bulkItem._ + + action match { + case BulkAction.UPDATE => + new BulkOperation.Builder() + .update( + new UpdateOperation.Builder() + .index(index) + .id(id.orNull) + .action( + new UpdateAction.Builder[JMap[String, Object], JMap[String, Object]]() + .doc(mapper.readValue(body, classOf[JMap[String, Object]])) + .docAsUpsert(true) + .build() + ) + .build() + ) + .build() + + case BulkAction.DELETE => + val deleteId = id.getOrElse { + throw new IllegalArgumentException(s"Missing id for delete on index $index") + } + new BulkOperation.Builder() + .delete(new DeleteOperation.Builder().index(index).id(deleteId).build()) + .build() + + case _ => + new BulkOperation.Builder() + .index( + new IndexOperation.Builder[JMap[String, Object]]() + .index(index) + .id(id.orNull) + .document(mapper.readValue(body, classOf[JMap[String, Object]])) + .build() + ) + .build() + } + } + override def bulkResult: Flow[R, Set[String], NotUsed] = + Flow[BulkResponse] + .named("result") + .map(result => { + val items = result.items().asScala.toList + val grouped = items.groupBy(_.index()) + val indices = grouped.keys.toSet + for (index <- indices) { + logger + .info(s"Bulk operation succeeded for index $index with ${grouped(index).length} items.") + } + indices + }) + + override def bulk(implicit + bulkOptions: BulkOptions, + system: ActorSystem + ): Flow[Seq[A], R, NotUsed] = { + val parallelism = Math.max(1, bulkOptions.balance) + Flow[Seq[A]] + .named("bulk") + .mapAsyncUnordered[R](parallelism) { items => + val request = + new BulkRequest.Builder().index(bulkOptions.index).operations(items.asJava).build() + Try(apply().bulk(request)) match { + case Success(response) if response.errors() => + val failedItems = response.items().asScala.filter(_.status() >= 400) + if (failedItems.nonEmpty) { + val errorMessages = + failedItems.map(i => s"${i.id()} - ${i.error().reason()}").mkString(", ") + Future.failed(new Exception(s"Bulk operation failed for items: $errorMessages")) + } else { + Future.successful(response) + } + case Success(response) => + Future.successful(response) + case Failure(exception) => + logger.error("Bulk operation failed", exception) + Future.failed(exception) + } + } + } + + private[this] def toBulkElasticResultItem(i: BulkResponseItem): BulkElasticResultItem = + new BulkElasticResultItem { + override def index: String = i.index() + } + + override implicit def toBulkElasticAction(a: BulkOperation): BulkElasticAction = + new BulkElasticAction { + override def index: String = + if (a.isIndex) a.index().index() + else if (a.isDelete) a.delete().index() + else if (a.isUpdate) a.update().index() + else throw new IllegalArgumentException("Unsupported bulk action type") + } + + override implicit def toBulkElasticResult(r: BulkResponse): BulkElasticResult = { + new BulkElasticResult { + override def items: List[BulkElasticResultItem] = + r.items().asScala.toList.map(toBulkElasticResultItem) + } + } +} diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientCompanion.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala similarity index 59% rename from rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientCompanion.scala rename to java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala index 1ec39335..4ae8a5f8 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientCompanion.scala +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala @@ -1,30 +1,26 @@ -package app.softnetwork.elastic.client.rest +package app.softnetwork.elastic.client.java import app.softnetwork.elastic.client.ElasticConfig +import co.elastic.clients.elasticsearch.ElasticsearchClient +import co.elastic.clients.json.jackson.JacksonJsonpMapper +import co.elastic.clients.transport.rest_client.RestClientTransport +import com.fasterxml.jackson.databind.ObjectMapper import com.typesafe.scalalogging.StrictLogging import org.apache.http.HttpHost import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials} import org.apache.http.impl.client.BasicCredentialsProvider import org.apache.http.impl.nio.client.HttpAsyncClientBuilder -import org.elasticsearch.client.{RestClient, RestClientBuilder, RestHighLevelClient} -import org.elasticsearch.common.settings.Settings -import org.elasticsearch.xcontent.NamedXContentRegistry -import org.elasticsearch.plugins.SearchPlugin -import org.elasticsearch.search.SearchModule +import org.elasticsearch.client.{RestClient, RestClientBuilder} -trait RestHighLevelClientCompanion extends StrictLogging { +trait ElasticsearchClientCompanion extends StrictLogging { def elasticConfig: ElasticConfig - private var client: Option[RestHighLevelClient] = None + private var client: Option[ElasticsearchClient] = None - lazy val namedXContentRegistry: NamedXContentRegistry = { - import scala.collection.JavaConverters._ - val searchModule = new SearchModule(Settings.EMPTY, false, List.empty[SearchPlugin].asJava) - new NamedXContentRegistry(searchModule.getNamedXContents) - } + lazy val mapper = new ObjectMapper() - def apply(): RestHighLevelClient = { + def apply(): ElasticsearchClient = { client match { case Some(c) => c case _ => @@ -45,7 +41,8 @@ trait RestHighLevelClientCompanion extends StrictLogging { .setHttpClientConfigCallback((httpAsyncClientBuilder: HttpAsyncClientBuilder) => httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider) ) - val c = new RestHighLevelClient(restClientBuilder) + val transport = new RestClientTransport(restClientBuilder.build(), new JacksonJsonpMapper()) + val c = new ElasticsearchClient(transport) client = Some(c) c } diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientProvider.scala similarity index 64% rename from rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala rename to java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientProvider.scala index 9a5b40f5..2b25e2d4 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientProvider.scala @@ -1,12 +1,12 @@ -package app.softnetwork.elastic.client.rest +package app.softnetwork.elastic.client.java import app.softnetwork.elastic.persistence.query.ElasticProvider import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.model.Timestamped -trait RestHighLevelClientProvider[T <: Timestamped] +trait ElasticsearchClientProvider[T <: Timestamped] extends ElasticProvider[T] - with RestHighLevelClientApi { + with ElasticsearchClientApi { _: ManifestWrapper[T] => } diff --git a/rest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala b/java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala similarity index 75% rename from rest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala rename to java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala index 0b7df127..6907b441 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala +++ b/java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala @@ -1,10 +1,10 @@ package app.softnetwork.elastic.persistence.query -import app.softnetwork.elastic.client.rest.RestHighLevelClientProvider +import app.softnetwork.elastic.client.java.ElasticsearchClientProvider import app.softnetwork.persistence.message.CrudEvent import app.softnetwork.persistence.model.Timestamped import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider} trait State2ElasticProcessorStreamWithRestProvider[T <: Timestamped, E <: CrudEvent] extends State2ElasticProcessorStream[T, E] - with RestHighLevelClientProvider[T] { _: JournalProvider with OffsetProvider => } + with ElasticsearchClientProvider[T] { _: JournalProvider with OffsetProvider => } diff --git a/project/Versions.scala b/project/Versions.scala index f6eb991e..0619d822 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -2,13 +2,13 @@ object Versions { // val akka = "2.6.20" - val scalatest = "3.2.16" + val scalatest = "3.2.19" val typesafeConfig = "1.4.2" val kxbmap = "0.4.4" - val jackson = "2.13.3" // 2.11.4 -> 2.12.7 + val jackson = "2.19.0" // 2.13.3 -> 2.19.0 val json4s = "4.0.6" // 3.6.12 -> 4.0.6 @@ -20,7 +20,7 @@ object Versions { val log4s = "1.8.2" - val elasticSearch = "8.18.2" + val elasticSearch = "8.18.3" val elastic4s = "8.18.2" @@ -34,5 +34,5 @@ object Versions { val gson = "2.8.0" - val rest = "8.18.2" // rest high level client + val rest = "8.18.3" // java client } diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala deleted file mode 100644 index 47a2349f..00000000 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala +++ /dev/null @@ -1,882 +0,0 @@ -package app.softnetwork.elastic.client.rest - -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.scaladsl.Flow -import app.softnetwork.elastic.client._ -import app.softnetwork.elastic.sql.SQLQuery -import app.softnetwork.elastic.{client, sql} -import app.softnetwork.persistence.model.Timestamped -import app.softnetwork.serialization.serialization -import com.google.gson.JsonParser -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest -import org.elasticsearch.action.admin.indices.flush.FlushRequest -import org.elasticsearch.action.admin.indices.open.OpenIndexRequest -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest -import org.elasticsearch.action.bulk.{BulkItemResponse, BulkRequest, BulkResponse} -import org.elasticsearch.action.delete.{DeleteRequest, DeleteResponse} -import org.elasticsearch.action.get.{GetRequest, GetResponse} -import org.elasticsearch.action.index.{IndexRequest, IndexResponse} -import org.elasticsearch.action.search.{MultiSearchRequest, SearchRequest, SearchResponse} -import org.elasticsearch.action.update.{UpdateRequest, UpdateResponse} -import org.elasticsearch.action.{ActionListener, DocWriteRequest} -import org.elasticsearch.client.RequestOptions -import org.elasticsearch.client.core.{CountRequest, CountResponse} -import org.elasticsearch.client.indices.{ - CloseIndexRequest, - CreateIndexRequest, - GetMappingsRequest, - PutMappingRequest -} -import org.elasticsearch.common.io.stream.InputStreamStreamInput -import org.elasticsearch.xcontent.{DeprecationHandler, XContentType} -import org.elasticsearch.rest.RestStatus -import org.elasticsearch.search.aggregations.bucket.filter.Filter -import org.elasticsearch.search.aggregations.bucket.nested.Nested -import org.elasticsearch.search.aggregations.metrics.{Avg, Cardinality, Max, Min, Sum, ValueCount} -import org.elasticsearch.search.builder.SearchSourceBuilder -import org.json4s.Formats - -import java.io.ByteArrayInputStream -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.language.implicitConversions -import scala.util.{Failure, Success, Try} - -trait RestHighLevelClientApi - extends ElasticClientApi - with RestHighLevelClientIndicesApi - with RestHighLevelClientAliasApi - with RestHighLevelClientSettingsApi - with RestHighLevelClientMappingApi - with RestHighLevelClientRefreshApi - with RestHighLevelClientFlushApi - with RestHighLevelClientCountApi - with RestHighLevelClientSingleValueAggregateApi - with RestHighLevelClientIndexApi - with RestHighLevelClientUpdateApi - with RestHighLevelClientDeleteApi - with RestHighLevelClientGetApi - with RestHighLevelClientSearchApi - with RestHighLevelClientBulkApi - -trait RestHighLevelClientIndicesApi extends IndicesApi with RestHighLevelClientCompanion { - override def createIndex(index: String, settings: String): Boolean = { - apply() - .indices() - .create( - new CreateIndexRequest(index) - .settings(settings, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged - } - - override def deleteIndex(index: String): Boolean = { - apply().indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged - } - - override def openIndex(index: String): Boolean = { - apply().indices().open(new OpenIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged - } - - override def closeIndex(index: String): Boolean = { - apply().indices().close(new CloseIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged - } - -} - -trait RestHighLevelClientAliasApi extends AliasApi with RestHighLevelClientCompanion { - override def addAlias(index: String, alias: String): Boolean = { - apply() - .indices() - .updateAliases( - new IndicesAliasesRequest() - .addAliasAction( - new AliasActions(AliasActions.Type.ADD) - .index(index) - .alias(alias) - ), - RequestOptions.DEFAULT - ) - .isAcknowledged - } - - override def removeAlias(index: String, alias: String): Boolean = { - apply() - .indices() - .updateAliases( - new IndicesAliasesRequest() - .addAliasAction( - new AliasActions(AliasActions.Type.REMOVE) - .index(index) - .alias(alias) - ), - RequestOptions.DEFAULT - ) - .isAcknowledged - } -} - -trait RestHighLevelClientSettingsApi extends SettingsApi with RestHighLevelClientCompanion { - _: RestHighLevelClientIndicesApi => - - override def updateSettings(index: String, settings: String): Boolean = { - apply() - .indices() - .putSettings( - new UpdateSettingsRequest(index) - .settings(settings, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged - } - - override def loadSettings(): String = { - apply() - .indices() - .getSettings( - new GetSettingsRequest().indices("*"), - RequestOptions.DEFAULT - ) - .toString - } -} - -trait RestHighLevelClientMappingApi extends MappingApi with RestHighLevelClientCompanion { - override def setMapping(index: String, mapping: String): Boolean = { - apply() - .indices() - .putMapping( - new PutMappingRequest(index) - .source(mapping, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged - } - - override def getMapping(index: String): String = { - apply() - .indices() - .getMapping( - new GetMappingsRequest().indices(index), - RequestOptions.DEFAULT - ) - .toString - } -} - -trait RestHighLevelClientRefreshApi extends RefreshApi with RestHighLevelClientCompanion { - override def refresh(index: String): Boolean = { - apply() - .indices() - .refresh( - new RefreshRequest(index), - RequestOptions.DEFAULT - ) - .getStatus - .getStatus < 400 - } -} - -trait RestHighLevelClientFlushApi extends FlushApi with RestHighLevelClientCompanion { - override def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean = { - apply() - .indices() - .flush( - new FlushRequest(index).force(force).waitIfOngoing(wait), - RequestOptions.DEFAULT - ) - .getStatus == RestStatus.OK - } -} - -trait RestHighLevelClientCountApi extends CountApi with RestHighLevelClientCompanion { - override def countAsync( - query: client.JSONQuery - )(implicit ec: ExecutionContext): Future[Option[Double]] = { - val promise = Promise[Option[Double]]() - apply().countAsync( - new CountRequest().indices(query.indices: _*).types(query.types: _*), - RequestOptions.DEFAULT, - new ActionListener[CountResponse] { - override def onResponse(response: CountResponse): Unit = - promise.success(Option(response.getCount.toDouble)) - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } - - override def count(query: client.JSONQuery): Option[Double] = { - Option( - apply() - .count( - new CountRequest().indices(query.indices: _*).types(query.types: _*), - RequestOptions.DEFAULT - ) - .getCount - .toDouble - ) - } -} - -trait RestHighLevelClientSingleValueAggregateApi - extends SingleValueAggregateApi - with RestHighLevelClientCountApi { - override def aggregate( - sqlQuery: SQLQuery - )(implicit ec: ExecutionContext): Future[Seq[SingleValueAggregateResult]] = { - val futures = for (aggregation <- sqlQuery.aggregations) yield { - val promise: Promise[SingleValueAggregateResult] = Promise() - val field = aggregation.field - val sourceField = aggregation.sourceField - val aggType = aggregation.aggType - val aggName = aggregation.aggName - val query = aggregation.query.getOrElse("") - val sources = aggregation.sources - sourceField match { - case "_id" if aggType.sql == "count" => - countAsync( - JSONQuery( - query, - collection.immutable.Seq(sources: _*), - collection.immutable.Seq.empty[String] - ) - ).onComplete { - case Success(result) => - promise.success( - SingleValueAggregateResult( - field, - aggType, - result.getOrElse(0d), - None - ) - ) - case Failure(f) => - logger.error(f.getMessage, f.fillInStackTrace()) - promise.success(SingleValueAggregateResult(field, aggType, 0d, Some(f.getMessage))) - } - promise.future - case _ => - val jsonQuery = JSONQuery( - query, - collection.immutable.Seq(sources: _*), - collection.immutable.Seq.empty[String] - ) - import jsonQuery._ - // Create a parser for the query - val xContentParser = XContentType.JSON - .xContent() - .createParser( - namedXContentRegistry, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - jsonQuery.query - ) - apply().searchAsync( - new SearchRequest(indices: _*) - .types(types: _*) - .source( - SearchSourceBuilder.fromXContent(xContentParser) - ), - RequestOptions.DEFAULT, - new ActionListener[SearchResponse] { - override def onResponse(response: SearchResponse): Unit = { - val agg = aggName.split("\\.").last - - val itAgg = aggName.split("\\.").iterator - - var root = - if (aggregation.nested) { - response.getAggregations.get(itAgg.next()).asInstanceOf[Nested].getAggregations - } else { - response.getAggregations - } - - if (aggregation.filtered) { - root = root.get(itAgg.next()).asInstanceOf[Filter].getAggregations - } - - promise.success( - SingleValueAggregateResult( - field, - aggType, - aggType match { - case sql.Count => - if (aggregation.distinct) { - root.get(agg).asInstanceOf[Cardinality].value() - } else { - root.get(agg).asInstanceOf[ValueCount].value() - } - case sql.Sum => - root.get(agg).asInstanceOf[Sum].value() - case sql.Avg => - root.get(agg).asInstanceOf[Avg].value() - case sql.Min => - root.get(agg).asInstanceOf[Min].value() - case sql.Max => - root.get(agg).asInstanceOf[Max].value() - case _ => 0d - }, - None - ) - ) - } - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } - } - Future.sequence(futures) - } -} - -trait RestHighLevelClientIndexApi extends IndexApi with RestHighLevelClientCompanion { - _: RestHighLevelClientRefreshApi => - override def index(index: String, id: String, source: String): Boolean = { - apply() - .index( - new IndexRequest(index) - .id(id) - .source(source, XContentType.JSON), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 - } - - override def indexAsync(index: String, id: String, source: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = { - val promise: Promise[Boolean] = Promise() - apply().indexAsync( - new IndexRequest(index) - .id(id) - .source(source, XContentType.JSON), - RequestOptions.DEFAULT, - new ActionListener[IndexResponse] { - override def onResponse(response: IndexResponse): Unit = - promise.success(response.status().getStatus < 400) - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } -} - -trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientCompanion { - _: RestHighLevelClientRefreshApi => - override def update( - index: String, - id: String, - source: String, - upsert: Boolean - ): Boolean = { - apply() - .update( - new UpdateRequest(index, id) - .doc(source, XContentType.JSON) - .docAsUpsert(upsert), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 - } - - override def updateAsync( - index: String, - id: String, - source: String, - upsert: Boolean - )(implicit ec: ExecutionContext): Future[Boolean] = { - val promise: Promise[Boolean] = Promise() - apply().updateAsync( - new UpdateRequest(index, id) - .doc(source, XContentType.JSON) - .docAsUpsert(upsert), - RequestOptions.DEFAULT, - new ActionListener[UpdateResponse] { - override def onResponse(response: UpdateResponse): Unit = - promise.success(response.status().getStatus < 400) - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } -} - -trait RestHighLevelClientDeleteApi extends DeleteApi with RestHighLevelClientCompanion { - _: RestHighLevelClientRefreshApi => - - override def delete(uuid: String, index: String): Boolean = { - apply() - .delete( - new DeleteRequest(index, uuid), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 - } - - override def deleteAsync(uuid: String, index: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = { - val promise: Promise[Boolean] = Promise() - apply().deleteAsync( - new DeleteRequest(index, uuid), - RequestOptions.DEFAULT, - new ActionListener[DeleteResponse] { - override def onResponse(response: DeleteResponse): Unit = - promise.success(response.status().getStatus < 400) - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } -} - -trait RestHighLevelClientGetApi extends GetApi with RestHighLevelClientCompanion { - def get[U <: Timestamped]( - id: String, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit m: Manifest[U], formats: Formats): Option[U] = { - Try( - apply().get( - new GetRequest( - index.getOrElse( - maybeType.getOrElse( - m.runtimeClass.getSimpleName.toLowerCase - ) - ), - id - ), - RequestOptions.DEFAULT - ) - ) match { - case Success(response) => - if (response.isExists) { - val source = response.getSourceAsString - logger.info(s"Deserializing response $source for id: $id, index: ${index - .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}") - // Deserialize the source string to the expected type - // Note: This assumes that the source is a valid JSON representation of U - // and that the serialization library is capable of handling it. - Try(serialization.read[U](source)) match { - case Success(value) => Some(value) - case Failure(f) => - logger.error( - s"Failed to deserialize response $source for id: $id, index: ${index - .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}", - f - ) - None - } - } else { - None - } - case Failure(f) => - logger.error( - s"Failed to get document with id: $id, index: ${index - .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}", - f - ) - None - } - } - - override def getAsync[U <: Timestamped]( - id: String, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[Option[U]] = { - val promise = Promise[Option[U]]() - apply().getAsync( - new GetRequest( - index.getOrElse( - maybeType.getOrElse( - m.runtimeClass.getSimpleName.toLowerCase - ) - ), - id - ), - RequestOptions.DEFAULT, - new ActionListener[GetResponse] { - override def onResponse(response: GetResponse): Unit = { - if (response.isExists) { - promise.success(Some(serialization.read[U](response.getSourceAsString))) - } else { - promise.success(None) - } - } - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } -} - -trait RestHighLevelClientSearchApi extends SearchApi with RestHighLevelClientCompanion { - override def search[U]( - jsonQuery: JSONQuery - )(implicit m: Manifest[U], formats: Formats): List[U] = { - import jsonQuery._ - logger.info(s"Searching with query: $query on indices: ${indices.mkString(", ")}") - // Create a parser for the query - val xContentParser = XContentType.JSON - .xContent() - .createParser( - namedXContentRegistry, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - query - ) - val response = apply().search( - new SearchRequest(indices: _*) - .types(types: _*) - .source( - SearchSourceBuilder.fromXContent(xContentParser) - ), - RequestOptions.DEFAULT - ) - if (response.getHits.getTotalHits.value > 0) { - response.getHits.getHits.toList.map { hit => - logger.info(s"Deserializing hit: ${hit.getSourceAsString}") - serialization.read[U](hit.getSourceAsString) - } - } else { - List.empty[U] - } - } - - override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = { - sqlQuery.search match { - case Some(searchRequest) => - val indices = collection.immutable.Seq(searchRequest.sources: _*) - search[U](JSONQuery(searchRequest.query, indices)) - case None => - throw new IllegalArgumentException( - s"SQL query ${sqlQuery.query} does not contain a valid search request" - ) - } - } - - override def searchAsync[U]( - sqlQuery: SQLQuery - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = { - val promise = Promise[List[U]]() - sqlQuery.search match { - case Some(searchRequest) => - val indices = collection.immutable.Seq(searchRequest.sources: _*) - val jsonQuery = JSONQuery(searchRequest.query, indices) - import jsonQuery._ - logger.info(s"Searching with query: $query on indices: ${indices.mkString(", ")}") - // Create a parser for the query - val xContentParser = XContentType.JSON - .xContent() - .createParser( - namedXContentRegistry, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - query - ) - // Execute the search asynchronously - apply().searchAsync( - new SearchRequest(indices: _*) - .types(types: _*) - .source( - SearchSourceBuilder.fromXContent(xContentParser) - ), - RequestOptions.DEFAULT, - new ActionListener[SearchResponse] { - override def onResponse(response: SearchResponse): Unit = { - if (response.getHits.getTotalHits.value > 0) { - promise.success(response.getHits.getHits.toList.map { hit => - serialization.read[U](hit.getSourceAsString) - }) - } else { - promise.success(List.empty[U]) - } - } - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - case None => - promise.failure( - new IllegalArgumentException( - s"SQL query ${sqlQuery.query} does not contain a valid search request" - ) - ) - } - promise.future - } - - override def searchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[(U, List[I])] = { - sqlQuery.search match { - case Some(searchRequest) => - val indices = collection.immutable.Seq(searchRequest.sources: _*) - val jsonQuery = JSONQuery(searchRequest.query, indices) - searchWithInnerHits(jsonQuery, innerField) - case None => - throw new IllegalArgumentException( - s"SQL query ${sqlQuery.query} does not contain a valid search request" - ) - } - } - - override def searchWithInnerHits[U, I](jsonQuery: JSONQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[(U, List[I])] = { - import jsonQuery._ - // Create a parser for the query - val xContentParser = XContentType.JSON - .xContent() - .createParser( - namedXContentRegistry, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - jsonQuery.query - ) - val response = apply().search( - new SearchRequest(indices: _*) - .types(types: _*) - .source( - SearchSourceBuilder.fromXContent(xContentParser) - ), - RequestOptions.DEFAULT - ) - Try(new JsonParser().parse(response.toString).getAsJsonObject ~> [U, I] innerField) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty - } - } - - override def multiSearch[U]( - sqlQuery: SQLQuery - )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { - sqlQuery.multiSearch match { - case Some(multiSearchRequest) => - val jsonQueries: JSONQueries = JSONQueries( - collection.immutable - .Seq(multiSearchRequest.requests.map { searchRequest => - JSONQuery(searchRequest.query, collection.immutable.Seq(searchRequest.sources: _*)) - }: _*) - .toList - ) - multiSearch[U](jsonQueries) - case None => - throw new IllegalArgumentException( - s"SQL query ${sqlQuery.query} does not contain a valid search request" - ) - } - } - - override def multiSearch[U]( - jsonQueries: JSONQueries - )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { - import jsonQueries._ - val request = new MultiSearchRequest() - for (query <- queries) { - request.add( - new SearchRequest(query.indices: _*) - .types(query.types: _*) - .source( - new SearchSourceBuilder( - new InputStreamStreamInput( - new ByteArrayInputStream( - query.query.getBytes() - ) - ) - ) - ) - ) - } - val responses = apply().msearch(request, RequestOptions.DEFAULT) - responses.getResponses.toList.map { response => - if (response.isFailure) { - logger.error(s"Error in multi search: ${response.getFailureMessage}") - List.empty[U] - } else { - response.getResponse.getHits.getHits.toList.map { hit => - serialization.read[U](hit.getSourceAsString) - } - } - } - } - - override def multiSearchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[List[(U, List[I])]] = { - sqlQuery.multiSearch match { - case Some(multiSearchRequest) => - val jsonQueries: JSONQueries = JSONQueries( - collection.immutable - .Seq(multiSearchRequest.requests.map { searchRequest => - JSONQuery(searchRequest.query, collection.immutable.Seq(searchRequest.sources: _*)) - }: _*) - .toList - ) - multiSearchWithInnerHits[U, I](jsonQueries, innerField) - case None => - throw new IllegalArgumentException( - s"SQL query ${sqlQuery.query} does not contain a valid search request" - ) - } - } - - override def multiSearchWithInnerHits[U, I](jsonQueries: JSONQueries, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[List[(U, List[I])]] = { - import jsonQueries._ - val request = new MultiSearchRequest() - for (query <- queries) { - request.add( - new SearchRequest(query.indices: _*) - .types(query.types: _*) - .source( - new SearchSourceBuilder( - new InputStreamStreamInput( - new ByteArrayInputStream( - query.query.getBytes() - ) - ) - ) - ) - ) - } - val responses = apply().msearch(request, RequestOptions.DEFAULT) - responses.getResponses.toList.map { response => - if (response.isFailure) { - logger.error(s"Error in multi search: ${response.getFailureMessage}") - List.empty[(U, List[I])] - } else { - Try( - new JsonParser().parse(response.getResponse.toString).getAsJsonObject ~> [U, I] innerField - ) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty - } - } - } - } - -} - -trait RestHighLevelClientBulkApi - extends RestHighLevelClientRefreshApi - with RestHighLevelClientSettingsApi - with RestHighLevelClientIndicesApi - with BulkApi { - override type A = DocWriteRequest[_] - override type R = BulkResponse - - override def toBulkAction(bulkItem: BulkItem): A = { - import bulkItem._ - val request = action match { - case BulkAction.UPDATE => - val r = new UpdateRequest(index, if (id.isEmpty) null else id.get) - .doc(body, XContentType.JSON) - .docAsUpsert(true) -// parent.foreach(r.parent) - r - case BulkAction.DELETE => - val r = new DeleteRequest(index).id(id.getOrElse("_all")) -// parent.foreach(r.parent) - r - case _ => - val r = new IndexRequest(index).source(body, XContentType.JSON) - id.foreach(r.id) -// parent.foreach(r.parent) - r - } - request - } - - override def bulkResult: Flow[R, Set[String], NotUsed] = - Flow[BulkResponse] - .named("result") - .map(result => { - val items = result.getItems - val grouped = items.groupBy(_.getIndex) - val indices = grouped.keys.toSet - for (index <- indices) { - logger - .info(s"Bulk operation succeeded for index $index with ${grouped(index).length} items.") - } - indices - }) - - override def bulk(implicit - bulkOptions: BulkOptions, - system: ActorSystem - ): Flow[Seq[A], R, NotUsed] = { - val parallelism = Math.max(1, bulkOptions.balance) - Flow[Seq[A]] - .named("bulk") - .mapAsyncUnordered[R](parallelism) { items => - val request = new BulkRequest(bulkOptions.index) - items.foreach(request.add) - val promise: Promise[R] = Promise[R]() - apply().bulkAsync( - request, - RequestOptions.DEFAULT, - new ActionListener[BulkResponse] { - override def onResponse(response: BulkResponse): Unit = { - if (response.hasFailures) { - logger.error(s"Bulk operation failed: ${response.buildFailureMessage()}") - } else { - logger.info(s"Bulk operation succeeded with ${response.getItems.length} items.") - } - promise.success(response) - } - - override def onFailure(e: Exception): Unit = { - logger.error("Bulk operation failed", e) - promise.failure(e) - } - } - ) - promise.future - } - } - - private[this] def toBulkElasticResultItem(i: BulkItemResponse): BulkElasticResultItem = - new BulkElasticResultItem { - override def index: String = i.getIndex - } - - override implicit def toBulkElasticAction(a: DocWriteRequest[_]): BulkElasticAction = { - new BulkElasticAction { - override def index: String = a.index - } - } - - override implicit def toBulkElasticResult(r: BulkResponse): BulkElasticResult = { - new BulkElasticResult { - override def items: List[BulkElasticResultItem] = - r.getItems.toList.map(toBulkElasticResultItem) - } - } -} diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticSearchRequest.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticSearchRequest.scala index 2bd3d424..aa2a5b85 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticSearchRequest.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticSearchRequest.scala @@ -19,5 +19,5 @@ case class ElasticSearchRequest( } def query: String = - SearchBodyBuilderFn(search).string().replace("\"version\":true,", "") /*FIXME*/ + SearchBodyBuilderFn(search).string.replace("\"version\":true,", "") /*FIXME*/ } diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLImplicits.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLImplicits.scala index a788fcba..cb72fa86 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLImplicits.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLImplicits.scala @@ -1,7 +1,5 @@ package app.softnetwork.elastic.sql -//import com.sksamuel.elastic4s.requests.searches.term.{BuildableTermsQuery, TermsQuery} - import scala.util.matching.Regex /** Created by smanciot on 27/06/2018. @@ -39,8 +37,4 @@ object SQLImplicits { implicit def sqllikeToRegex(value: String): Regex = toRegex(value).r -// implicit def BuildableTermsNoOp[T]: BuildableTermsQuery[T] = new BuildableTermsQuery[T] { -// override def build(q: TermsQuery[T]): Any = null // not used by the http builders -// } - } diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLQuery.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLQuery.scala index e60408fc..752776d1 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLQuery.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLQuery.scala @@ -45,7 +45,7 @@ case class SQLQuery(query: String, score: Option[Double] = None) { } size 0 ) - }).string().replace("\"version\":true,", "") /*FIXME*/ + }).string.replace("\"version\":true,", "") /*FIXME*/ ) ) }) diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala index fa75292d..6a9902e2 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala @@ -2,7 +2,6 @@ package app.softnetwork.elastic.sql import com.sksamuel.elastic4s.ElasticApi._ import com.sksamuel.elastic4s.requests.searches.queries.Query -import SQLImplicits._ case object Where extends SQLExpr("where") with SQLRegex @@ -376,7 +375,7 @@ case class SQLIn[R, +T <: SQLValue[R]]( values: SQLValues[R, T], maybeNot: Option[Not.type] = None ) extends SQLCriteriaWithIdentifier - with ElasticFilter { + with ElasticFilter { this: SQLIn[R, T] => override def sql = s"$identifier ${maybeNot.map(_ => "not ").getOrElse("")}$operator $values" override def operator: SQLOperator = In diff --git a/sql/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala b/sql/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala index dd0b4298..c6db8489 100644 --- a/sql/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala +++ b/sql/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala @@ -18,7 +18,7 @@ class SQLCriteriaSpec extends AnyFlatSpec with Matchers { val criteria: Option[SQLCriteria] = sql val result = SearchBodyBuilderFn( SearchRequest("*") query criteria.map(_.asQuery()).getOrElse(matchAllQuery()) - ).string() + ).string println(result) result } diff --git a/sql/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala b/sql/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala index 09b12108..a6ec75b6 100644 --- a/sql/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala +++ b/sql/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala @@ -502,6 +502,7 @@ class SQLQuerySpec extends AnyFlatSpec with Matchers { | "match_all":{} | }, | "_source":{ + | "includes":["*"], | "excludes":["col1","col2"] | } |}""".stripMargin.replaceAll("\\s+", "") diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala index b51f24b4..ef6b9bf8 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala @@ -4,7 +4,6 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl.Flow import app.softnetwork.elastic.sql.SQLQuery -import app.softnetwork.persistence.message.CountResponse import org.json4s.Formats import app.softnetwork.persistence.model.Timestamped import org.slf4j.{Logger, LoggerFactory} @@ -39,11 +38,6 @@ trait MockElasticClientApi extends ElasticClientApi { override def openIndex(index: String): Boolean = true - override def countAsync(jsonQuery: JSONQuery)(implicit - ec: ExecutionContext - ): Future[Option[Double]] = - throw new UnsupportedOperationException - override def count(jsonQuery: JSONQuery): Option[Double] = throw new UnsupportedOperationException @@ -54,21 +48,9 @@ trait MockElasticClientApi extends ElasticClientApi { )(implicit m: Manifest[U], formats: Formats): Option[U] = elasticDocuments.get(id).asInstanceOf[Option[U]] - override def getAsync[U <: Timestamped]( - id: String, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[Option[U]] = - Future.successful(elasticDocuments.get(id).asInstanceOf[Option[U]]) - override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = elasticDocuments.getAll.toList.asInstanceOf[List[U]] - override def searchAsync[U]( - sqlQuery: SQLQuery - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = - Future.successful(search(sqlQuery)) - override def multiSearch[U]( sqlQuery: SQLQuery )(implicit m: Manifest[U], formats: Formats): List[List[U]] = @@ -88,23 +70,9 @@ trait MockElasticClientApi extends ElasticClientApi { true } - override def indexAsync[U <: Timestamped]( - entity: U, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit u: ClassTag[U], ec: ExecutionContext, formats: Formats): Future[Boolean] = { - elasticDocuments.createOrUpdate(entity) - Future.successful(true) - } - override def index(index: String, id: String, source: String): Boolean = throw new UnsupportedOperationException - override def indexAsync(index: String, id: String, source: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = - throw new UnsupportedOperationException - override def update[U <: Timestamped]( entity: U, index: Option[String] = None, @@ -115,16 +83,6 @@ trait MockElasticClientApi extends ElasticClientApi { true } - override def updateAsync[U <: Timestamped]( - entity: U, - index: Option[String] = None, - maybeType: Option[String] = None, - upsert: Boolean = true - )(implicit u: ClassTag[U], ec: ExecutionContext, formats: Formats): Future[Boolean] = { - elasticDocuments.createOrUpdate(entity) - Future.successful(true) - } - override def update( index: String, id: String, @@ -135,13 +93,6 @@ trait MockElasticClientApi extends ElasticClientApi { false } - override def updateAsync( - index: String, - id: String, - source: String, - upsert: Boolean - )(implicit ec: ExecutionContext): Future[Boolean] = Future.successful(false) - override def delete(uuid: String, index: String): Boolean = { if (elasticDocuments.get(uuid).isDefined) { elasticDocuments.delete(uuid) @@ -151,12 +102,6 @@ trait MockElasticClientApi extends ElasticClientApi { } } - override def deleteAsync(uuid: String, index: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = { - Future.successful(delete(uuid, index)) - } - override def refresh(index: String): Boolean = true override def flush(index: String, force: Boolean, wait: Boolean): Boolean = true diff --git a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala index 9e169c3d..081be14c 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala @@ -1,20 +1,35 @@ package app.softnetwork.elastic.scalatest import org.scalatest.Suite +import org.testcontainers.containers.BindMode import org.testcontainers.elasticsearch.ElasticsearchContainer import org.testcontainers.utility.DockerImageName +import java.nio.file.{Files, Path} + /** Created by smanciot on 28/06/2018. */ trait ElasticDockerTestKit extends ElasticTestKit { _: Suite => override lazy val elasticURL: String = s"http://${elasticContainer.getHttpHostAddress}" - lazy val elasticContainer = new ElasticsearchContainer( - DockerImageName - .parse(s"elasticsearch:$elasticVersion") - .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch") - ) + lazy val tmpDir: Path = Files.createTempDirectory("es-tmp") + + lazy val elasticContainer: ElasticsearchContainer = { + val container = new ElasticsearchContainer( + DockerImageName + .parse(s"docker.elastic.co/elasticsearch/elasticsearch") + .withTag(elasticVersion) + ) + container.addEnv("ES_TMPDIR", "/tmp") + container.addEnv("discovery.type", "single-node") + container.addEnv("xpack.security.enabled", "false") + container.addEnv("xpack.ml.enabled", "false") + container.addEnv("xpack.watcher.enabled", "false") + container.addEnv("xpack.graph.enabled", "false") + container.addFileSystemBind(tmpDir.toAbsolutePath.toString, "/tmp", BindMode.READ_WRITE) + container + } override def start(): Unit = elasticContainer.start() diff --git a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticTestKit.scala index e3e67334..34a9a9c5 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticTestKit.scala @@ -1,6 +1,7 @@ package app.softnetwork.elastic.scalatest import app.softnetwork.concurrent.scalatest.CompletionTestKit +import app.softnetwork.elastic.ElasticTestkitBuildInfo import com.sksamuel.elastic4s.http.JavaClient import com.sksamuel.elastic4s.requests.indexes.admin.RefreshIndexResponse import com.sksamuel.elastic4s.{ElasticClient, ElasticDsl, Indexes} @@ -13,7 +14,6 @@ import org.scalatest.{BeforeAndAfterAll, Suite} import org.scalatest.matchers.{MatchResult, Matcher} import org.slf4j.Logger -import java.util.UUID import scala.util.{Failure, Success} /** Created by smanciot on 18/05/2021. @@ -22,7 +22,7 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft def log: Logger - def elasticVersion: String = "7.17.28" + def elasticVersion: String = ElasticTestkitBuildInfo.version def elasticURL: String @@ -41,8 +41,6 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft |} |""".stripMargin - lazy val clusterName: String = s"test-${UUID.randomUUID()}" - lazy val elasticClient: ElasticClient = ElasticClient( new JavaClient( RestClient diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala index b63e3494..048c46d0 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala @@ -1,8 +1,5 @@ package app.softnetwork.elastic.client -import java.io.ByteArrayInputStream -import java.util.concurrent.TimeUnit -import java.util.UUID import akka.actor.ActorSystem import app.softnetwork.elastic.sql.SQLQuery import com.fasterxml.jackson.core.JsonParseException @@ -19,7 +16,10 @@ import com.typesafe.scalalogging.StrictLogging import org.json4s.Formats import org.slf4j.{Logger, LoggerFactory} -import java.nio.file.{Files, Paths} +import _root_.java.io.ByteArrayInputStream +import _root_.java.util.concurrent.TimeUnit +import _root_.java.util.UUID +import _root_.java.nio.file.{Files, Paths} import scala.concurrent.{Await, ExecutionContextExecutor} import scala.concurrent.duration.Duration import scala.util.{Failure, Success} diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientSpec.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchClientSpec.scala similarity index 83% rename from testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientSpec.scala rename to testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchClientSpec.scala index 819c55be..e1f34b11 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelClientSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchClientSpec.scala @@ -1,6 +1,6 @@ package app.softnetwork.elastic.client -import app.softnetwork.elastic.client.RestHighLevelProviders.{ +import app.softnetwork.elastic.client.ElasticsearchProviders.{ BinaryProvider, PersonProvider, SampleProvider @@ -9,7 +9,7 @@ import app.softnetwork.elastic.model.{Binary, Sample} import app.softnetwork.elastic.persistence.query.ElasticProvider import app.softnetwork.persistence.person.model.Person -class RestHighLevelClientSpec extends ElasticClientSpec { +class ElasticsearchClientSpec extends ElasticClientSpec { lazy val pClient: ElasticProvider[Person] with ElasticClientApi = new PersonProvider( elasticConfig diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala similarity index 60% rename from testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala rename to testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala index 442985ca..7ab92464 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala @@ -1,41 +1,41 @@ package app.softnetwork.elastic.client -import app.softnetwork.elastic.client.rest.RestHighLevelClientProvider +import app.softnetwork.elastic.client.java.ElasticsearchClientProvider import app.softnetwork.elastic.model.{Binary, Sample} import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.person.model.Person +import co.elastic.clients.elasticsearch.ElasticsearchClient import com.typesafe.config.Config -import org.elasticsearch.client.RestHighLevelClient -object RestHighLevelProviders { +object ElasticsearchProviders { class PersonProvider(es: Config) - extends RestHighLevelClientProvider[Person] + extends ElasticsearchClientProvider[Person] with ManifestWrapper[Person] { override protected val manifestWrapper: ManifestW = ManifestW() override lazy val config: Config = es - implicit lazy val jestClient: RestHighLevelClient = apply() + implicit lazy val elasticsearchClient: ElasticsearchClient = apply() } class SampleProvider(es: Config) - extends RestHighLevelClientProvider[Sample] + extends ElasticsearchClientProvider[Sample] with ManifestWrapper[Sample] { override protected val manifestWrapper: ManifestW = ManifestW() override lazy val config: Config = es - implicit lazy val jestClient: RestHighLevelClient = apply() + implicit lazy val elasticsearchClient: ElasticsearchClient = apply() } class BinaryProvider(es: Config) - extends RestHighLevelClientProvider[Binary] + extends ElasticsearchClientProvider[Binary] with ManifestWrapper[Binary] { override protected val manifestWrapper: ManifestW = ManifestW() override lazy val config: Config = es - implicit lazy val jestClient: RestHighLevelClient = apply() + implicit lazy val elasticsearchClient: ElasticsearchClient = apply() } } From 12494da92899a57362bed1255d55c0fbfa070e6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Fri, 18 Jul 2025 13:07:15 +0200 Subject: [PATCH 3/4] prepare PR --- java/build.sbt | 4 +- ...sticProcessorStreamWithJavaProvider.scala} | 2 +- project/Versions.scala | 3 - .../client/rest/RestHighLevelClientApi.scala | 817 ------------------ .../scalatest/ElasticDockerTestKit.scala | 4 +- .../client/ElasticsearchProviders.scala | 6 +- 6 files changed, 8 insertions(+), 828 deletions(-) rename java/src/main/scala/app/softnetwork/elastic/persistence/query/{State2ElasticProcessorStreamWithRestProvider.scala => State2ElasticProcessorStreamWithJavaProvider.scala} (87%) delete mode 100644 rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala diff --git a/java/build.sbt b/java/build.sbt index 885b083d..a4146b35 100644 --- a/java/build.sbt +++ b/java/build.sbt @@ -12,8 +12,8 @@ val jacksonExclusions = Seq( val rest = Seq( "org.elasticsearch" % "elasticsearch" % Versions.elasticSearch exclude ("org.apache.logging.log4j", "log4j-api"), - "co.elastic.clients" % "elasticsearch-java" % Versions.rest exclude ("org.elasticsearch", "elasticsearch"), - "org.elasticsearch.client" % "elasticsearch-rest-client" % Versions.rest + "co.elastic.clients" % "elasticsearch-java" % Versions.elasticSearch exclude ("org.elasticsearch", "elasticsearch"), + "org.elasticsearch.client" % "elasticsearch-rest-client" % Versions.elasticSearch ).map(_.excludeAll(jacksonExclusions: _*)) libraryDependencies ++= rest diff --git a/java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala b/java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala similarity index 87% rename from java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala rename to java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala index 6907b441..c9cd532c 100644 --- a/java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithRestProvider.scala +++ b/java/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJavaProvider.scala @@ -5,6 +5,6 @@ import app.softnetwork.persistence.message.CrudEvent import app.softnetwork.persistence.model.Timestamped import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider} -trait State2ElasticProcessorStreamWithRestProvider[T <: Timestamped, E <: CrudEvent] +trait State2ElasticProcessorStreamWithJavaProvider[T <: Timestamped, E <: CrudEvent] extends State2ElasticProcessorStream[T, E] with ElasticsearchClientProvider[T] { _: JournalProvider with OffsetProvider => } diff --git a/project/Versions.scala b/project/Versions.scala index 0619d822..440d9544 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -24,8 +24,6 @@ object Versions { val elastic4s = "8.18.2" - val jest = "6.3.1" - val log4j = "2.8.2" val testContainers = "1.18.0" @@ -34,5 +32,4 @@ object Versions { val gson = "2.8.0" - val rest = "8.18.3" // java client } diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala deleted file mode 100644 index 1eed7568..00000000 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala +++ /dev/null @@ -1,817 +0,0 @@ -package app.softnetwork.elastic.client.rest - -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.scaladsl.Flow -import app.softnetwork.elastic.client._ -import app.softnetwork.elastic.sql.SQLQuery -import app.softnetwork.elastic.{client, sql} -import app.softnetwork.persistence.model.Timestamped -import app.softnetwork.serialization.serialization -import com.google.gson.JsonParser -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest -import org.elasticsearch.action.admin.indices.flush.FlushRequest -import org.elasticsearch.action.admin.indices.open.OpenIndexRequest -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest -import org.elasticsearch.action.bulk.{BulkItemResponse, BulkRequest, BulkResponse} -import org.elasticsearch.action.delete.{DeleteRequest, DeleteResponse} -import org.elasticsearch.action.get.{GetRequest, GetResponse} -import org.elasticsearch.action.index.{IndexRequest, IndexResponse} -import org.elasticsearch.action.search.{MultiSearchRequest, SearchRequest, SearchResponse} -import org.elasticsearch.action.update.{UpdateRequest, UpdateResponse} -import org.elasticsearch.action.{ActionListener, DocWriteRequest} -import org.elasticsearch.client.RequestOptions -import org.elasticsearch.client.core.{CountRequest, CountResponse} -import org.elasticsearch.client.indices.{ - CloseIndexRequest, - CreateIndexRequest, - GetMappingsRequest, - PutMappingRequest -} -import org.elasticsearch.common.io.stream.InputStreamStreamInput -import org.elasticsearch.xcontent.{DeprecationHandler, XContentType} -import org.elasticsearch.rest.RestStatus -import org.elasticsearch.search.aggregations.bucket.filter.Filter -import org.elasticsearch.search.aggregations.bucket.nested.Nested -import org.elasticsearch.search.aggregations.metrics.{Avg, Cardinality, Max, Min, Sum, ValueCount} -import org.elasticsearch.search.builder.SearchSourceBuilder -import org.json4s.Formats - -import java.io.ByteArrayInputStream -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.language.implicitConversions -import scala.util.{Failure, Success, Try} - -@deprecated("Use app.softnetwork.elastic.client.java.ElasticsearchClientApi instead", "8.x") -trait RestHighLevelClientApi - extends ElasticClientApi - with RestHighLevelClientIndicesApi - with RestHighLevelClientAliasApi - with RestHighLevelClientSettingsApi - with RestHighLevelClientMappingApi - with RestHighLevelClientRefreshApi - with RestHighLevelClientFlushApi - with RestHighLevelClientCountApi - with RestHighLevelClientSingleValueAggregateApi - with RestHighLevelClientIndexApi - with RestHighLevelClientUpdateApi - with RestHighLevelClientDeleteApi - with RestHighLevelClientGetApi - with RestHighLevelClientSearchApi - with RestHighLevelClientBulkApi - -trait RestHighLevelClientIndicesApi extends IndicesApi with RestHighLevelClientCompanion { - override def createIndex(index: String, settings: String): Boolean = { - apply() - .indices() - .create( - new CreateIndexRequest(index) - .settings(settings, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged - } - - override def deleteIndex(index: String): Boolean = { - apply().indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged - } - - override def openIndex(index: String): Boolean = { - apply().indices().open(new OpenIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged - } - - override def closeIndex(index: String): Boolean = { - apply().indices().close(new CloseIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged - } - -} - -trait RestHighLevelClientAliasApi extends AliasApi with RestHighLevelClientCompanion { - override def addAlias(index: String, alias: String): Boolean = { - apply() - .indices() - .updateAliases( - new IndicesAliasesRequest() - .addAliasAction( - new AliasActions(AliasActions.Type.ADD) - .index(index) - .alias(alias) - ), - RequestOptions.DEFAULT - ) - .isAcknowledged - } - - override def removeAlias(index: String, alias: String): Boolean = { - apply() - .indices() - .updateAliases( - new IndicesAliasesRequest() - .addAliasAction( - new AliasActions(AliasActions.Type.REMOVE) - .index(index) - .alias(alias) - ), - RequestOptions.DEFAULT - ) - .isAcknowledged - } -} - -trait RestHighLevelClientSettingsApi extends SettingsApi with RestHighLevelClientCompanion { - _: RestHighLevelClientIndicesApi => - - override def updateSettings(index: String, settings: String): Boolean = { - apply() - .indices() - .putSettings( - new UpdateSettingsRequest(index) - .settings(settings, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged - } - - override def loadSettings(): String = { - apply() - .indices() - .getSettings( - new GetSettingsRequest().indices("*"), - RequestOptions.DEFAULT - ) - .toString - } -} - -trait RestHighLevelClientMappingApi extends MappingApi with RestHighLevelClientCompanion { - override def setMapping(index: String, mapping: String): Boolean = { - apply() - .indices() - .putMapping( - new PutMappingRequest(index) - .source(mapping, XContentType.JSON), - RequestOptions.DEFAULT - ) - .isAcknowledged - } - - override def getMapping(index: String): String = { - apply() - .indices() - .getMapping( - new GetMappingsRequest().indices(index), - RequestOptions.DEFAULT - ) - .toString - } -} - -trait RestHighLevelClientRefreshApi extends RefreshApi with RestHighLevelClientCompanion { - override def refresh(index: String): Boolean = { - apply() - .indices() - .refresh( - new RefreshRequest(index), - RequestOptions.DEFAULT - ) - .getStatus - .getStatus < 400 - } -} - -trait RestHighLevelClientFlushApi extends FlushApi with RestHighLevelClientCompanion { - override def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean = { - apply() - .indices() - .flush( - new FlushRequest(index).force(force).waitIfOngoing(wait), - RequestOptions.DEFAULT - ) - .getStatus == RestStatus.OK - } -} - -trait RestHighLevelClientCountApi extends CountApi with RestHighLevelClientCompanion { - override def countAsync( - query: client.JSONQuery - )(implicit ec: ExecutionContext): Future[Option[Double]] = { - val promise = Promise[Option[Double]]() - apply().countAsync( - new CountRequest().indices(query.indices: _*).types(query.types: _*), - RequestOptions.DEFAULT, - new ActionListener[CountResponse] { - override def onResponse(response: CountResponse): Unit = - promise.success(Option(response.getCount.toDouble)) - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } - - override def count(query: client.JSONQuery): Option[Double] = { - Option( - apply() - .count( - new CountRequest().indices(query.indices: _*).types(query.types: _*), - RequestOptions.DEFAULT - ) - .getCount - .toDouble - ) - } -} - -trait RestHighLevelClientSingleValueAggregateApi - extends SingleValueAggregateApi - with RestHighLevelClientCountApi { - override def aggregate( - sqlQuery: SQLQuery - )(implicit ec: ExecutionContext): Future[Seq[SingleValueAggregateResult]] = { - val futures = for (aggregation <- sqlQuery.aggregations) yield { - val promise: Promise[SingleValueAggregateResult] = Promise() - val field = aggregation.field - val sourceField = aggregation.sourceField - val aggType = aggregation.aggType - val aggName = aggregation.aggName - val query = aggregation.query.getOrElse("") - val sources = aggregation.sources - sourceField match { - case "_id" if aggType.sql == "count" => - countAsync( - JSONQuery( - query, - collection.immutable.Seq(sources: _*), - collection.immutable.Seq.empty[String] - ) - ).onComplete { - case Success(result) => - promise.success( - SingleValueAggregateResult( - field, - aggType, - result.getOrElse(0d), - None - ) - ) - case Failure(f) => - logger.error(f.getMessage, f.fillInStackTrace()) - promise.success(SingleValueAggregateResult(field, aggType, 0d, Some(f.getMessage))) - } - promise.future - case _ => - val jsonQuery = JSONQuery( - query, - collection.immutable.Seq(sources: _*), - collection.immutable.Seq.empty[String] - ) - import jsonQuery._ - // Create a parser for the query - val xContentParser = XContentType.JSON - .xContent() - .createParser( - namedXContentRegistry, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - jsonQuery.query - ) - apply().searchAsync( - new SearchRequest(indices: _*) - .types(types: _*) - .source( - SearchSourceBuilder.fromXContent(xContentParser) - ), - RequestOptions.DEFAULT, - new ActionListener[SearchResponse] { - override def onResponse(response: SearchResponse): Unit = { - val agg = aggName.split("\\.").last - - val itAgg = aggName.split("\\.").iterator - - var root = - if (aggregation.nested) { - response.getAggregations.get(itAgg.next()).asInstanceOf[Nested].getAggregations - } else { - response.getAggregations - } - - if (aggregation.filtered) { - root = root.get(itAgg.next()).asInstanceOf[Filter].getAggregations - } - - promise.success( - SingleValueAggregateResult( - field, - aggType, - aggType match { - case sql.Count => - if (aggregation.distinct) { - root.get(agg).asInstanceOf[Cardinality].value() - } else { - root.get(agg).asInstanceOf[ValueCount].value() - } - case sql.Sum => - root.get(agg).asInstanceOf[Sum].value() - case sql.Avg => - root.get(agg).asInstanceOf[Avg].value() - case sql.Min => - root.get(agg).asInstanceOf[Min].value() - case sql.Max => - root.get(agg).asInstanceOf[Max].value() - case _ => 0d - }, - None - ) - ) - } - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } - } - Future.sequence(futures) - } -} - -trait RestHighLevelClientIndexApi extends IndexApi with RestHighLevelClientCompanion { - _: RestHighLevelClientRefreshApi => - override def index(index: String, id: String, source: String): Boolean = { - apply() - .index( - new IndexRequest(index) - .id(id) - .source(source, XContentType.JSON), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 - } - - override def indexAsync(index: String, id: String, source: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = { - val promise: Promise[Boolean] = Promise() - apply().indexAsync( - new IndexRequest(index) - .id(id) - .source(source, XContentType.JSON), - RequestOptions.DEFAULT, - new ActionListener[IndexResponse] { - override def onResponse(response: IndexResponse): Unit = - promise.success(response.status().getStatus < 400) - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } -} - -trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientCompanion { - _: RestHighLevelClientRefreshApi => - override def update( - index: String, - id: String, - source: String, - upsert: Boolean - ): Boolean = { - apply() - .update( - new UpdateRequest(index, id) - .doc(source, XContentType.JSON) - .docAsUpsert(upsert), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 - } - - override def updateAsync( - index: String, - id: String, - source: String, - upsert: Boolean - )(implicit ec: ExecutionContext): Future[Boolean] = { - val promise: Promise[Boolean] = Promise() - apply().updateAsync( - new UpdateRequest(index, id) - .doc(source, XContentType.JSON) - .docAsUpsert(upsert), - RequestOptions.DEFAULT, - new ActionListener[UpdateResponse] { - override def onResponse(response: UpdateResponse): Unit = - promise.success(response.status().getStatus < 400) - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } -} - -trait RestHighLevelClientDeleteApi extends DeleteApi with RestHighLevelClientCompanion { - _: RestHighLevelClientRefreshApi => - - override def delete(uuid: String, index: String): Boolean = { - apply() - .delete( - new DeleteRequest(index, uuid), - RequestOptions.DEFAULT - ) - .status() - .getStatus < 400 - } - - override def deleteAsync(uuid: String, index: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = { - val promise: Promise[Boolean] = Promise() - apply().deleteAsync( - new DeleteRequest(index, uuid), - RequestOptions.DEFAULT, - new ActionListener[DeleteResponse] { - override def onResponse(response: DeleteResponse): Unit = - promise.success(response.status().getStatus < 400) - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } -} - -trait RestHighLevelClientGetApi extends GetApi with RestHighLevelClientCompanion { - def get[U <: Timestamped]( - id: String, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit m: Manifest[U], formats: Formats): Option[U] = { - Try( - apply().get( - new GetRequest( - index.getOrElse( - maybeType.getOrElse( - m.runtimeClass.getSimpleName.toLowerCase - ) - ), - id - ), - RequestOptions.DEFAULT - ) - ) match { - case Success(response) => - if (response.isExists) { - val source = response.getSourceAsString - logger.info(s"Deserializing response $source for id: $id, index: ${index - .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}") - // Deserialize the source string to the expected type - // Note: This assumes that the source is a valid JSON representation of U - // and that the serialization library is capable of handling it. - Try(serialization.read[U](source)) match { - case Success(value) => Some(value) - case Failure(f) => - logger.error( - s"Failed to deserialize response $source for id: $id, index: ${index - .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}", - f - ) - None - } - } else { - None - } - case Failure(f) => - logger.error( - s"Failed to get document with id: $id, index: ${index - .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}", - f - ) - None - } - } - - override def getAsync[U <: Timestamped]( - id: String, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[Option[U]] = { - val promise = Promise[Option[U]]() - apply().getAsync( - new GetRequest( - index.getOrElse( - maybeType.getOrElse( - m.runtimeClass.getSimpleName.toLowerCase - ) - ), - id - ), - RequestOptions.DEFAULT, - new ActionListener[GetResponse] { - override def onResponse(response: GetResponse): Unit = { - if (response.isExists) { - promise.success(Some(serialization.read[U](response.getSourceAsString))) - } else { - promise.success(None) - } - } - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - promise.future - } -} - -trait RestHighLevelClientSearchApi extends SearchApi with RestHighLevelClientCompanion { - override def search[U]( - jsonQuery: JSONQuery - )(implicit m: Manifest[U], formats: Formats): List[U] = { - import jsonQuery._ - logger.info(s"Searching with query: $query on indices: ${indices.mkString(", ")}") - // Create a parser for the query - val xContentParser = XContentType.JSON - .xContent() - .createParser( - namedXContentRegistry, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - query - ) - val response = apply().search( - new SearchRequest(indices: _*) - .types(types: _*) - .source( - SearchSourceBuilder.fromXContent(xContentParser) - ), - RequestOptions.DEFAULT - ) - if (response.getHits.getTotalHits.value > 0) { - response.getHits.getHits.toList.map { hit => - logger.info(s"Deserializing hit: ${hit.getSourceAsString}") - serialization.read[U](hit.getSourceAsString) - } - } else { - List.empty[U] - } - } - - override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = { - sqlQuery.search match { - case Some(searchRequest) => - val indices = collection.immutable.Seq(searchRequest.sources: _*) - search[U](JSONQuery(searchRequest.query, indices)) - case None => - throw new IllegalArgumentException( - s"SQL query ${sqlQuery.query} does not contain a valid search request" - ) - } - } - - override def searchAsync[U]( - sqlQuery: SQLQuery - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = { - val promise = Promise[List[U]]() - sqlQuery.search match { - case Some(searchRequest) => - val indices = collection.immutable.Seq(searchRequest.sources: _*) - val jsonQuery = JSONQuery(searchRequest.query, indices) - import jsonQuery._ - logger.info(s"Searching with query: $query on indices: ${indices.mkString(", ")}") - // Create a parser for the query - val xContentParser = XContentType.JSON - .xContent() - .createParser( - namedXContentRegistry, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - query - ) - // Execute the search asynchronously - apply().searchAsync( - new SearchRequest(indices: _*) - .types(types: _*) - .source( - SearchSourceBuilder.fromXContent(xContentParser) - ), - RequestOptions.DEFAULT, - new ActionListener[SearchResponse] { - override def onResponse(response: SearchResponse): Unit = { - if (response.getHits.getTotalHits.value > 0) { - promise.success(response.getHits.getHits.toList.map { hit => - serialization.read[U](hit.getSourceAsString) - }) - } else { - promise.success(List.empty[U]) - } - } - - override def onFailure(e: Exception): Unit = promise.failure(e) - } - ) - case None => - promise.failure( - new IllegalArgumentException( - s"SQL query ${sqlQuery.query} does not contain a valid search request" - ) - ) - } - promise.future - } - - override def searchWithInnerHits[U, I](jsonQuery: JSONQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[(U, List[I])] = { - import jsonQuery._ - // Create a parser for the query - val xContentParser = XContentType.JSON - .xContent() - .createParser( - namedXContentRegistry, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - jsonQuery.query - ) - val response = apply().search( - new SearchRequest(indices: _*) - .types(types: _*) - .source( - SearchSourceBuilder.fromXContent(xContentParser) - ), - RequestOptions.DEFAULT - ) - Try(new JsonParser().parse(response.toString).getAsJsonObject ~> [U, I] innerField) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty - } - } - - override def multiSearch[U]( - jsonQueries: JSONQueries - )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { - import jsonQueries._ - val request = new MultiSearchRequest() - for (query <- queries) { - request.add( - new SearchRequest(query.indices: _*) - .types(query.types: _*) - .source( - new SearchSourceBuilder( - new InputStreamStreamInput( - new ByteArrayInputStream( - query.query.getBytes() - ) - ) - ) - ) - ) - } - val responses = apply().msearch(request, RequestOptions.DEFAULT) - responses.getResponses.toList.map { response => - if (response.isFailure) { - logger.error(s"Error in multi search: ${response.getFailureMessage}") - List.empty[U] - } else { - response.getResponse.getHits.getHits.toList.map { hit => - serialization.read[U](hit.getSourceAsString) - } - } - } - } - - override def multiSearchWithInnerHits[U, I](jsonQueries: JSONQueries, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[List[(U, List[I])]] = { - import jsonQueries._ - val request = new MultiSearchRequest() - for (query <- queries) { - request.add( - new SearchRequest(query.indices: _*) - .types(query.types: _*) - .source( - new SearchSourceBuilder( - new InputStreamStreamInput( - new ByteArrayInputStream( - query.query.getBytes() - ) - ) - ) - ) - ) - } - val responses = apply().msearch(request, RequestOptions.DEFAULT) - responses.getResponses.toList.map { response => - if (response.isFailure) { - logger.error(s"Error in multi search: ${response.getFailureMessage}") - List.empty[(U, List[I])] - } else { - Try( - new JsonParser().parse(response.getResponse.toString).getAsJsonObject ~> [U, I] innerField - ) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty - } - } - } - } - -} - -trait RestHighLevelClientBulkApi - extends RestHighLevelClientRefreshApi - with RestHighLevelClientSettingsApi - with RestHighLevelClientIndicesApi - with BulkApi { - override type A = DocWriteRequest[_] - override type R = BulkResponse - - override def toBulkAction(bulkItem: BulkItem): A = { - import bulkItem._ - val request = action match { - case BulkAction.UPDATE => - new UpdateRequest(index, id.orNull) - .doc(body, XContentType.JSON) - .docAsUpsert(true) - case BulkAction.DELETE => - new DeleteRequest(index).id(id.getOrElse("_all")) - case _ => - new IndexRequest(index).source(body, XContentType.JSON).id(id.orNull) - } - request - } - - override def bulkResult: Flow[R, Set[String], NotUsed] = - Flow[BulkResponse] - .named("result") - .map(result => { - val items = result.getItems - val grouped = items.groupBy(_.getIndex) - val indices = grouped.keys.toSet - for (index <- indices) { - logger - .info(s"Bulk operation succeeded for index $index with ${grouped(index).length} items.") - } - indices - }) - - override def bulk(implicit - bulkOptions: BulkOptions, - system: ActorSystem - ): Flow[Seq[A], R, NotUsed] = { - val parallelism = Math.max(1, bulkOptions.balance) - Flow[Seq[A]] - .named("bulk") - .mapAsyncUnordered[R](parallelism) { items => - val request = new BulkRequest(bulkOptions.index) - items.foreach(request.add) - val promise: Promise[R] = Promise[R]() - apply().bulkAsync( - request, - RequestOptions.DEFAULT, - new ActionListener[BulkResponse] { - override def onResponse(response: BulkResponse): Unit = { - if (response.hasFailures) { - logger.error(s"Bulk operation failed: ${response.buildFailureMessage()}") - } else { - logger.info(s"Bulk operation succeeded with ${response.getItems.length} items.") - } - promise.success(response) - } - - override def onFailure(e: Exception): Unit = { - logger.error("Bulk operation failed", e) - promise.failure(e) - } - } - ) - promise.future - } - } - - private[this] def toBulkElasticResultItem(i: BulkItemResponse): BulkElasticResultItem = - new BulkElasticResultItem { - override def index: String = i.getIndex - } - - override implicit def toBulkElasticAction(a: DocWriteRequest[_]): BulkElasticAction = { - new BulkElasticAction { - override def index: String = a.index - } - } - - override implicit def toBulkElasticResult(r: BulkResponse): BulkElasticResult = { - new BulkElasticResult { - override def items: List[BulkElasticResultItem] = - r.getItems.toList.map(toBulkElasticResultItem) - } - } -} diff --git a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala index 6b2cac9b..5319e163 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala @@ -46,8 +46,8 @@ trait ElasticDockerTestKit extends ElasticTestKit { _: Suite => "/usr/share/elasticsearch/tmp", BindMode.READ_WRITE ) - container.addEnv("ES_JAVA_OPTS", "-Xms1024m -Xmx1024m") - container.setWaitStrategy(Wait.forHttp("/").forStatusCode(200)) + // container.addEnv("ES_JAVA_OPTS", "-Xms1024m -Xmx1024m") + // container.setWaitStrategy(Wait.forHttp("/").forStatusCode(200)) container.withStartupTimeout(Duration.ofMinutes(2)) } diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala index 4997a252..3dfaa33f 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala @@ -16,7 +16,7 @@ object ElasticsearchProviders { override lazy val config: Config = es - implicit lazy val restHighLevelClient: RestHighLevelClient = apply() + implicit lazy val restHighLevelClient: ElasticsearchClient = apply() } class SampleProvider(es: Config) @@ -26,7 +26,7 @@ object ElasticsearchProviders { override lazy val config: Config = es - implicit lazy val restHighLevelClient: RestHighLevelClient = apply() + implicit lazy val restHighLevelClient: ElasticsearchClient = apply() } class BinaryProvider(es: Config) @@ -36,6 +36,6 @@ object ElasticsearchProviders { override lazy val config: Config = es - implicit lazy val restHighLevelClient: RestHighLevelClient = apply() + implicit lazy val restHighLevelClient: ElasticsearchClient = apply() } } From bb87396d0ff987dbd89c41ed25e5a3251aed0d65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Fri, 18 Jul 2025 14:25:25 +0200 Subject: [PATCH 4/4] add support for asynchronous calls (index, update, delete, get and search) --- .../client/java/ElasticsearchClientApi.scala | 188 +++++++++++++++--- .../java/ElasticsearchClientCompanion.scala | 66 ++++-- 2 files changed, 205 insertions(+), 49 deletions(-) diff --git a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala index 59425aae..31f10822 100644 --- a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala @@ -21,32 +21,9 @@ import co.elastic.clients.elasticsearch.core.msearch.{ MultisearchHeader, RequestItem } -import co.elastic.clients.elasticsearch.core.{ - BulkRequest, - BulkResponse, - CountRequest, - DeleteRequest, - GetRequest, - IndexRequest, - MsearchRequest, - SearchRequest, - UpdateRequest -} +import co.elastic.clients.elasticsearch.core._ import co.elastic.clients.elasticsearch.indices.update_aliases.{Action, AddAction, RemoveAction} -import co.elastic.clients.elasticsearch.indices.{ - CloseIndexRequest, - CreateIndexRequest, - DeleteIndexRequest, - FlushRequest, - GetIndicesSettingsRequest, - GetMappingRequest, - IndexSettings, - OpenRequest, - PutIndicesSettingsRequest, - PutMappingRequest, - RefreshRequest, - UpdateAliasesRequest -} +import co.elastic.clients.elasticsearch.indices._ import com.google.gson.JsonParser import _root_.java.io.StringReader @@ -348,6 +325,26 @@ trait ElasticsearchClientIndexApi extends IndexApi with ElasticsearchClientCompa .intValue() == 0 } + override def indexAsync(index: String, id: String, source: String)(implicit + ec: ExecutionContext + ): Future[Boolean] = { + fromCompletableFuture( + async() + .index( + new IndexRequest.Builder() + .index(index) + .id(id) + .withJson(new StringReader(source)) + .build() + ) + ).flatMap { response => + if (response.shards().failed().intValue() == 0) { + Future.successful(true) + } else { + Future.failed(new Exception(s"Failed to index document with id: $id in index: $index")) + } + } + } } trait ElasticsearchClientUpdateApi extends UpdateApi with ElasticsearchClientCompanion { @@ -373,6 +370,28 @@ trait ElasticsearchClientUpdateApi extends UpdateApi with ElasticsearchClientCom .intValue() == 0 } + override def updateAsync(index: String, id: String, source: String, upsert: Boolean)(implicit + ec: ExecutionContext + ): Future[Boolean] = { + fromCompletableFuture( + async() + .update( + new UpdateRequest.Builder[JMap[String, Object], JMap[String, Object]]() + .index(index) + .id(id) + .doc(mapper.readValue(source, classOf[JMap[String, Object]])) + .docAsUpsert(upsert) + .build(), + classOf[JMap[String, Object]] + ) + ).flatMap { response => + if (response.shards().failed().intValue() == 0) { + Future.successful(true) + } else { + Future.failed(new Exception(s"Failed to update document with id: $id in index: $index")) + } + } + } } trait ElasticsearchClientDeleteApi extends DeleteApi with ElasticsearchClientCompanion { @@ -388,6 +407,23 @@ trait ElasticsearchClientDeleteApi extends DeleteApi with ElasticsearchClientCom .intValue() == 0 } + override def deleteAsync(uuid: String, index: String)(implicit + ec: ExecutionContext + ): Future[Boolean] = { + fromCompletableFuture( + async() + .delete( + new DeleteRequest.Builder().index(index).id(uuid).build() + ) + ).flatMap { response => + if (response.shards().failed().intValue() == 0) { + Future.successful(true) + } else { + Future.failed(new Exception(s"Failed to delete document with id: $uuid in index: $index")) + } + } + } + } trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion { @@ -443,6 +479,50 @@ trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion } } + override def getAsync[U <: Timestamped]( + id: String, + index: Option[String] = None, + maybeType: Option[String] = None + )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[Option[U]] = { + fromCompletableFuture( + async() + .get( + new GetRequest.Builder() + .index( + index.getOrElse( + maybeType.getOrElse( + m.runtimeClass.getSimpleName.toLowerCase + ) + ) + ) + .id(id) + .build(), + classOf[JMap[String, Object]] + ) + ).flatMap { + case response if response.found() => + val source = mapper.writeValueAsString(response.source()) + logger.info(s"Deserializing response $source for id: $id, index: ${index + .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}") + // Deserialize the source string to the expected type + // Note: This assumes that the source is a valid JSON representation of U + // and that the serialization library is capable of handling it. + Try(serialization.read[U](source)) match { + case Success(value) => Future.successful(Some(value)) + case Failure(f) => + logger.error( + s"Failed to deserialize response $source for id: $id, index: ${index + .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}", + f + ) + Future.successful(None) + } + case _ => Future.successful(None) + } + Future { + this.get[U](id, index, maybeType) + } + } } trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCompanion { @@ -488,6 +568,50 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom } } + override def searchAsync[U]( + sqlQuery: SQLQuery + )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = { + sqlQuery.search match { + case Some(searchRequest) => + val indices = collection.immutable.Seq(searchRequest.sources: _*) + fromCompletableFuture( + async() + .search( + new SearchRequest.Builder() + .index(indices.asJava) + .withJson(new StringReader(searchRequest.query)) + .build(), + classOf[JMap[String, Object]] + ) + ).flatMap { + case response if response.hits().total().value() > 0 => + Future.successful( + response + .hits() + .hits() + .asScala + .map { hit => + val source = mapper.writeValueAsString(hit.source()) + logger.info(s"Deserializing hit: $source") + serialization.read[U](source) + } + .toList + ) + case _ => + logger.warn( + s"No hits found for query: ${sqlQuery.query} on indices: ${indices.mkString(", ")}" + ) + Future.successful(List.empty[U]) + } + case None => + Future.failed( + throw new IllegalArgumentException( + s"SQL query ${sqlQuery.query} does not contain a valid search request" + ) + ) + } + } + override def searchWithInnerHits[U, I](jsonQuery: JSONQuery, innerField: String)(implicit m1: Manifest[U], m2: Manifest[I], @@ -675,11 +799,15 @@ trait ElasticsearchClientBulkApi override implicit def toBulkElasticAction(a: BulkOperation): BulkElasticAction = new BulkElasticAction { - override def index: String = - if (a.isIndex) a.index().index() - else if (a.isDelete) a.delete().index() - else if (a.isUpdate) a.update().index() - else throw new IllegalArgumentException("Unsupported bulk action type") + override def index: String = { + a match { + case op if op.isIndex => op.index().index() + case op if op.isDelete => op.delete().index() + case op if op.isUpdate => op.update().index() + case _ => + throw new IllegalArgumentException(s"Unsupported bulk operation type: ${a.getClass}") + } + } } override implicit def toBulkElasticResult(r: BulkResponse): BulkElasticResult = { diff --git a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala index 4ae8a5f8..44603144 100644 --- a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala @@ -1,7 +1,7 @@ package app.softnetwork.elastic.client.java import app.softnetwork.elastic.client.ElasticConfig -import co.elastic.clients.elasticsearch.ElasticsearchClient +import co.elastic.clients.elasticsearch.{ElasticsearchAsyncClient, ElasticsearchClient} import co.elastic.clients.json.jackson.JacksonJsonpMapper import co.elastic.clients.transport.rest_client.RestClientTransport import com.fasterxml.jackson.databind.ObjectMapper @@ -12,39 +12,67 @@ import org.apache.http.impl.client.BasicCredentialsProvider import org.apache.http.impl.nio.client.HttpAsyncClientBuilder import org.elasticsearch.client.{RestClient, RestClientBuilder} +import java.util.concurrent.CompletableFuture +import scala.concurrent.{Future, Promise} + trait ElasticsearchClientCompanion extends StrictLogging { def elasticConfig: ElasticConfig private var client: Option[ElasticsearchClient] = None + private var asyncClient: Option[ElasticsearchAsyncClient] = None + lazy val mapper = new ObjectMapper() + def transport: RestClientTransport = { + val credentialsProvider = new BasicCredentialsProvider() + if (elasticConfig.credentials.username.nonEmpty) { + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials( + elasticConfig.credentials.username, + elasticConfig.credentials.password + ) + ) + } + val restClientBuilder: RestClientBuilder = RestClient + .builder( + HttpHost.create(elasticConfig.credentials.url) + ) + .setHttpClientConfigCallback((httpAsyncClientBuilder: HttpAsyncClientBuilder) => + httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider) + ) + new RestClientTransport(restClientBuilder.build(), new JacksonJsonpMapper()) + } + def apply(): ElasticsearchClient = { client match { case Some(c) => c case _ => - val credentialsProvider = new BasicCredentialsProvider() - if (elasticConfig.credentials.username.nonEmpty) { - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials( - elasticConfig.credentials.username, - elasticConfig.credentials.password - ) - ) - } - val restClientBuilder: RestClientBuilder = RestClient - .builder( - HttpHost.create(elasticConfig.credentials.url) - ) - .setHttpClientConfigCallback((httpAsyncClientBuilder: HttpAsyncClientBuilder) => - httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider) - ) - val transport = new RestClientTransport(restClientBuilder.build(), new JacksonJsonpMapper()) val c = new ElasticsearchClient(transport) client = Some(c) c } } + + def async(): ElasticsearchAsyncClient = { + asyncClient match { + case Some(c) => c + case _ => + val c = new ElasticsearchAsyncClient(transport) + asyncClient = Some(c) + c + } + } + + def fromCompletableFuture[T](cf: CompletableFuture[T]): Future[T] = { + val promise = Promise[T]() + cf.whenComplete { (result: T, err: Throwable) => + if (err != null) promise.failure(err) + else promise.success(result) + } + promise.future + } + }