Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
files: sql/target/scala-2.13/coverage-report/cobertura.xml,client/testkit/target/scala-2.13/coverage-report/cobertura.xml,java/testkit/target/scala-2.13/coverage-report/cobertura.xml,persistence/target/scala-2.13/coverage-report/cobertura.xml,teskit/target/scala-2.13/coverage-report/cobertura.xml
files: sql/target/scala-2.13/coverage-report/cobertura.xml,client/testkit/target/scala-2.13/coverage-report/cobertura.xml,persistence/target/scala-2.13/coverage-report/cobertura.xml,java/testkit/target/scala-2.13/coverage-report/cobertura.xml,teskit/target/scala-2.13/coverage-report/cobertura.xml
flags: unittests
fail_ci_if_error: false
verbose: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,17 @@ trait SearchApi {

def search[U](jsonQuery: JSONQuery)(implicit m: Manifest[U], formats: Formats): List[U]

def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U]
def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = {
sqlQuery.search match {
case Some(searchRequest) =>
val indices = collection.immutable.Seq(searchRequest.sources: _*)
search[U](JSONQuery(searchRequest.query, indices))(m, formats)
case None =>
throw new IllegalArgumentException(
s"SQL query ${sqlQuery.query} does not contain a valid search request"
)
}
}

def searchAsync[U](
sqlQuery: SQLQuery
Expand All @@ -569,7 +579,7 @@ trait SearchApi {
case Some(searchRequest) =>
val indices = collection.immutable.Seq(searchRequest.sources: _*)
val jsonQuery = JSONQuery(searchRequest.query, indices)
searchWithInnerHits(jsonQuery, innerField)
searchWithInnerHits(jsonQuery, innerField)(m1, m2, formats)
case None =>
throw new IllegalArgumentException(
s"SQL query ${sqlQuery.query} does not contain a valid search request"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import co.elastic.clients.elasticsearch.core._
import co.elastic.clients.elasticsearch.core.search.SearchRequestBody
import co.elastic.clients.elasticsearch.indices.update_aliases.{Action, AddAction, RemoveAction}
import co.elastic.clients.elasticsearch.indices._
import com.google.gson.JsonParser
import com.google.gson.{Gson, JsonParser}

import _root_.java.io.StringReader
import _root_.java.util.{Map => JMap}
Expand Down Expand Up @@ -129,8 +129,7 @@ trait ElasticsearchClientSettingsApi extends SettingsApi with ElasticsearchClien
.getSettings(
new GetIndicesSettingsRequest.Builder().index("*").build()
)
.toString
settings.substring(settings.indexOf(':') + 1).trim
extractSource(settings).getOrElse("")
}
}

Expand All @@ -150,8 +149,7 @@ trait ElasticsearchClientMappingApi extends MappingApi with ElasticsearchClientC
.getMapping(
new GetMappingRequest.Builder().index(index).build()
)
.toString
mapping.substring(mapping.indexOf(':') + 1).trim
extractSource(mapping).getOrElse("")
}
}

