Skip to content
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# ![SoftClient4ES Logo](https://raw.githubusercontent.com/SOFTNETWORK-APP/SoftClient4ES/main/logo.png)


![Build Status](https://github.com/SOFTNETWORK-APP/SoftClient4ES/workflows/Build/badge.svg)
[![codecov](https://codecov.io/gh/SOFTNETWORK-APP/SoftClient4ES/graph/badge.svg?token=XYCWBGVHAC)](https://codecov.io/gh/SOFTNETWORK-APP/SoftClient4ES)
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/1c13d6eb7d6c4a1495cd47e457c132dc)](https://app.codacy.com/gh/SOFTNETWORK-APP/elastic/dashboard?utm_source=gh&utm_medium=referral&utm_content=&utm_campaign=Badge_grade)
[![License](https://img.shields.io/github/license/SOFTNETWORK-APP/elastic)](https://github.com/SOFTNETWORK-APP/elastic/blob/main/LICENSE)

**SoftClient4ES** is a modular and version-resilient interface built on top of Elasticsearch clients, providing a unified and stable API that simplifies migration across Elasticsearch versions, accelerates development, and offers advanced features for search, indexing, and data manipulation.

## Key Features

**Unified Elasticsearch API**
This project provides a trait-based interface (`ElasticClientApi`) that aggregates the core functionalities of Elasticsearch: indexing, searching, updating, deleting, mapping, aliases, refreshing, and more. This design abstracts the underlying client implementation and ensures compatibility across different Elasticsearch versions.

- `JestClientApi`: For Elasticsearch 6 using the open-source [Jest client](https://github.com/searchbox-io/Jest).
- `RestHighLevelClientApi`: For Elasticsearch 6 and 7 using the official high-level REST client.
- `ElasticsearchClientApi`: For Elasticsearch 8 and 9 using the official Java client.

By relying on these concrete implementations, developers can switch between versions with minimal changes to their business logic.

**SQL to Elasticsearch Query Translation**
Elastic Client includes a parser capable of translating SQL `SELECT` queries into Elasticsearch queries. The parser produces an intermediate representation, which is then converted into [Elastic4s](https://github.com/sksamuel/elastic4s) DSL queries and ultimately into native Elasticsearch queries. This allows data engineers and analysts to express queries in familiar SQL syntax.

**Dynamic Mapping Migration**
Elastic Client provides tools to analyze and compare existing mappings with new ones. If differences are detected, it can automatically perform safe migrations. This includes creating temporary indices, reindexing, and renaming — all while preserving data integrity. This eliminates the need for manual mapping migrations and reduces downtime.

**High-Performance Bulk API with Akka Streams**
Bulk operations leverage the power of Akka Streams to efficiently process and index large volumes of data. This stream-based approach improves performance, resilience, and backpressure handling, especially for real-time or high-throughput indexing scenarios.

**Akka Persistence Integration**
The project offers seamless integration with Akka Persistence. This enables Elasticsearch indices to be updated reactively based on persistent events, offering a robust pattern for event-sourced systems.

## Roadmap

Future enhancements include expanding the SQL parser to support additional operations such as `INSERT`, `UPDATE`, and `DELETE`. The long-term vision is to deliver a fully functional, open-source **JDBC connector for Elasticsearch**, empowering users to interact with their data using standard SQL tooling.

## License

This project is open source and licensed under the Apache License 2.0.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork"

name := "softclient4es"

ThisBuild / version := "0.2.1"
ThisBuild / version := "0.3.0"

ThisBuild / scalaVersion := scala213

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M
case other => fail(other.toString)
}

pClient.search[Person]("select * from person_mapping where match(name, 'gum')") match {
pClient.search[Person]("select * from person_mapping where match (name) against ('gum')") match {
case r if r.size == 1 =>
r.map(_.uuid) should contain only "A16"
case other => fail(other.toString)
}

pClient.search[Person](
"select * from person_mapping where uuid <> 'A16' and match(name, 'gum')"
"select * from person_mapping where uuid <> 'A16' and match (name) against ('gum')"
) match {
case r if r.isEmpty =>
case other => fail(other.toString)
Expand Down Expand Up @@ -239,7 +239,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M

"person_migration" should haveCount(3)

pClient.search[Person]("select * from person_migration where match(name, 'gum')") match {
pClient.search[Person]("select * from person_migration where match (name) against ('gum')") match {
case r if r.isEmpty =>
case other => fail(other.toString)
}
Expand Down Expand Up @@ -288,7 +288,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M
pClient.shouldUpdateMapping("person_migration", newMapping) shouldBe true
pClient.updateMapping("person_migration", newMapping) shouldBe true

pClient.search[Person]("select * from person_migration where match(name, 'gum')") match {
pClient.search[Person]("select * from person_migration where match (name) against ('gum')") match {
case r if r.size == 1 =>
r.map(_.uuid) should contain only "A16"
case other => fail(other.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,28 @@ import app.softnetwork.elastic.sql.{
ElasticBoolQuery,
Max,
Min,
SQLAggregate,
SQLBucket,
SQLCriteria,
SQLField,
Sum
}
import com.sksamuel.elastic4s.ElasticApi.{
avgAgg,
cardinalityAgg,
filterAgg,
matchAllQuery,
maxAgg,
minAgg,
nestedAggregation,
sumAgg,
termsAgg,
valueCountAgg
}
import com.sksamuel.elastic4s.searches.aggs.Aggregation
import com.sksamuel.elastic4s.searches.aggs.{
Aggregation,
FilterAggregation,
NestedAggregation,
TermsAggregation
}

import scala.language.implicitConversions

Expand All @@ -32,89 +39,141 @@ case class ElasticAggregation(
sources: Seq[String] = Seq.empty,
query: Option[String] = None,
distinct: Boolean = false,
nested: Boolean = false,
filtered: Boolean = false,
nestedAgg: Option[NestedAggregation] = None,
filteredAgg: Option[FilterAggregation] = None,
aggType: AggregateFunction,
agg: Aggregation
)
) {
val nested: Boolean = nestedAgg.nonEmpty
val filtered: Boolean = filteredAgg.nonEmpty
}

object ElasticAggregation {
def apply(sqlAgg: SQLAggregate): ElasticAggregation = {
def apply(sqlAgg: SQLField, filter: Option[SQLCriteria]): ElasticAggregation = {
import sqlAgg._
val sourceField = identifier.columnName
val sourceField = identifier.name

val field = alias match {
val field = fieldAlias match {
case Some(alias) => alias.alias
case _ => sourceField
}

val distinct = identifier.distinct.isDefined
val distinct = identifier.distinct

val agg =
if (distinct)
s"${function}_distinct_${sourceField.replace(".", "_")}"
val aggType = aggregateFunction.getOrElse(
throw new IllegalArgumentException("Aggregation function is required")
)

val aggName = {
if (fieldAlias.isDefined)
field
else if (distinct)
s"${aggType}_distinct_${sourceField.replace(".", "_")}"
else
s"${function}_${sourceField.replace(".", "_")}"
s"${aggType}_${sourceField.replace(".", "_")}"
}

var aggPath = Seq[String]()

val _agg =
function match {
aggType match {
case Count =>
if (distinct)
cardinalityAgg(agg, sourceField)
cardinalityAgg(aggName, sourceField)
else {
valueCountAgg(agg, sourceField)
valueCountAgg(aggName, sourceField)
}
case Min => minAgg(agg, sourceField)
case Max => maxAgg(agg, sourceField)
case Avg => avgAgg(agg, sourceField)
case Sum => sumAgg(agg, sourceField)
case Min => minAgg(aggName, sourceField)
case Max => maxAgg(aggName, sourceField)
case Avg => avgAgg(aggName, sourceField)
case Sum => sumAgg(aggName, sourceField)
}

def _filtered: Aggregation = filter match {
case Some(f) =>
val boolQuery = Option(ElasticBoolQuery(group = true))
val filteredAgg = s"filtered_agg"
aggPath ++= Seq(filteredAgg)
filterAgg(
filteredAgg,
f.criteria
.map(
_.asFilter(boolQuery)
val filteredAggName = "filtered_agg"

val filteredAgg: Option[FilterAggregation] =
filter match {
case Some(f) =>
val boolQuery = Option(ElasticBoolQuery(group = true))
Some(
filterAgg(
filteredAggName,
f.asFilter(boolQuery)
.query(Set(identifier.innerHitsName).flatten, boolQuery)
)
.getOrElse(matchAllQuery())
) subaggs {
aggPath ++= Seq(agg)
_agg
}
case _ =>
aggPath ++= Seq(agg)
_agg
}
)
case _ =>
None
}

def filtered(): Unit =
filteredAgg match {
case Some(_) =>
aggPath ++= Seq(filteredAggName)
aggPath ++= Seq(aggName)
case _ =>
aggPath ++= Seq(aggName)
}

val aggregation =
val nestedAgg =
if (identifier.nested) {
val path = sourceField.split("\\.").head
val nestedAgg = s"nested_$agg"
val nestedAgg = s"nested_${identifier.nestedType.getOrElse(aggName)}"
aggPath ++= Seq(nestedAgg)
nestedAggregation(nestedAgg, path) subaggs {
_filtered
}
filtered()
Some(nestedAggregation(nestedAgg, path))
} else {
_filtered
filtered()
None
}

ElasticAggregation(
aggPath.mkString("."),
field,
sourceField,
distinct = distinct,
nested = identifier.nested,
filtered = filter.nonEmpty,
aggType = function,
agg = aggregation
nestedAgg = nestedAgg,
filteredAgg = filteredAgg,
aggType = aggType,
agg = _agg
)
}

/*
def apply(
buckets: Seq[SQLBucket],
aggregations: Seq[Aggregation],
current: Option[TermsAggregation]
): Option[TermsAggregation] = {
buckets match {
case Nil =>
current.map(_.copy(subaggs = aggregations))
case bucket +: tail =>
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
current match {
case Some(a) =>
apply(tail, aggregations, Some(agg)) match {
case Some(subAgg) =>
Some(a.copy(subaggs = a.subaggs :+ subAgg))
case _ => Some(a)
}
case None =>
apply(tail, aggregations, Some(agg))
}
}
}
*/

def buildBuckets(
buckets: Seq[SQLBucket],
aggregations: Seq[Aggregation]
): Option[TermsAggregation] = {
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
current match {
case Some(subAgg) => Some(agg.copy(subaggs = Seq(subAgg)))
case None => Some(agg.copy(subaggs = aggregations))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package app.softnetwork.elastic.sql.bridge

import app.softnetwork.elastic.sql.{SQLCriteria, SQLExcept, SQLField}
import app.softnetwork.elastic.sql.{SQLBucket, SQLCriteria, SQLExcept, SQLField}
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.http.search.SearchBodyBuilderFn

Expand All @@ -11,6 +11,7 @@ case class ElasticSearchRequest(
criteria: Option[SQLCriteria],
limit: Option[Int],
search: SearchRequest,
buckets: Seq[SQLBucket] = Seq.empty,
aggregations: Seq[ElasticAggregation] = Seq.empty
) {
def minScore(score: Option[Double]): ElasticSearchRequest = {
Expand Down
Loading