From e3e8ed11f575791ca3d15a5c291e82a62cde8d24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Wed, 23 Jul 2025 09:14:15 +0200 Subject: [PATCH] fix inner hits search, update specifications --- .github/workflows/release.yml | 2 +- .../elastic/client/ElasticClientApi.scala | 14 +- .../client/java/ElasticsearchClientApi.scala | 115 +++++++++++------ .../java/ElasticsearchClientCompanion.scala | 10 +- .../scalatest/ElasticDockerTestKit.scala | 2 +- .../elastic/client/ElasticClientSpec.scala | 120 +++++++++++++++++- .../client/ElasticsearchClientSpec.scala | 7 +- .../client/ElasticsearchProviders.scala | 18 ++- .../softnetwork/elastic/model/Parent.scala | 47 +++++++ 9 files changed, 280 insertions(+), 55 deletions(-) create mode 100644 testkit/src/test/scala/app/softnetwork/elastic/model/Parent.scala diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 18250730..0d7b3099 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -48,7 +48,7 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 with: - files: sql/target/scala-2.13/coverage-report/cobertura.xml,client/testkit/target/scala-2.13/coverage-report/cobertura.xml,java/testkit/target/scala-2.13/coverage-report/cobertura.xml,persistence/target/scala-2.13/coverage-report/cobertura.xml,teskit/target/scala-2.13/coverage-report/cobertura.xml + files: sql/target/scala-2.13/coverage-report/cobertura.xml,client/testkit/target/scala-2.13/coverage-report/cobertura.xml,persistence/target/scala-2.13/coverage-report/cobertura.xml,java/testkit/target/scala-2.13/coverage-report/cobertura.xml,teskit/target/scala-2.13/coverage-report/cobertura.xml flags: unittests fail_ci_if_error: false verbose: true diff --git a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala index 78c16d0c..8e5ed58d 100644 --- 
a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala +++ b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala @@ -552,7 +552,17 @@ trait SearchApi { def search[U](jsonQuery: JSONQuery)(implicit m: Manifest[U], formats: Formats): List[U] - def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] + def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = { + sqlQuery.search match { + case Some(searchRequest) => + val indices = collection.immutable.Seq(searchRequest.sources: _*) + search[U](JSONQuery(searchRequest.query, indices))(m, formats) + case None => + throw new IllegalArgumentException( + s"SQL query ${sqlQuery.query} does not contain a valid search request" + ) + } + } def searchAsync[U]( sqlQuery: SQLQuery @@ -569,7 +579,7 @@ trait SearchApi { case Some(searchRequest) => val indices = collection.immutable.Seq(searchRequest.sources: _*) val jsonQuery = JSONQuery(searchRequest.query, indices) - searchWithInnerHits(jsonQuery, innerField) + searchWithInnerHits(jsonQuery, innerField)(m1, m2, formats) case None => throw new IllegalArgumentException( s"SQL query ${sqlQuery.query} does not contain a valid search request" diff --git a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala index acb5ead9..3a3785b6 100644 --- a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala @@ -21,7 +21,7 @@ import co.elastic.clients.elasticsearch.core._ import co.elastic.clients.elasticsearch.core.search.SearchRequestBody import co.elastic.clients.elasticsearch.indices.update_aliases.{Action, AddAction, RemoveAction} import co.elastic.clients.elasticsearch.indices._ -import com.google.gson.JsonParser +import com.google.gson.{Gson, 
JsonParser} import _root_.java.io.StringReader import _root_.java.util.{Map => JMap} @@ -129,8 +129,7 @@ trait ElasticsearchClientSettingsApi extends SettingsApi with ElasticsearchClien .getSettings( new GetIndicesSettingsRequest.Builder().index("*").build() ) - .toString - settings.substring(settings.indexOf(':') + 1).trim + extractSource(settings).getOrElse("") } } @@ -150,8 +149,7 @@ trait ElasticsearchClientMappingApi extends MappingApi with ElasticsearchClientC .getMapping( new GetMappingRequest.Builder().index(index).build() ) - .toString - mapping.substring(mapping.indexOf(':') + 1).trim + extractSource(mapping).getOrElse("") } } @@ -477,7 +475,7 @@ trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion case Success(response) => if (response.found()) { val source = mapper.writeValueAsString(response.source()) - logger.info(s"Deserializing response $source for id: $id, index: ${index + logger.whenDebugEnabled(s"Deserializing response $source for id: $id, index: ${index .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}") // Deserialize the source string to the expected type // Note: This assumes that the source is a valid JSON representation of U @@ -528,7 +526,7 @@ trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion ).flatMap { case response if response.found() => val source = mapper.writeValueAsString(response.source()) - logger.info(s"Deserializing response $source for id: $id, index: ${index + logger.whenDebugEnabled(s"Deserializing response $source for id: $id, index: ${index .getOrElse("default")}, type: ${maybeType.getOrElse("_all")}") // Deserialize the source string to the expected type // Note: This assumes that the source is a valid JSON representation of U @@ -571,10 +569,15 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom .hits() .hits() .asScala - .map { hit => + .flatMap { hit => val source = mapper.writeValueAsString(hit.source()) - 
logger.info(s"Deserializing hit: $source") - serialization.read[U](source) + logger.whenDebugEnabled(s"Deserializing hit: $source") + Try(serialization.read[U](source)).toOption.orElse { + logger.error( + s"Failed to deserialize hit: $source" + ) + None + } } .toList } else { @@ -582,18 +585,6 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom } } - override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = { - sqlQuery.search match { - case Some(searchRequest) => - val indices = collection.immutable.Seq(searchRequest.sources: _*) - search[U](JSONQuery(searchRequest.query, indices)) - case None => - throw new IllegalArgumentException( - s"SQL query ${sqlQuery.query} does not contain a valid search request" - ) - } - } - override def searchAsync[U]( sqlQuery: SQLQuery )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = { @@ -618,7 +609,7 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom .asScala .map { hit => val source = mapper.writeValueAsString(hit.source()) - logger.info(s"Deserializing hit: $source") + logger.whenDebugEnabled(s"Deserializing hit: $source") serialization.read[U](source) } .toList @@ -644,20 +635,70 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom formats: Formats ): List[(U, List[I])] = { import jsonQuery._ - val response = apply().search( - new SearchRequest.Builder() - .index(indices.asJava) - .withJson( - new StringReader(query) - ) - .build(), - classOf[JMap[String, Object]] - ) - Try(new JsonParser().parse(response.toString).getAsJsonObject ~> [U, I] innerField) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty + logger.info(s"Searching with query: $query on indices: ${indices.mkString(", ")}") + val response = apply() + .search( + new SearchRequest.Builder() + .index(indices.asJava) + .withJson( + new StringReader(query) + 
) + .build(), + classOf[JMap[String, Object]] + ) + val results = response + .hits() + .hits() + .asScala + .toList + if (results.nonEmpty) { + results.flatMap { hit => + val hitSource = hit.source() + Option(hitSource) + .map(mapper.writeValueAsString) + .flatMap { source => + logger.whenDebugEnabled(s"Deserializing hit: $source") + Try(serialization.read[U](source)) match { + case Success(mainObject) => + Some(mainObject) + case Failure(f) => + logger.error( + s"Failed to deserialize hit: $source for query: $query on indices: ${indices.mkString(", ")}", + f + ) + None + } + } + .map { mainObject => + val innerHits = hit + .innerHits() + .asScala + .get(innerField) + .map(_.hits().hits().asScala.toList) + .getOrElse(Nil) + val innerObjects = innerHits.flatMap { innerHit => + extractSource(innerHit) match { + case Some(innerSource) => + logger.whenDebugEnabled(s"Processing inner hit: $innerSource") + val json = new JsonParser().parse(innerSource).getAsJsonObject + val gson = new Gson() + Try(serialization.read[I](gson.toJson(json.get("_source")))) match { + case Success(innerObject) => Some(innerObject) + case Failure(f) => + logger.error(s"Failed to deserialize inner hit: $innerSource", f) + None + } + case None => + logger.warn("Could not extract inner hit source from string representation") + None + } + } + (mainObject, innerObjects) + } + } + } else { + logger.warn(s"No hits found for query: $query on indices: ${indices.mkString(", ")}") + List.empty[(U, List[I])] } } diff --git a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala index 44603144..080bb3d0 100644 --- a/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala +++ b/java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala @@ -5,6 +5,7 @@ import 
co.elastic.clients.elasticsearch.{ElasticsearchAsyncClient, Elasticsearch import co.elastic.clients.json.jackson.JacksonJsonpMapper import co.elastic.clients.transport.rest_client.RestClientTransport import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.ClassTagExtensions import com.typesafe.scalalogging.StrictLogging import org.apache.http.HttpHost import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials} @@ -23,7 +24,7 @@ trait ElasticsearchClientCompanion extends StrictLogging { private var asyncClient: Option[ElasticsearchAsyncClient] = None - lazy val mapper = new ObjectMapper() + lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions def transport: RestClientTransport = { val credentialsProvider = new BasicCredentialsProvider() @@ -75,4 +76,11 @@ trait ElasticsearchClientCompanion extends StrictLogging { promise.future } + protected def extractSource(value: AnyRef): Option[String] = { + val s = value.toString + val idx = s.indexOf(':') + if (idx >= 0 && idx + 1 < s.length) Some(s.substring(idx + 1).trim) + else None + } + } diff --git a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala index 5319e163..b2e88a55 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala @@ -2,7 +2,7 @@ package app.softnetwork.elastic.scalatest import org.scalatest.Suite import org.testcontainers.containers.BindMode -import org.testcontainers.containers.wait.strategy.Wait +//import org.testcontainers.containers.wait.strategy.Wait import org.testcontainers.elasticsearch.ElasticsearchContainer import org.testcontainers.utility.DockerImageName diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala 
b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala index 5235f9e3..9b359dfb 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala @@ -19,6 +19,7 @@ import org.slf4j.{Logger, LoggerFactory} import _root_.java.io.ByteArrayInputStream import _root_.java.nio.file.{Files, Paths} +import _root_.java.time.format.DateTimeFormatter import _root_.java.util.concurrent.TimeUnit import _root_.java.util.UUID import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor} @@ -46,6 +47,7 @@ trait ElasticClientSpec def pClient: ElasticProvider[Person] with ElasticClientApi def sClient: ElasticProvider[Sample] with ElasticClientApi def bClient: ElasticProvider[Binary] with ElasticClientApi + def parentClient: ElasticProvider[Parent] with ElasticClientApi import scala.language.implicitConversions @@ -146,7 +148,7 @@ trait ElasticClientSpec ) private val personsWithUpsert = - persons :+ """ { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09", "children": [{ "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"}, { "parentId": "A16", "name": "Josh Gumble", "birthDate": "1999-05-09"}], "childrenCount": 2 } """ + persons :+ """ { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09", "children": [{ "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"}, { "parentId": "A16", "name": "Josh Gumble", "birthDate": "2002-05-09"}], "childrenCount": 2 } """ val children: List[String] = List( """ { "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"} """, @@ -603,7 +605,7 @@ trait ElasticClientSpec // test distinct count aggregation pClient .aggregate( - SQLQuery("select count(distinct p.uuid) as c from person10 p") + "select count(distinct p.uuid) as c from person10 p" ) .complete() match { case Success(s) => s.headOption.flatMap(_.asDoubleOption).getOrElse(0d) 
should ===(3d) @@ -611,27 +613,43 @@ trait ElasticClientSpec } // test count aggregation - pClient.aggregate(SQLQuery("select count(p.uuid) as c from person10 p")).complete() match { + pClient + .aggregate( + "select count(p.uuid) as c from person10 p" + ) + .complete() match { case Success(s) => s.headOption.flatMap(_.asDoubleOption).getOrElse(0d) should ===(3d) case Failure(f) => fail(f.getMessage) } // test max aggregation on date field - pClient.aggregate(SQLQuery("select max(p.birthDate) as c from person10 p")).complete() match { + pClient + .aggregate( + "select max(p.birthDate) as c from person10 p" + ) + .complete() match { case Success(s) => s.headOption.flatMap(_.asStringOption).getOrElse("") should ===("1969-05-09T00:00:00.000Z") case Failure(f) => fail(f.getMessage) } // test min aggregation on date field - pClient.aggregate(SQLQuery("select min(p.birthDate) as c from person10 p")).complete() match { + pClient + .aggregate( + "select min(p.birthDate) as c from person10 p" + ) + .complete() match { case Success(s) => s.headOption.flatMap(_.asStringOption).getOrElse("") should ===("1967-11-21T00:00:00.000Z") case Failure(f) => fail(f.getMessage) } // test avg aggregation on date field - pClient.aggregate(SQLQuery("select avg(p.birthDate) as c from person10 p")).complete() match { + pClient + .aggregate( + "select avg(p.birthDate) as c from person10 p" + ) + .complete() match { case Success(s) => s.headOption.flatMap(_.asStringOption).getOrElse("") should ===("1968-05-17T08:00:00.000Z") case Failure(f) => fail(f.getMessage) @@ -640,7 +658,7 @@ trait ElasticClientSpec // test sum aggregation on integer field pClient .aggregate( - SQLQuery("select sum(p.childrenCount) as c from person10 p") + "select sum(p.childrenCount) as c from person10 p" ) .complete() match { case Success(s) => @@ -649,4 +667,92 @@ trait ElasticClientSpec } } + + "Nested queries" should "work" in { + parentClient.createIndex("parent") shouldBe true + val mapping = + """{ + | 
"properties": { + | "birthDate": { + | "type": "date" + | }, + | "uuid": { + | "type": "keyword" + | }, + | "name": { + | "type": "keyword" + | }, + | "createdDate": { + | "type": "date", + | "null_value": "1970-01-01" + | }, + | "lastUpdated": { + | "type": "date", + | "null_value": "1970-01-01" + | }, + | "children": { + | "type": "nested", + | "include_in_parent": true, + | "properties": { + | "name": { + | "type": "keyword" + | }, + | "birthDate": { + | "type": "date" + | } + | } + | }, + | "childrenCount": { + | "type": "integer" + | } + | } + |} + """.stripMargin.replaceAll("\n", "").replaceAll("\\s+", "") + logger.info(s"mapping: $mapping") + parentClient.setMapping("parent", mapping) shouldBe true + + implicit val bulkOptions: BulkOptions = BulkOptions("parent", "parent", 1000) + val indices = + parentClient + .bulk[String](personsWithUpsert.iterator, identity, Some("uuid"), None, None, Some(true)) + refresh(indices) + parentClient.flush("parent") + parentClient.refresh("parent") + + indices should contain only "parent" + + blockUntilCount(3, "parent") + + "parent" should haveCount(3) + + val parents = parentClient.search[Parent]("select * from parent") + assert(parents.size == 3) + + val results = parentClient.searchWithInnerHits[Parent, Child]( + """SELECT + | p.uuid, + | p.name, + | p.birthDate, + | p.children, + | inner_children.name, + | inner_children.birthDate + |FROM + | parent as p, + | UNNEST(p.children) as inner_children + |WHERE + | inner_children.name is not null AND p.uuid = 'A16' + |""".stripMargin, + "inner_children" + ) + results.size shouldBe 1 + val result = results.head + result._1.uuid shouldBe "A16" + result._1.children.size shouldBe 2 + result._2.size shouldBe 2 + result._2.map(_.name) should contain allOf ("Steve Gumble", "Josh Gumble") + result._2.map( + _.birthDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + ) should contain allOf ("1999-05-09", "2002-05-09") + result._2.map(_.parentId) should contain only "A16" + } } diff 
--git a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchClientSpec.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchClientSpec.scala index e1f34b11..e0165b77 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchClientSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchClientSpec.scala @@ -2,10 +2,11 @@ package app.softnetwork.elastic.client import app.softnetwork.elastic.client.ElasticsearchProviders.{ BinaryProvider, + ParentProvider, PersonProvider, SampleProvider } -import app.softnetwork.elastic.model.{Binary, Sample} +import app.softnetwork.elastic.model.{Binary, Parent, Sample} import app.softnetwork.elastic.persistence.query.ElasticProvider import app.softnetwork.persistence.person.model.Person @@ -20,5 +21,7 @@ class ElasticsearchClientSpec extends ElasticClientSpec { lazy val bClient: ElasticProvider[Binary] with ElasticClientApi = new BinaryProvider( elasticConfig ) - + lazy val parentClient: ElasticProvider[Parent] with ElasticClientApi = new ParentProvider( + elasticConfig + ) } diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala index 3dfaa33f..261f286b 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticsearchProviders.scala @@ -1,7 +1,7 @@ package app.softnetwork.elastic.client import app.softnetwork.elastic.client.java.ElasticsearchClientProvider -import app.softnetwork.elastic.model.{Binary, Sample} +import app.softnetwork.elastic.model.{Binary, Parent, Sample} import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.person.model.Person import co.elastic.clients.elasticsearch.ElasticsearchClient @@ -16,7 +16,7 @@ object ElasticsearchProviders { override lazy val config: Config = es 
- implicit lazy val restHighLevelClient: ElasticsearchClient = apply() + implicit lazy val elasticsearchClient: ElasticsearchClient = apply() } class SampleProvider(es: Config) @@ -26,7 +26,7 @@ object ElasticsearchProviders { override lazy val config: Config = es - implicit lazy val restHighLevelClient: ElasticsearchClient = apply() + implicit lazy val elasticsearchClient: ElasticsearchClient = apply() } class BinaryProvider(es: Config) @@ -36,6 +36,16 @@ object ElasticsearchProviders { override lazy val config: Config = es - implicit lazy val restHighLevelClient: ElasticsearchClient = apply() + implicit lazy val elasticsearchClient: ElasticsearchClient = apply() + } + + class ParentProvider(es: Config) + extends ElasticsearchClientProvider[Parent] + with ManifestWrapper[Parent] { + override protected val manifestWrapper: ManifestW = ManifestW() + + override lazy val config: Config = es + + implicit lazy val elasticsearchClient: ElasticsearchClient = apply() } } diff --git a/testkit/src/test/scala/app/softnetwork/elastic/model/Parent.scala b/testkit/src/test/scala/app/softnetwork/elastic/model/Parent.scala new file mode 100644 index 00000000..ace5d9b8 --- /dev/null +++ b/testkit/src/test/scala/app/softnetwork/elastic/model/Parent.scala @@ -0,0 +1,47 @@ +package app.softnetwork.elastic.model + +import app.softnetwork.persistence.{generateUUID, now} +import app.softnetwork.persistence.model.Timestamped +import app.softnetwork.time._ + +import java.time.{Instant, LocalDate} + +case class Parent( + uuid: String, + name: String, + birthDate: LocalDate, + children: Seq[Child] = Seq.empty[Child] +) extends Timestamped { + def addChild(child: Child): Parent = copy(children = children :+ child) + lazy val createdDate: Instant = Instant.now() + lazy val lastUpdated: Instant = Instant.now() +} + +case class Child(name: String, birthDate: LocalDate, parentId: String) + +object Parent { + def apply(name: String, birthDate: LocalDate): Parent = + apply( + generateUUID(), + 
name, + birthDate + ) + + def apply(uuid: String, name: String, birthDate: LocalDate): Parent = + apply( + uuid, + name, + birthDate, + Seq.empty[Child] + ) + + def apply(uuid: String, name: String, birthDate: LocalDate, children: Seq[Child]): Parent = { + Parent( + uuid = uuid, + name = name, + birthDate = birthDate, + children = children + ) + } + +}