Expand Down Expand Up @@ -477,7 +475,7 @@ trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion
case Success(response) =>
if (response.found()) {
val source = mapper.writeValueAsString(response.source())
logger.info(s"Deserializing response $source for id: $id, index: ${index
logger.whenDebugEnabled(s"Deserializing response $source for id: $id, index: ${index
.getOrElse("default")}, type: ${maybeType.getOrElse("_all")}")
// Deserialize the source string to the expected type
// Note: This assumes that the source is a valid JSON representation of U
Expand Down Expand Up @@ -528,7 +526,7 @@ trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion
).flatMap {
case response if response.found() =>
val source = mapper.writeValueAsString(response.source())
logger.info(s"Deserializing response $source for id: $id, index: ${index
logger.whenDebugEnabled(s"Deserializing response $source for id: $id, index: ${index
.getOrElse("default")}, type: ${maybeType.getOrElse("_all")}")
// Deserialize the source string to the expected type
// Note: This assumes that the source is a valid JSON representation of U
Expand Down Expand Up @@ -571,29 +569,22 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom
.hits()
.hits()
.asScala
.map { hit =>
.flatMap { hit =>
val source = mapper.writeValueAsString(hit.source())
logger.info(s"Deserializing hit: $source")
serialization.read[U](source)
logger.whenDebugEnabled(s"Deserializing hit: $source")
Try(serialization.read[U](source)).toOption.orElse {
logger.error(
s"Failed to deserialize hit: $source"
)
None
}
}
.toList
} else {
List.empty[U]
}
}

override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = {
sqlQuery.search match {
case Some(searchRequest) =>
val indices = collection.immutable.Seq(searchRequest.sources: _*)
search[U](JSONQuery(searchRequest.query, indices))
case None =>
throw new IllegalArgumentException(
s"SQL query ${sqlQuery.query} does not contain a valid search request"
)
}
}

override def searchAsync[U](
sqlQuery: SQLQuery
)(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = {
Expand All @@ -618,7 +609,7 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom
.asScala
.map { hit =>
val source = mapper.writeValueAsString(hit.source())
logger.info(s"Deserializing hit: $source")
logger.whenDebugEnabled(s"Deserializing hit: $source")
serialization.read[U](source)
}
.toList
Expand All @@ -644,20 +635,70 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom
formats: Formats
): List[(U, List[I])] = {
import jsonQuery._
val response = apply().search(
new SearchRequest.Builder()
.index(indices.asJava)
.withJson(
new StringReader(query)
)
.build(),
classOf[JMap[String, Object]]
)
Try(new JsonParser().parse(response.toString).getAsJsonObject ~> [U, I] innerField) match {
case Success(s) => s
case Failure(f) =>
logger.error(f.getMessage, f)
List.empty
logger.info(s"Searching with query: $query on indices: ${indices.mkString(", ")}")
val response = apply()
.search(
new SearchRequest.Builder()
.index(indices.asJava)
.withJson(
new StringReader(query)
)
.build(),
classOf[JMap[String, Object]]
)
val results = response
.hits()
.hits()
.asScala
.toList
if (results.nonEmpty) {
results.flatMap { hit =>
val hitSource = hit.source()
Option(hitSource)
.map(mapper.writeValueAsString)
.flatMap { source =>
logger.whenDebugEnabled(s"Deserializing hit: $source")
Try(serialization.read[U](source)) match {
case Success(mainObject) =>
Some(mainObject)
case Failure(f) =>
logger.error(
s"Failed to deserialize hit: $source for query: $query on indices: ${indices.mkString(", ")}",
f
)
None
}
}
.map { mainObject =>
val innerHits = hit
.innerHits()
.asScala
.get(innerField)
.map(_.hits().hits().asScala.toList)
.getOrElse(Nil)
val innerObjects = innerHits.flatMap { innerHit =>
extractSource(innerHit) match {
case Some(innerSource) =>
logger.whenDebugEnabled(s"Processing inner hit: $innerSource")
val json = new JsonParser().parse(innerSource).getAsJsonObject
val gson = new Gson()
Try(serialization.read[I](gson.toJson(json.get("_source")))) match {
case Success(innerObject) => Some(innerObject)
case Failure(f) =>
logger.error(s"Failed to deserialize inner hit: $innerSource", f)
None
}
case None =>
logger.warn("Could not extract inner hit source from string representation")
None
}
}
(mainObject, innerObjects)
}
}
} else {
logger.warn(s"No hits found for query: $query on indices: ${indices.mkString(", ")}")
List.empty[(U, List[I])]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import co.elastic.clients.elasticsearch.{ElasticsearchAsyncClient, Elasticsearch
import co.elastic.clients.json.jackson.JacksonJsonpMapper
import co.elastic.clients.transport.rest_client.RestClientTransport
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.ClassTagExtensions
import com.typesafe.scalalogging.StrictLogging
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
Expand All @@ -23,7 +24,7 @@ trait ElasticsearchClientCompanion extends StrictLogging {

private var asyncClient: Option[ElasticsearchAsyncClient] = None

lazy val mapper = new ObjectMapper()
lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions

def transport: RestClientTransport = {
val credentialsProvider = new BasicCredentialsProvider()
Expand Down Expand Up @@ -75,4 +76,11 @@ trait ElasticsearchClientCompanion extends StrictLogging {
promise.future
}

protected def extractSource(value: AnyRef): Option[String] = {
val s = value.toString
val idx = s.indexOf(':')
if (idx >= 0 && idx + 1 < s.length) Some(s.substring(idx + 1).trim)
else None
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package app.softnetwork.elastic.scalatest

import org.scalatest.Suite
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.wait.strategy.Wait
//import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.elasticsearch.ElasticsearchContainer
import org.testcontainers.utility.DockerImageName

Expand Down
Loading