diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 06f62a60..7ad5ffa7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -23,8 +23,8 @@ permissions: jobs: test: - runs-on: self-hosted -# runs-on: ubuntu-latest +# runs-on: self-hosted + runs-on: ubuntu-latest env: # define Java options for both official sbt and sbt-extras JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 @@ -38,6 +38,8 @@ jobs: java-version: '8' distribution: 'temurin' # cache: 'sbt' + - name: Setup sbt launcher + uses: sbt/setup-sbt@v1 - name: Run tests run: sbt test @@ -52,5 +54,7 @@ jobs: java-version: '8' distribution: 'temurin' # cache: 'sbt' + - name: Setup sbt launcher + uses: sbt/setup-sbt@v1 - name: Formatting run: sbt scalafmtSbtCheck scalafmtCheck test:scalafmtCheck \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 1c8d5e5a..cda10035 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -20,8 +20,8 @@ permissions: jobs: release: - runs-on: self-hosted -# runs-on: ubuntu-latest +# runs-on: self-hosted + runs-on: ubuntu-latest env: # define Java options for both official sbt and sbt-extras JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 @@ -35,6 +35,8 @@ jobs: java-version: '8' distribution: 'temurin' # cache: 'sbt' + - name: Setup sbt launcher + uses: sbt/setup-sbt@v1 - name: Formatting run: sbt scalafmtSbtCheck scalafmtCheck test:scalafmtCheck - name: Run tests & Coverage Report diff --git a/build.sbt b/build.sbt index 1477ba8e..16f5f1fa 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ ThisBuild / organization := "app.softnetwork" name := "elastic" -ThisBuild / version := "6.7.2.3" +ThisBuild / version := "7.17.29" ThisBuild / scalaVersion := "2.12.18" @@ -73,24 +73,14 @@ lazy val rest = project.in(file("rest")) persistence % "compile->compile;test->test;it->it" ) -lazy val jest = project.in(file("jest")) - .configs(IntegrationTest) - .settings(Defaults.itSettings) - .dependsOn( - persistence % "compile->compile;test->test;it->it" - ) - lazy val testKit = project.in(file("testkit")) .configs(IntegrationTest) .settings(Defaults.itSettings) .dependsOn( rest % "compile->compile;test->test;it->it" ) - .dependsOn( - jest % "compile->compile;test->test;it->it" - ) lazy val root = project.in(file(".")) .configs(IntegrationTest) .settings(Defaults.itSettings, Publish.noPublishSettings) - .aggregate(sql, client, persistence, rest, jest, testKit) + .aggregate(sql, client, persistence, rest, testKit) diff --git a/client/build.sbt b/client/build.sbt index 9f7ec0f2..476485db 100644 --- a/client/build.sbt +++ b/client/build.sbt @@ -7,22 +7,18 @@ val configDependencies = Seq( "com.github.kxbmap" %% "configs" % Versions.kxbmap ) -val jackson = Seq( - "com.fasterxml.jackson.core" % "jackson-databind" % Versions.jackson, - "com.fasterxml.jackson.core" % "jackson-core" % Versions.jackson, - "com.fasterxml.jackson.core" % "jackson-annotations" % Versions.jackson, - "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % Versions.jackson -) - val jacksonExclusions = Seq( ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), ExclusionRule(organization = "org.codehaus.jackson") ) val json4s = Seq( "org.json4s" %% "json4s-jackson" % Versions.json4s, "org.json4s" %% "json4s-ext" % Versions.json4s -).map(_.excludeAll(jacksonExclusions: _*)) ++ jackson +).map(_.excludeAll(jacksonExclusions: _*)) libraryDependencies ++= configDependencies ++ json4s :+ "app.softnetwork.persistence" %% "persistence-core" % Versions.genericPersistence :+ "com.google.code.gson" % "gson" % Versions.gson diff --git a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala index 0617e2a9..ce5e7672 100644 --- a/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala +++ b/client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala @@ -119,8 +119,16 @@ trait SettingsApi { _: IndicesApi => } trait MappingApi { - def setMapping(index: String, _type: String, mapping: String): Boolean - def getMapping(index: String, _type: String): String + @deprecated("Use setMapping(index: String, mapping: String) instead", "0.7.29") + def setMapping(index: String, indexType: String, mapping: String): Boolean = { + this.setMapping(index, mapping) + } + def setMapping(index: String, mapping: String): Boolean + @deprecated("Use getMapping(index: String) instead", "0.7.29") + def getMapping(index: String, indexType: String): String = { + this.getMapping(index) + } + def getMapping(index: String): String } trait RefreshApi { @@ -137,27 +145,38 @@ trait IndexApi { _: RefreshApi => index: Option[String] = None, maybeType: Option[String] = None )(implicit u: ClassTag[U], formats: Formats): Boolean = { - val _type = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) + val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) this.index( - index.getOrElse(_type), - _type, + index.getOrElse(indexType), entity.uuid, serialization.write[U](entity) ) } - def index(index: String, _type: String, id: String, source: String): Boolean + @deprecated("Use index(index: String, id: String, source: String) instead", "0.7.29") + def index(index: String, indexType: String, id: String, source: String): Boolean = { + this.index(index, id, source) + } + + def index(index: String, id: String, source: String): Boolean def indexAsync[U <: Timestamped]( entity: U, index: Option[String] = None, maybeType: Option[String] = None )(implicit u: ClassTag[U], ec: ExecutionContext, formats: Formats): Future[Boolean] = { - val _type = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) - indexAsync(index.getOrElse(_type), _type, entity.uuid, serialization.write[U](entity)) + val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) + indexAsync(index.getOrElse(indexType), entity.uuid, serialization.write[U](entity)) } - def indexAsync(index: String, _type: String, id: String, source: String)(implicit + @deprecated("Use indexAsync(index: String, id: String, source: String) instead", "0.7.29") + def indexAsync(index: String, indexType: String, id: String, source: String)(implicit + ec: ExecutionContext + ): Future[Boolean] = { + this.indexAsync(index, id, source) + } + + def indexAsync(index: String, id: String, source: String)(implicit ec: ExecutionContext ): Future[Boolean] } @@ -169,17 +188,30 @@ trait UpdateApi { _: RefreshApi => maybeType: Option[String] = None, upsert: Boolean = true )(implicit u: ClassTag[U], formats: Formats): Boolean = { - val _type = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) + val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) this.update( - index.getOrElse(_type), - _type, + index.getOrElse(indexType), entity.uuid, serialization.write[U](entity), upsert ) } - def update(index: String, _type: String, id: String, source: String, upsert: Boolean): Boolean + @deprecated( + "Use update(index: String, id: String, source: String, upsert: Boolean) instead", + "0.7.29" + ) + def update( + index: String, + indexType: String, + id: String, + source: String, + upsert: Boolean + ): Boolean = { + this.update(index, id, source, upsert) + } + + def update(index: String, id: String, source: String, upsert: Boolean): Boolean def updateAsync[U <: Timestamped]( entity: U, @@ -187,19 +219,28 @@ trait UpdateApi { _: RefreshApi => maybeType: Option[String] = None, upsert: Boolean = true )(implicit u: ClassTag[U], ec: ExecutionContext, formats: Formats): Future[Boolean] = { - val _type = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) + val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) this .updateAsync( - index.getOrElse(_type), - _type, + index.getOrElse(indexType), entity.uuid, serialization.write[U](entity), upsert ) } - def updateAsync(index: String, _type: String, id: String, source: String, upsert: Boolean)( + @deprecated( + "Use updateAsync(index: String, id: String, source: String, upsert: Boolean) instead", + "0.7.29" + ) + def updateAsync(index: String, indexType: String, id: String, source: String, upsert: Boolean)( implicit ec: ExecutionContext + ): Future[Boolean] = { + this.updateAsync(index, id, source, upsert) + } + + def updateAsync(index: String, id: String, source: String, upsert: Boolean)(implicit + ec: ExecutionContext ): Future[Boolean] } @@ -209,22 +250,34 @@ trait DeleteApi { _: RefreshApi => index: Option[String] = None, maybeType: Option[String] = None )(implicit u: ClassTag[U]): Boolean = { - val _type = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) - delete(entity.uuid, index.getOrElse(_type), _type) + val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) + delete(entity.uuid, index.getOrElse(indexType)) } - def delete(uuid: String, index: String, _type: String): Boolean + @deprecated("Use delete(uuid: String, index: String) instead", "0.7.29") + def delete(uuid: String, index: String, indexType: String): Boolean = { + this.delete(uuid, index) + } + + def delete(uuid: String, index: String): Boolean def deleteAsync[U <: Timestamped]( entity: U, index: Option[String] = None, maybeType: Option[String] = None )(implicit u: ClassTag[U], ec: ExecutionContext): Future[Boolean] = { - val _type = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) - deleteAsync(entity.uuid, index.getOrElse(_type), _type) + val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase) + deleteAsync(entity.uuid, index.getOrElse(indexType)) + } + + @deprecated("Use deleteAsync(uuid: String, index: String) instead", "0.7.29") + def deleteAsync(uuid: String, index: String, indexType: String)(implicit + ec: ExecutionContext + ): Future[Boolean] = { + this.deleteAsync(uuid, index) } - def deleteAsync(uuid: String, index: String, _type: String)(implicit + def deleteAsync(uuid: String, index: String)(implicit ec: ExecutionContext ): Future[Boolean] diff --git a/jest/build.sbt b/jest/build.sbt deleted file mode 100644 index 30a8cb46..00000000 --- a/jest/build.sbt +++ /dev/null @@ -1,21 +0,0 @@ -organization := "app.softnetwork.elastic" - -name := "elastic-jest-client" - -val guavaExclusion = ExclusionRule(organization = "com.google.guava", name = "guava") - -val httpComponentsExclusions = Seq( - ExclusionRule( - organization = "org.apache.httpcomponents", - name = "httpclient", - artifact = "*", - configurations = Vector(ConfigRef("test")), - crossVersion = CrossVersion.disabled - ) -) - -val jest = Seq( - "io.searchbox" % "jest" % Versions.jest -).map(_.excludeAll(httpComponentsExclusions ++ Seq(guavaExclusion): _*)) - -libraryDependencies ++= jest diff --git a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientApi.scala b/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientApi.scala deleted file mode 100644 index bc4cd21a..00000000 --- a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientApi.scala +++ /dev/null @@ -1,751 +0,0 @@ -package app.softnetwork.elastic.client.jest - -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.scaladsl.Flow -import app.softnetwork.elastic.client._ -import app.softnetwork.elastic.sql -import app.softnetwork.elastic.sql.{ElasticSearchRequest, SQLQuery} -import app.softnetwork.persistence.model.Timestamped -import app.softnetwork.serialization._ -import io.searchbox.action.BulkableAction -import io.searchbox.core._ -import io.searchbox.core.search.aggregation.RootAggregation -import io.searchbox.indices._ -import io.searchbox.indices.aliases.{AddAliasMapping, ModifyAliases, RemoveAliasMapping} -import io.searchbox.indices.mapping.{GetMapping, PutMapping} -import io.searchbox.indices.settings.{GetSettings, UpdateSettings} -import io.searchbox.params.Parameters -import org.json4s.Formats - -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.language.implicitConversions -import scala.util.{Failure, Success, Try} - -/** Created by smanciot on 20/05/2021. - */ -trait JestClientApi - extends ElasticClientApi - with JestIndicesApi - with JestAliasApi - with JestSettingsApi - with JestMappingApi - with JestRefreshApi - with JestFlushApi - with JestCountApi - with JestSingleValueAggregateApi - with JestIndexApi - with JestUpdateApi - with JestDeleteApi - with JestGetApi - with JestSearchApi - with JestBulkApi - -trait JestIndicesApi extends IndicesApi with JestClientCompanion { - override def createIndex(index: String, settings: String = defaultSettings): Boolean = - apply().execute(new CreateIndex.Builder(index).settings(settings).build()).isSucceeded - - override def deleteIndex(index: String): Boolean = - apply().execute(new DeleteIndex.Builder(index).build()).isSucceeded - - override def closeIndex(index: String): Boolean = - apply().execute(new CloseIndex.Builder(index).build()).isSucceeded - - override def openIndex(index: String): Boolean = - apply().execute(new OpenIndex.Builder(index).build()).isSucceeded -} - -trait JestAliasApi extends AliasApi with JestClientCompanion { - override def addAlias(index: String, alias: String): Boolean = { - apply() - .execute( - new ModifyAliases.Builder( - new AddAliasMapping.Builder(index, alias).build() - ).build() - ) - .isSucceeded - } - - override def removeAlias(index: String, alias: String): Boolean = { - apply() - .execute( - new ModifyAliases.Builder( - new RemoveAliasMapping.Builder(index, alias).build() - ).build() - ) - .isSucceeded - } -} - -trait JestSettingsApi extends SettingsApi with JestClientCompanion { - _: IndicesApi => - override def updateSettings(index: String, settings: String = defaultSettings): Boolean = - closeIndex(index) && - apply().execute(new UpdateSettings.Builder(settings).addIndex(index).build()).isSucceeded && - openIndex(index) - - override def loadSettings(): String = - apply().execute(new GetSettings.Builder().build()).getJsonString -} - -trait JestMappingApi extends MappingApi with JestClientCompanion { - _: IndicesApi => - override def setMapping(index: String, _type: String, mapping: String): Boolean = - apply().execute(new PutMapping.Builder(index, _type, mapping).build()).isSucceeded - - override def getMapping(index: String, _type: String): String = - apply().execute(new GetMapping.Builder().addIndex(index).addType(_type).build()).getJsonString -} - -trait JestRefreshApi extends RefreshApi with JestClientCompanion { - override def refresh(index: String): Boolean = - apply().execute(new Refresh.Builder().addIndex(index).build()).isSucceeded -} - -trait JestFlushApi extends FlushApi with JestClientCompanion { - override def flush(index: String, force: Boolean = true, wait: Boolean = true): Boolean = apply() - .execute( - new Flush.Builder().addIndex(index).force(force).waitIfOngoing(wait).build() - ) - .isSucceeded -} - -trait JestCountApi extends CountApi with JestClientCompanion { - override def countAsync( - jsonQuery: JSONQuery - )(implicit ec: ExecutionContext): Future[Option[Double]] = { - import JestClientResultHandler._ - import jsonQuery._ - val count = new Count.Builder().query(query) - for (indice <- indices) count.addIndex(indice) - for (t <- types) count.addType(t) - val promise = Promise[Option[Double]]() - apply().executeAsyncPromise(count.build()) onComplete { - case Success(result) => - if (!result.isSucceeded) - logger.error(result.getErrorMessage) - promise.success(Option(result.getCount)) - case Failure(f) => - logger.error(f.getMessage, f) - promise.failure(f) - } - promise.future - } - - override def count(jsonQuery: JSONQuery): Option[Double] = { - import jsonQuery._ - val count = new Count.Builder().query(query) - for (indice <- indices) count.addIndex(indice) - for (t <- types) count.addType(t) - val result = apply().execute(count.build()) - if (!result.isSucceeded) - logger.error(result.getErrorMessage) - Option(result.getCount) - } -} - -trait JestSingleValueAggregateApi extends SingleValueAggregateApi with JestCountApi { - override def aggregate( - sqlQuery: SQLQuery - )(implicit ec: ExecutionContext): Future[Seq[SingleValueAggregateResult]] = { - val futures = for (aggregation <- sqlQuery.aggregations) yield { - val promise: Promise[SingleValueAggregateResult] = Promise() - val field = aggregation.field - val sourceField = aggregation.sourceField - val aggType = aggregation.aggType - val aggName = aggregation.aggName - val query = aggregation.query - val sources = aggregation.sources - sourceField match { - case "_id" if aggType.sql == "count" => - countAsync( - JSONQuery( - query, - collection.immutable.Seq(sources: _*), - collection.immutable.Seq.empty[String] - ) - ).onComplete { - case Success(result) => - promise.success( - SingleValueAggregateResult( - field, - aggType, - result.getOrElse(0d), - None - ) - ) - case Failure(f) => - logger.error(f.getMessage, f.fillInStackTrace()) - promise.success(SingleValueAggregateResult(field, aggType, 0d, Some(f.getMessage))) - } - promise.future - case _ => - import JestClientApi._ - import JestClientResultHandler._ - apply() - .executeAsyncPromise( - JSONQuery( - query, - collection.immutable.Seq(sources: _*), - collection.immutable.Seq.empty[String] - ).search - ) - .onComplete { - case Success(result) => - val agg = aggName.split("\\.").last - - val itAgg = aggName.split("\\.").iterator - - var root = - if (aggregation.nested) - result.getAggregations.getAggregation(itAgg.next(), classOf[RootAggregation]) - else - result.getAggregations - - if (aggregation.filtered) { - root = root.getAggregation(itAgg.next(), classOf[RootAggregation]) - } - - promise.success( - SingleValueAggregateResult( - field, - aggType, - aggType match { - case sql.Count => - if (aggregation.distinct) - root.getCardinalityAggregation(agg).getCardinality.doubleValue() - else { - root.getValueCountAggregation(agg).getValueCount.doubleValue() - } - case sql.Sum => - root.getSumAggregation(agg).getSum - case sql.Avg => - root.getAvgAggregation(agg).getAvg - case sql.Min => - root.getMinAggregation(agg).getMin - case sql.Max => - root.getMaxAggregation(agg).getMax - case _ => 0d - }, - None - ) - ) - - case Failure(f) => - logger.error(f.getMessage, f.fillInStackTrace()) - promise.success(SingleValueAggregateResult(field, aggType, 0d, Some(f.getMessage))) - } - - promise.future - } - } - Future.sequence(futures) - } -} - -trait JestIndexApi extends IndexApi with JestClientCompanion { - _: RefreshApi => - override def index(index: String, _type: String, id: String, source: String): Boolean = { - Try( - apply().execute( - new Index.Builder(source).index(index).`type`(_type).id(id).build() - ) - ) match { - case Success(s) => - if (!s.isSucceeded) - logger.error(s.getErrorMessage) - s.isSucceeded && this.refresh(index) - case Failure(f) => - logger.error(f.getMessage, f) - false - } - } - - override def indexAsync(index: String, _type: String, id: String, source: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = { - import JestClientResultHandler._ - val promise: Promise[Boolean] = Promise() - apply().executeAsyncPromise( - new Index.Builder(source).index(index).`type`(_type).id(id).build() - ) onComplete { - case Success(s) => promise.success(s.isSucceeded && this.refresh(index)) - case Failure(f) => - logger.error(f.getMessage, f) - promise.failure(f) - } - promise.future - } - -} - -trait JestUpdateApi extends UpdateApi with JestClientCompanion { - _: RefreshApi => - override def update( - index: String, - _type: String, - id: String, - source: String, - upsert: Boolean - ): Boolean = { - Try( - apply().execute( - new Update.Builder( - if (upsert) - docAsUpsert(source) - else - source - ).index(index).`type`(_type).id(id).build() - ) - ) match { - case Success(s) => - if (!s.isSucceeded) - logger.error(s.getErrorMessage) - s.isSucceeded && this.refresh(index) - case Failure(f) => - logger.error(f.getMessage, f) - false - } - } - - override def updateAsync( - index: String, - _type: String, - id: String, - source: String, - upsert: Boolean - )(implicit ec: ExecutionContext): Future[Boolean] = { - import JestClientResultHandler._ - val promise: Promise[Boolean] = Promise() - apply().executeAsyncPromise( - new Update.Builder( - if (upsert) - docAsUpsert(source) - else - source - ).index(index).`type`(_type).id(id).build() - ) onComplete { - case Success(s) => - if (!s.isSucceeded) - logger.error(s.getErrorMessage) - promise.success(s.isSucceeded && this.refresh(index)) - case Failure(f) => - logger.error(f.getMessage, f) - promise.failure(f) - } - promise.future - } - -} - -trait JestDeleteApi extends DeleteApi with JestClientCompanion { - _: RefreshApi => - override def delete(uuid: String, index: String, _type: String): Boolean = { - val result = apply().execute( - new Delete.Builder(uuid).index(index).`type`(_type).build() - ) - if (!result.isSucceeded) { - logger.error(result.getErrorMessage) - } - result.isSucceeded && this.refresh(index) - } - - override def deleteAsync(uuid: String, index: String, _type: String)(implicit - ec: ExecutionContext - ): Future[Boolean] = { - import JestClientResultHandler._ - val promise: Promise[Boolean] = Promise() - apply().executeAsyncPromise( - new Delete.Builder(uuid).index(index).`type`(_type).build() - ) onComplete { - case Success(s) => - if (!s.isSucceeded) - logger.error(s.getErrorMessage) - promise.success(s.isSucceeded && this.refresh(index)) - case Failure(f) => - logger.error(f.getMessage, f) - promise.failure(f) - } - promise.future - } - -} - -trait JestGetApi extends GetApi with JestClientCompanion { - - // GetApi - override def get[U <: Timestamped]( - id: String, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit m: Manifest[U], formats: Formats): Option[U] = { - val result = apply().execute( - new Get.Builder( - index.getOrElse( - maybeType.getOrElse( - m.runtimeClass.getSimpleName.toLowerCase - ) - ), - id - ).build() - ) - if (result.isSucceeded) { - Some(serialization.read[U](result.getSourceAsString)) - } else { - logger.error(result.getErrorMessage) - None - } - } - - override def getAsync[U <: Timestamped]( - id: String, - index: Option[String] = None, - maybeType: Option[String] = None - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[Option[U]] = { - import JestClientResultHandler._ - val promise: Promise[Option[U]] = Promise() - apply().executeAsyncPromise( - new Get.Builder( - index.getOrElse( - maybeType.getOrElse( - m.runtimeClass.getSimpleName.toLowerCase - ) - ), - id - ).build() - ) onComplete { - case Success(result) => - if (result.isSucceeded) - promise.success(Some(serialization.read[U](result.getSourceAsString))) - else { - logger.error(result.getErrorMessage) - promise.success(None) - } - case Failure(f) => - logger.error(f.getMessage, f) - promise.failure(f) - } - promise.future - } - -} - -trait JestSearchApi extends SearchApi with JestClientCompanion { - - import JestClientApi._ - - override def search[U]( - jsonQuery: JSONQuery - )(implicit m: Manifest[U], formats: Formats): List[U] = { - import jsonQuery._ - val search = new Search.Builder(query) - for (indice <- indices) search.addIndex(indice) - for (t <- types) search.addType(t) - Try( - apply() - .execute(search.build()) - .getSourceAsStringList - .asScala - .map(source => serialization.read[U](source)) - .toList - ) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty - } - } - - override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = { - val search: Option[Search] = sqlQuery.jestSearch - (search match { - case Some(s) => - val result = apply().execute(s) - if (result.isSucceeded) { - Some(result) - } else { - logger.error(result.getErrorMessage) - None - } - case _ => None - }) match { - case Some(searchResult) => - Try( - searchResult.getSourceAsStringList.asScala - .map(source => serialization.read[U](source)) - .toList - ) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty - } - case _ => List.empty - } - } - - override def searchAsync[U]( - sqlQuery: SQLQuery - )(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = { - val promise = Promise[List[U]]() - val search: Option[Search] = sqlQuery.jestSearch - search match { - case Some(s) => - import JestClientResultHandler._ - apply().executeAsyncPromise(s) onComplete { - case Success(searchResult) => - promise.success( - searchResult.getSourceAsStringList.asScala - .map(source => serialization.read[U](source)) - .toList - ) - case Failure(f) => - promise.failure(f) - } - case _ => promise.success(List.empty) - } - promise.future - } - - override def searchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[(U, List[I])] = { - val search: Option[Search] = sqlQuery.jestSearch - (search match { - case Some(s) => - val result = apply().execute(s) - if (result.isSucceeded) { - Some(result) - } else { - logger.error(result.getErrorMessage) - None - } - case _ => None - }) match { - case Some(searchResult) => - Try(searchResult.getJsonObject ~> [U, I] innerField) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty - } - case _ => List.empty - } - } - - override def searchWithInnerHits[U, I](jsonQuery: JSONQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[(U, List[I])] = { - val result = apply().execute(jsonQuery.search) - (if (result.isSucceeded) { - Some(result) - } else { - logger.error(result.getErrorMessage) - None - }) match { - case Some(searchResult) => - Try(searchResult.getJsonObject ~> [U, I] innerField) match { - case Success(s) => s - case Failure(f) => - logger.error(f.getMessage, f) - List.empty - } - case _ => List.empty - } - } - - override def multiSearch[U]( - sqlQuery: SQLQuery - )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { - sqlQuery.jestMultiSearch.map( - apply().execute(_) - ) match { - case Some(multiSearchResult) => - multiSearchResult.getResponses.asScala - .map(searchResponse => - searchResponse.searchResult.getSourceAsStringList.asScala - .map(source => serialization.read[U](source)) - .toList - ) - .toList - case _ => List.empty - } - } - - override def multiSearch[U]( - jsonQueries: JSONQueries - )(implicit m: Manifest[U], formats: Formats): List[List[U]] = { - val searches: List[Search] = jsonQueries.queries.map(_.search) - val multiSearchResult = apply().execute(new MultiSearch.Builder(searches.asJava).build()) - multiSearchResult.getResponses.asScala - .map(searchResponse => - searchResponse.searchResult.getSourceAsStringList.asScala - .map(source => serialization.read[U](source)) - .toList - ) - .toList - } - - override def multiSearchWithInnerHits[U, I](sqlQuery: SQLQuery, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[List[(U, List[I])]] = { - sqlQuery.jestMultiSearch - .map(nativeMultiSearchWithInnerHits(_, innerField)) - .getOrElse(List.empty) - } - - override def multiSearchWithInnerHits[U, I](jsonQueries: JSONQueries, innerField: String)(implicit - m1: Manifest[U], - m2: Manifest[I], - formats: Formats - ): List[List[(U, List[I])]] = { - nativeMultiSearchWithInnerHits( - new MultiSearch.Builder(jsonQueries.queries.map(_.search).asJava).build(), - innerField - ) - } - - private[this] def nativeMultiSearchWithInnerHits[U, I]( - multiSearch: MultiSearch, - innerField: String - )(implicit m1: Manifest[U], m2: Manifest[I], formats: Formats): List[List[(U, List[I])]] = { - val multiSearchResult = apply().execute(multiSearch) - if (multiSearchResult.isSucceeded) { - multiSearchResult.getResponses.asScala - .map(searchResponse => searchResponse.searchResult.getJsonObject ~> [U, I] innerField) - .toList - } else { - logger.error(multiSearchResult.getErrorMessage) - List.empty - } - } - -} - -trait JestBulkApi - extends JestRefreshApi - with JestSettingsApi - with JestIndicesApi - with BulkApi - with JestClientCompanion { - override type A = BulkableAction[DocumentResult] - override type R = BulkResult - - override implicit def toBulkElasticAction(a: A): BulkElasticAction = - new BulkElasticAction { - override def index: String = a.getIndex - } - - private[this] def toBulkElasticResultItem(i: BulkResult#BulkResultItem): BulkElasticResultItem = - new BulkElasticResultItem { - override def index: String = i.index - } - - override implicit def toBulkElasticResult(r: R): BulkElasticResult = - new BulkElasticResult { - override def items: List[BulkElasticResultItem] = - r.getItems.asScala.toList.map(toBulkElasticResultItem) - } - - override def bulk(implicit - bulkOptions: BulkOptions, - system: ActorSystem - ): Flow[Seq[A], R, NotUsed] = { - import JestClientResultHandler._ - val parallelism = Math.max(1, bulkOptions.balance) - - Flow[Seq[BulkableAction[DocumentResult]]] - .named("bulk") - .mapAsyncUnordered[BulkResult](parallelism)(items => { - logger.info(s"Starting to write batch of ${items.size}...") - val init = - new Bulk.Builder().defaultIndex(bulkOptions.index).defaultType(bulkOptions.documentType) - val bulkQuery = items.foldLeft(init) { (current, query) => - current.addAction(query) - } - apply().executeAsyncPromise(bulkQuery.build()) - }) - } - - override def bulkResult: Flow[R, Set[String], NotUsed] = - Flow[BulkResult] - .named("result") - .map(result => { - val items = result.getItems - val indices = items.asScala.map(_.index).toSet - logger.info(s"Finished to write batch of ${items.size} within ${indices.mkString(",")}.") - indices - }) - - override def toBulkAction(bulkItem: BulkItem): A = { - val builder = bulkItem.action match { - case BulkAction.DELETE => new Delete.Builder(bulkItem.body) - case BulkAction.UPDATE => new Update.Builder(docAsUpsert(bulkItem.body)) - case _ => new Index.Builder(bulkItem.body) - } - bulkItem.id.foreach(builder.id) - builder.index(bulkItem.index) - bulkItem.parent.foreach(s => builder.setParameter(Parameters.PARENT, s)) - builder.build() - } - -} - -object JestClientApi { - - implicit def requestToSearch(elasticSelect: ElasticSearchRequest): Search = { - import elasticSelect._ - Console.println(query) - val search = new Search.Builder(query) - for (source <- sources) search.addIndex(source) - search.build() - } - - implicit class SearchSQLQuery(sqlQuery: SQLQuery) { - def jestSearch: Option[Search] = { - sqlQuery.search match { - case Some(value) => Some(value) - case None => None - } - } - } - - implicit class MultiSearchSQLQuery(sqlQuery: SQLQuery) { - def jestMultiSearch: Option[MultiSearch] = { - sqlQuery.multiSearch.map(m => { - import m._ - - import scala.collection.JavaConverters._ - Console.println(query) - new MultiSearch.Builder(m.requests.map(requestToSearch).asJava).build() - }) - } - } - - implicit class SearchJSONQuery(jsonQuery: JSONQuery) { - def search: Search = { - import jsonQuery._ - val _search = new Search.Builder(query) - for (indice <- indices) _search.addIndex(indice) - for (t <- types) _search.addType(t) - _search.build() - } - } - - implicit class SearchResults(searchResult: SearchResult) { - def apply[M: Manifest]()(implicit formats: Formats): List[M] = { - searchResult.getSourceAsStringList.asScala.map(source => serialization.read[M](source)).toList - } - } - - implicit class JestBulkAction(bulkableAction: BulkableAction[DocumentResult]) { - def index: String = bulkableAction.getIndex - } -} diff --git a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientCompanion.scala b/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientCompanion.scala deleted file mode 100644 index 37278153..00000000 --- a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientCompanion.scala +++ /dev/null @@ -1,159 +0,0 @@ -package app.softnetwork.elastic.client.jest - -import app.softnetwork.elastic.client.{ElasticConfig, ElasticCredentials} -import com.sksamuel.exts.Logging -import io.searchbox.action.Action -import io.searchbox.client.config.HttpClientConfig -import io.searchbox.client.{JestClient, JestClientFactory, JestResult, JestResultHandler} -import org.apache.http.HttpHost - -import java.io.IOException -import java.util -import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import scala.language.reflectiveCalls -import scala.util.{Failure, Success, Try} - -/** Created by smanciot on 20/05/2021. - */ -trait JestClientCompanion extends Logging { - - def elasticConfig: ElasticConfig - - private[this] var jestClient: Option[InnerJestClient] = None - - private[this] val factory = new JestClientFactory() - - private[this] var httpClientConfig: HttpClientConfig = _ - - private[this] class InnerJestClient(private var _jestClient: JestClient) extends JestClient { - private[this] var nbFailures: Int = 0 - - override def shutdownClient(): Unit = { - close() - } - - private def checkClient(): Unit = { - Option(_jestClient) match { - case None => - factory.setHttpClientConfig(httpClientConfig) - _jestClient = Try(factory.getObject) match { - case Success(s) => - s - case Failure(f) => - logger.error(f.getMessage, f) - throw f - } - case _ => - } - } - - override def executeAsync[J <: JestResult]( - clientRequest: Action[J], - jestResultHandler: JestResultHandler[_ >: J] - ): Unit = { - Try(checkClient()) - Option(_jestClient) match { - case Some(s) => s.executeAsync[J](clientRequest, jestResultHandler) - case _ => - close() - jestResultHandler.failed(new Exception("JestClient not initialized")) - } - } - - override def execute[J <: JestResult](clientRequest: Action[J]): J = { - Try(checkClient()) - Option(_jestClient) match { - case Some(j) => - Try(j.execute[J](clientRequest)) match { - case Success(s) => - nbFailures = 0 - s - case Failure(f) => - f match { - case e: IOException => - nbFailures += 1 - logger.error(e.getMessage, e) - close() - if (nbFailures < 10) { - Thread.sleep(1000 * nbFailures) - execute(clientRequest) - } else { - throw f - } - case e: IllegalStateException => - nbFailures += 1 - logger.error(e.getMessage, e) - close() - if (nbFailures < 10) { - Thread.sleep(1000 * nbFailures) - execute(clientRequest) - } else { - throw f - } - case _ => - close() - throw f - } - } - case _ => - close() - throw new Exception("JestClient not initialized") - } - } - - override def setServers(servers: util.Set[String]): Unit = { - Try(checkClient()) - Option(_jestClient).foreach(_.setServers(servers)) - } - - override def close(): Unit = { - Option(_jestClient).foreach(_.close()) - _jestClient = null - } - } - - private[this] def getHttpHosts(esUrl: String): Set[HttpHost] = { - esUrl - .split(",") - .map(u => { - val url = new java.net.URL(u) - new HttpHost(url.getHost, url.getPort, url.getProtocol) - }) - .toSet - } - - def apply(): JestClient = { - apply( - elasticConfig.credentials, - multithreaded = elasticConfig.multithreaded, - discoveryEnabled = elasticConfig.discoveryEnabled - ) - } - - def apply( - esCredentials: ElasticCredentials, - multithreaded: Boolean = true, - timeout: Int = 60000, - discoveryEnabled: Boolean = false, - discoveryFrequency: Long = 60L, - discoveryFrequencyTimeUnit: TimeUnit = TimeUnit.SECONDS - ): JestClient = { - jestClient match { - case Some(s) => s - case None => - httpClientConfig = new HttpClientConfig.Builder(esCredentials.url) - .defaultCredentials(esCredentials.username, esCredentials.password) - .preemptiveAuthTargetHosts(getHttpHosts(esCredentials.url).asJava) - .multiThreaded(multithreaded) - .discoveryEnabled(discoveryEnabled) - .discoveryFrequency(discoveryFrequency, discoveryFrequencyTimeUnit) - .connTimeout(timeout) - .readTimeout(timeout) - .build() - factory.setHttpClientConfig(httpClientConfig) - jestClient = Some(new InnerJestClient(factory.getObject)) - jestClient.get - } - } -} diff --git a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientResultHandler.scala b/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientResultHandler.scala deleted file mode 100644 index fc05e2e2..00000000 --- a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestClientResultHandler.scala +++ /dev/null @@ -1,44 +0,0 @@ -package app.softnetwork.elastic.client.jest - -import io.searchbox.action.Action -import io.searchbox.client.{JestClient, JestResult, JestResultHandler} -import io.searchbox.core.BulkResult - -import scala.concurrent.{Future, Promise} - -/** Created by smanciot on 28/04/17. - */ -private class JestClientResultHandler[T <: JestResult] extends JestResultHandler[T] { - - protected val promise: Promise[T] = Promise() - - override def completed(result: T): Unit = - if (!result.isSucceeded) - promise.failure(new Exception(s"${result.getErrorMessage} - ${result.getJsonString}")) - else { - result match { - case r: BulkResult if !r.getFailedItems.isEmpty => - promise.failure( - new Exception(s"We don't allow any failed item while indexing ${result.getJsonString}") - ) - case _ => promise.success(result) - - } - } - - override def failed(exception: Exception): Unit = promise.failure(exception) - - def future: Future[T] = promise.future - -} - -object JestClientResultHandler { - - implicit class PromiseJestClient(jestClient: JestClient) { - def executeAsyncPromise[T <: JestResult](clientRequest: Action[T]): Future[T] = { - val resultHandler = new JestClientResultHandler[T]() - jestClient.executeAsync(clientRequest, resultHandler) - resultHandler.future - } - } -} diff --git a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestProvider.scala b/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestProvider.scala deleted file mode 100644 index 19e12c0a..00000000 --- a/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestProvider.scala +++ /dev/null @@ -1,11 +0,0 @@ -package app.softnetwork.elastic.client.jest - -import app.softnetwork.elastic.persistence.query.ElasticProvider -import app.softnetwork.persistence.ManifestWrapper -import app.softnetwork.persistence.model.Timestamped - -/** Created by smanciot on 20/05/2021. - */ -trait JestProvider[T <: Timestamped] extends ElasticProvider[T] with JestClientApi { - _: ManifestWrapper[T] => -} diff --git a/jest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala b/jest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala deleted file mode 100644 index 21a628c2..00000000 --- a/jest/src/main/scala/app/softnetwork/elastic/persistence/query/State2ElasticProcessorStreamWithJestProvider.scala +++ /dev/null @@ -1,10 +0,0 @@ -package app.softnetwork.elastic.persistence.query - -import app.softnetwork.elastic.client.jest.JestProvider -import app.softnetwork.persistence.message.CrudEvent -import app.softnetwork.persistence.model.Timestamped -import app.softnetwork.persistence.query.{JournalProvider, OffsetProvider} - -trait State2ElasticProcessorStreamWithJestProvider[T <: Timestamped, E <: CrudEvent] - extends State2ElasticProcessorStream[T, E] - with JestProvider[T] { _: JournalProvider with OffsetProvider => } diff --git a/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala b/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala index 6bfadc74..97245c93 100644 --- a/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala +++ b/persistence/src/main/scala/app/softnetwork/elastic/persistence/query/ElasticProvider.scala @@ -9,14 +9,14 @@ import app.softnetwork.persistence.model.Timestamped import app.softnetwork.persistence.query.ExternalPersistenceProvider import app.softnetwork.serialization.commonFormats import app.softnetwork.elastic.persistence.typed.Elastic._ -import com.sksamuel.exts.Logging +import com.typesafe.scalalogging.StrictLogging import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} /** Created by smanciot on 16/05/2020. */ -trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] with Logging { +trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] with StrictLogging { _: ElasticClientApi with ManifestWrapper[T] => implicit def formats: Formats = commonFormats @@ -44,7 +44,7 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w Try { createIndex(index) addAlias(index, alias) - setMapping(index, _type, loadMapping(mappingPath)) + setMapping(index, loadMapping(mappingPath)) } match { case Success(_) => logger.info(s"index:$index type:${_type} alias:$alias created") case Failure(f) => @@ -102,7 +102,7 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w */ override def deleteDocument(uuid: String): Boolean = { Try( - delete(uuid, index, _type) + delete(uuid, index) ) match { case Success(value) => value && refresh(index) case Failure(f) => @@ -121,13 +121,12 @@ trait ElasticProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] w * whether the operation is successful or not */ override def upsertDocument(uuid: String, data: String): Boolean = { - if (logger.isDebugEnabled) { + logger.whenDebugEnabled { logger.debug(s"Upserting document $uuid for index $index with $data") } Try( update( index, - _type, uuid, data, upsert = true diff --git a/project/Versions.scala b/project/Versions.scala index ba7fc73b..053f4293 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -8,7 +8,7 @@ object Versions { val kxbmap = "0.4.4" - val jackson = "2.12.7" // 2.11.4 -> 2.12.7 + val jackson = "2.13.3" // 2.11.4 -> 2.12.7 val json4s = "4.0.6" // 3.6.12 -> 4.0.6 @@ -20,9 +20,9 @@ object Versions { val log4s = "1.8.2" - val elasticSearch = "6.7.2" + val elasticSearch = "7.17.29" - val elastic4s = "6.7.6" + val elastic4s = "7.17.4" val jest = "6.3.1" @@ -34,5 +34,5 @@ object Versions { val gson = "2.8.0" - val rest = "6.7.2" // rest high level client + val rest = "7.17.29" // rest high level client } diff --git a/rest/build.sbt b/rest/build.sbt index e40a8c77..834128e5 100644 --- a/rest/build.sbt +++ b/rest/build.sbt @@ -4,6 +4,9 @@ name := "elastic-rest-client" val jacksonExclusions = Seq( ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), ExclusionRule(organization = "org.codehaus.jackson") ) diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala index eb08e448..47a2349f 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala +++ b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala @@ -11,7 +11,6 @@ import app.softnetwork.serialization.serialization import com.google.gson.JsonParser import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions -import org.elasticsearch.action.admin.indices.close.CloseIndexRequest import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest import org.elasticsearch.action.admin.indices.flush.FlushRequest import org.elasticsearch.action.admin.indices.open.OpenIndexRequest @@ -27,18 +26,18 @@ import org.elasticsearch.action.update.{UpdateRequest, UpdateResponse} import org.elasticsearch.action.{ActionListener, DocWriteRequest} import org.elasticsearch.client.RequestOptions import org.elasticsearch.client.core.{CountRequest, CountResponse} -import org.elasticsearch.client.indices.{CreateIndexRequest, GetMappingsRequest, PutMappingRequest} +import org.elasticsearch.client.indices.{ + CloseIndexRequest, + CreateIndexRequest, + GetMappingsRequest, + PutMappingRequest +} import org.elasticsearch.common.io.stream.InputStreamStreamInput -import org.elasticsearch.common.xcontent.{DeprecationHandler, XContentType} +import org.elasticsearch.xcontent.{DeprecationHandler, XContentType} import org.elasticsearch.rest.RestStatus import org.elasticsearch.search.aggregations.bucket.filter.Filter import org.elasticsearch.search.aggregations.bucket.nested.Nested -import org.elasticsearch.search.aggregations.metrics.avg.Avg -import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality -import org.elasticsearch.search.aggregations.metrics.max.Max -import org.elasticsearch.search.aggregations.metrics.min.Min -import org.elasticsearch.search.aggregations.metrics.sum.Sum -import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount +import org.elasticsearch.search.aggregations.metrics.{Avg, Cardinality, Max, Min, Sum, ValueCount} import org.elasticsearch.search.builder.SearchSourceBuilder import org.json4s.Formats @@ -148,7 +147,7 @@ trait RestHighLevelClientSettingsApi extends SettingsApi with RestHighLevelClien } trait RestHighLevelClientMappingApi extends MappingApi with RestHighLevelClientCompanion { - override def setMapping(index: String, `type`: String, mapping: String): Boolean = { + override def setMapping(index: String, mapping: String): Boolean = { apply() .indices() .putMapping( @@ -159,7 +158,7 @@ trait RestHighLevelClientMappingApi extends MappingApi with RestHighLevelClientC .isAcknowledged } - override def getMapping(index: String, `type`: String): String = { + override def getMapping(index: String): String = { apply() .indices() .getMapping( @@ -340,10 +339,11 @@ trait RestHighLevelClientSingleValueAggregateApi trait RestHighLevelClientIndexApi extends IndexApi with RestHighLevelClientCompanion { _: RestHighLevelClientRefreshApi => - override def index(index: String, _type: String, id: String, source: String): Boolean = { + override def index(index: String, id: String, source: String): Boolean = { apply() .index( - new IndexRequest(index, _type, id) + new IndexRequest(index) + .id(id) .source(source, XContentType.JSON), RequestOptions.DEFAULT ) @@ -351,12 +351,13 @@ trait RestHighLevelClientIndexApi extends IndexApi with RestHighLevelClientCompa .getStatus < 400 } - override def indexAsync(index: String, _type: String, id: String, source: String)(implicit + override def indexAsync(index: String, id: String, source: String)(implicit ec: ExecutionContext ): Future[Boolean] = { val promise: Promise[Boolean] = Promise() apply().indexAsync( - new IndexRequest(index, _type, id) + new IndexRequest(index) + .id(id) .source(source, XContentType.JSON), RequestOptions.DEFAULT, new ActionListener[IndexResponse] { @@ -374,14 +375,13 @@ trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientCom _: RestHighLevelClientRefreshApi => override def update( index: String, - _type: String, id: String, source: String, upsert: Boolean ): Boolean = { apply() .update( - new UpdateRequest(index, _type, id) + new UpdateRequest(index, id) .doc(source, XContentType.JSON) .docAsUpsert(upsert), RequestOptions.DEFAULT @@ -392,14 +392,13 @@ trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientCom override def updateAsync( index: String, - _type: String, id: String, source: String, upsert: Boolean )(implicit ec: ExecutionContext): Future[Boolean] = { val promise: Promise[Boolean] = Promise() apply().updateAsync( - new UpdateRequest(index, _type, id) + new UpdateRequest(index, id) .doc(source, XContentType.JSON) .docAsUpsert(upsert), RequestOptions.DEFAULT, @@ -417,22 +416,22 @@ trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientCom trait RestHighLevelClientDeleteApi extends DeleteApi with RestHighLevelClientCompanion { _: RestHighLevelClientRefreshApi => - override def delete(uuid: String, index: String, _type: String): Boolean = { + override def delete(uuid: String, index: String): Boolean = { apply() .delete( - new DeleteRequest(index, _type, uuid), + new DeleteRequest(index, uuid), RequestOptions.DEFAULT ) .status() .getStatus < 400 } - override def deleteAsync(uuid: String, index: String, _type: String)(implicit + override def deleteAsync(uuid: String, index: String)(implicit ec: ExecutionContext ): Future[Boolean] = { val promise: Promise[Boolean] = Promise() apply().deleteAsync( - new DeleteRequest(index, _type, uuid), + new DeleteRequest(index, uuid), RequestOptions.DEFAULT, new ActionListener[DeleteResponse] { override def onResponse(response: DeleteResponse): Unit = @@ -459,7 +458,6 @@ trait RestHighLevelClientGetApi extends GetApi with RestHighLevelClientCompanion m.runtimeClass.getSimpleName.toLowerCase ) ), - maybeType.getOrElse("_all"), id ), RequestOptions.DEFAULT @@ -509,7 +507,6 @@ trait RestHighLevelClientGetApi extends GetApi with RestHighLevelClientCompanion m.runtimeClass.getSimpleName.toLowerCase ) ), - maybeType.getOrElse("_all"), id ), RequestOptions.DEFAULT, @@ -551,7 +548,7 @@ trait RestHighLevelClientSearchApi extends SearchApi with RestHighLevelClientCom ), RequestOptions.DEFAULT ) - if (response.getHits.getTotalHits > 0) { + if (response.getHits.getTotalHits.value > 0) { response.getHits.getHits.toList.map { hit => logger.info(s"Deserializing hit: ${hit.getSourceAsString}") serialization.read[U](hit.getSourceAsString) @@ -601,7 +598,7 @@ trait RestHighLevelClientSearchApi extends SearchApi with RestHighLevelClientCom RequestOptions.DEFAULT, new ActionListener[SearchResponse] { override def onResponse(response: SearchResponse): Unit = { - if (response.getHits.getTotalHits > 0) { + if (response.getHits.getTotalHits.value > 0) { promise.success(response.getHits.getHits.toList.map { hit => serialization.read[U](hit.getSourceAsString) }) @@ -799,19 +796,19 @@ trait RestHighLevelClientBulkApi import bulkItem._ val request = action match { case BulkAction.UPDATE => - val r = new UpdateRequest(index, null, if (id.isEmpty) null else id.get) + val r = new UpdateRequest(index, if (id.isEmpty) null else id.get) .doc(body, XContentType.JSON) .docAsUpsert(true) - parent.foreach(r.parent) +// parent.foreach(r.parent) r case BulkAction.DELETE => val r = new DeleteRequest(index).id(id.getOrElse("_all")) - parent.foreach(r.parent) +// parent.foreach(r.parent) r case _ => val r = new IndexRequest(index).source(body, XContentType.JSON) id.foreach(r.id) - parent.foreach(r.parent) +// parent.foreach(r.parent) r } request @@ -839,7 +836,7 @@ trait RestHighLevelClientBulkApi Flow[Seq[A]] .named("bulk") .mapAsyncUnordered[R](parallelism) { items => - val request = new BulkRequest(bulkOptions.index, bulkOptions.documentType) + val request = new BulkRequest(bulkOptions.index) items.foreach(request.add) val promise: Promise[R] = Promise[R]() apply().bulkAsync( diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientCompanion.scala b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientCompanion.scala index 1dd158a2..1ec39335 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientCompanion.scala +++ b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientCompanion.scala @@ -1,18 +1,18 @@ package app.softnetwork.elastic.client.rest import app.softnetwork.elastic.client.ElasticConfig -import com.sksamuel.exts.Logging +import com.typesafe.scalalogging.StrictLogging import org.apache.http.HttpHost import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials} import org.apache.http.impl.client.BasicCredentialsProvider import org.apache.http.impl.nio.client.HttpAsyncClientBuilder import org.elasticsearch.client.{RestClient, RestClientBuilder, RestHighLevelClient} import org.elasticsearch.common.settings.Settings -import org.elasticsearch.common.xcontent.NamedXContentRegistry +import org.elasticsearch.xcontent.NamedXContentRegistry import org.elasticsearch.plugins.SearchPlugin import org.elasticsearch.search.SearchModule -trait RestHighLevelClientCompanion extends Logging { +trait RestHighLevelClientCompanion extends StrictLogging { def elasticConfig: ElasticConfig diff --git a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala index a455304e..9a5b40f5 100644 --- a/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala +++ b/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientProvider.scala @@ -4,7 +4,9 @@ import app.softnetwork.elastic.persistence.query.ElasticProvider import app.softnetwork.persistence.ManifestWrapper import app.softnetwork.persistence.model.Timestamped -trait RestHighLevelClientProvider [T <: Timestamped] extends ElasticProvider[T] with RestHighLevelClientApi { +trait RestHighLevelClientProvider[T <: Timestamped] + extends ElasticProvider[T] + with RestHighLevelClientApi { _: ManifestWrapper[T] => } diff --git a/sql/build.sbt b/sql/build.sbt index f837ed03..99199fd2 100644 --- a/sql/build.sbt +++ b/sql/build.sbt @@ -6,20 +6,27 @@ val jackson = Seq( "com.fasterxml.jackson.core" % "jackson-databind" % Versions.jackson, "com.fasterxml.jackson.core" % "jackson-core" % Versions.jackson, "com.fasterxml.jackson.core" % "jackson-annotations" % Versions.jackson, - "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % Versions.jackson + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % Versions.jackson, + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % Versions.jackson, + "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % Versions.jackson, + "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % Versions.jackson, + "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % Versions.jackson, + "com.fasterxml.jackson.module" % "jackson-module-parameter-names" % Versions.jackson, + "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson, ) -val elastic = Seq( - "com.sksamuel.elastic4s" %% "elastic4s-core" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch"), - "com.sksamuel.elastic4s" %% "elastic4s-http" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch"), +val elastic4s = Seq( + "com.sksamuel.elastic4s" %% "elastic4s-core" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), ) val scalatest = Seq( "org.scalatest" %% "scalatest" % Versions.scalatest % Test ) -libraryDependencies ++= jackson ++ elastic ++ scalatest ++ Seq( +libraryDependencies ++= jackson ++ elastic4s ++ scalatest ++ Seq( "javax.activation" % "activation" % "1.1.1" % Test -) +) :+ + "org.scala-lang" % "scala-reflect" % "2.12.18" :+ + "com.google.code.gson" % "gson" % Versions.gson % Test diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticAggregation.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticAggregation.scala index be97ae0b..5a5b4d48 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticAggregation.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticAggregation.scala @@ -1,6 +1,6 @@ package app.softnetwork.elastic.sql -import com.sksamuel.elastic4s.searches.aggs.Aggregation +import com.sksamuel.elastic4s.requests.searches.aggs.Aggregation case class ElasticAggregation( aggName: String, diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticMultiSearchRequest.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticMultiSearchRequest.scala index fe553083..503c2323 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticMultiSearchRequest.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticMultiSearchRequest.scala @@ -1,7 +1,6 @@ package app.softnetwork.elastic.sql -import com.sksamuel.elastic4s.http.search.MultiSearchBuilderFn -import com.sksamuel.elastic4s.searches.MultiSearchRequest +import com.sksamuel.elastic4s.requests.searches.{MultiSearchBuilderFn, MultiSearchRequest} case class ElasticMultiSearchRequest( requests: Seq[ElasticSearchRequest], diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticQuery.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticQuery.scala index 963ae83a..08c9cf32 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticQuery.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticQuery.scala @@ -1,6 +1,6 @@ package app.softnetwork.elastic.sql -import com.sksamuel.elastic4s.searches.{MultiSearchRequest, SearchRequest} +import com.sksamuel.elastic4s.requests.searches.{MultiSearchRequest, SearchRequest} /** Created by smanciot on 27/06/2018. */ diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticSearchRequest.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticSearchRequest.scala index e9525bf9..2bd3d424 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticSearchRequest.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/ElasticSearchRequest.scala @@ -1,7 +1,6 @@ package app.softnetwork.elastic.sql -import com.sksamuel.elastic4s.http.search.SearchBodyBuilderFn -import com.sksamuel.elastic4s.searches.SearchRequest +import com.sksamuel.elastic4s.requests.searches.{SearchBodyBuilderFn, SearchRequest} case class ElasticSearchRequest( fields: Seq[SQLField], diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLImplicits.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLImplicits.scala index a9bf251d..a788fcba 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLImplicits.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLImplicits.scala @@ -1,6 +1,6 @@ package app.softnetwork.elastic.sql -import com.sksamuel.elastic4s.searches.queries.term.{BuildableTermsQuery, TermsQuery} +//import com.sksamuel.elastic4s.requests.searches.term.{BuildableTermsQuery, TermsQuery} import scala.util.matching.Regex @@ -39,8 +39,8 @@ object SQLImplicits { implicit def sqllikeToRegex(value: String): Regex = toRegex(value).r - implicit def BuildableTermsNoOp[T]: BuildableTermsQuery[T] = new BuildableTermsQuery[T] { - override def build(q: TermsQuery[T]): Any = null // not used by the http builders - } +// implicit def BuildableTermsNoOp[T]: BuildableTermsQuery[T] = new BuildableTermsQuery[T] { +// override def build(q: TermsQuery[T]): Any = null // not used by the http builders +// } } diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLMultiSearchRequest.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLMultiSearchRequest.scala index 6af211ec..6d33586e 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLMultiSearchRequest.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLMultiSearchRequest.scala @@ -1,6 +1,6 @@ package app.softnetwork.elastic.sql -import com.sksamuel.elastic4s.searches.MultiSearchRequest +import com.sksamuel.elastic4s.requests.searches.MultiSearchRequest case class SQLMultiSearchRequest(requests: Seq[SQLSearchRequest]) extends SQLToken { override def sql: String = s"${requests.map(_.sql).mkString(" union ")}" diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLQuery.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLQuery.scala index 845e246c..e60408fc 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLQuery.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLQuery.scala @@ -1,7 +1,7 @@ package app.softnetwork.elastic.sql import com.sksamuel.elastic4s.ElasticApi -import com.sksamuel.elastic4s.http.search.SearchBodyBuilderFn +import com.sksamuel.elastic4s.requests.searches.SearchBodyBuilderFn case class SQLQuery(query: String, score: Option[Double] = None) { import SQLImplicits._ diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLSearchRequest.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLSearchRequest.scala index 2b1bb842..39176b5e 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLSearchRequest.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLSearchRequest.scala @@ -1,8 +1,8 @@ package app.softnetwork.elastic.sql import com.sksamuel.elastic4s.ElasticApi.{matchAllQuery, search} -import com.sksamuel.elastic4s.searches.SearchRequest -import com.sksamuel.elastic4s.searches.sort.FieldSort +import com.sksamuel.elastic4s.requests.searches.SearchRequest +import com.sksamuel.elastic4s.requests.searches.sort.FieldSort case class SQLSearchRequest( select: SQLSelect = SQLSelect(), diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLSelect.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLSelect.scala index 68c83b14..b037384a 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLSelect.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLSelect.scala @@ -11,7 +11,7 @@ import com.sksamuel.elastic4s.ElasticApi.{ sumAgg, valueCountAgg } -import com.sksamuel.elastic4s.searches.aggs.Aggregation +import com.sksamuel.elastic4s.requests.searches.aggs.Aggregation case object Select extends SQLExpr("select") with SQLRegex diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala index a766e48b..fa75292d 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala @@ -1,7 +1,7 @@ package app.softnetwork.elastic.sql import com.sksamuel.elastic4s.ElasticApi._ -import com.sksamuel.elastic4s.searches.queries.Query +import com.sksamuel.elastic4s.requests.searches.queries.Query import SQLImplicits._ case object Where extends SQLExpr("where") with SQLRegex diff --git a/sql/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala b/sql/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala index 1817eeb2..dd0b4298 100644 --- a/sql/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala +++ b/sql/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala @@ -1,8 +1,7 @@ package app.softnetwork.elastic.sql import com.sksamuel.elastic4s.ElasticApi.matchAllQuery -import com.sksamuel.elastic4s.http.search.SearchBodyBuilderFn -import com.sksamuel.elastic4s.searches.SearchRequest +import com.sksamuel.elastic4s.requests.searches.{SearchBodyBuilderFn, SearchRequest} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers diff --git a/sql/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala b/sql/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala index 60a02531..09b12108 100644 --- a/sql/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala +++ b/sql/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala @@ -1,9 +1,12 @@ package app.softnetwork.elastic.sql import app.softnetwork.elastic.sql.Queries._ +import com.google.gson.{JsonArray, JsonObject, JsonParser, JsonPrimitive} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import scala.collection.JavaConverters.asScalaIteratorConverter + /** Created by smanciot on 13/04/17. */ class SQLQuerySpec extends AnyFlatSpec with Matchers { @@ -419,7 +422,9 @@ class SQLQuerySpec extends AnyFlatSpec with Matchers { |limit 100""".stripMargin).search select.isDefined shouldBe true val result = select.get - result.query shouldBe + val query = result.query + val queryWithoutSource = query.substring(0, query.indexOf("_source") - 2) + "}" + queryWithoutSource shouldBe """{ | "query": { | "bool": { @@ -459,19 +464,28 @@ class SQLQuerySpec extends AnyFlatSpec with Matchers { | } | }, | "from": 0, - | "size": 100, - | "_source": { - | "includes": [ - | "profileId", - | "profile_ccm.email", - | "profile_ccm.city", - | "profile_ccm.firstName", - | "profile_ccm.lastName", - | "profile_ccm.postalCode", - | "profile_ccm.birthYear" - | ] - | } + | "size": 100 |}""".stripMargin.replaceAll("\\s+", "") + val includes = new JsonParser() + .parse(query.substring(query.indexOf("_source") + 9, query.length - 1)) + .asInstanceOf[JsonObject] + .get("includes") + .asInstanceOf[JsonArray] + .iterator() + .asScala + val sourceIncludes: Seq[String] = ( + for (i <- includes) yield i.asInstanceOf[JsonPrimitive].getAsString + ).toSeq + val expectedSourceIncludes = Seq( + "profileId", + "profile_ccm.email", + "profile_ccm.city", + "profile_ccm.firstName", + "profile_ccm.lastName", + "profile_ccm.postalCode", + "profile_ccm.birthYear" + ) + sourceIncludes should contain theSameElementsAs expectedSourceIncludes } it should "exclude fields from select" in { diff --git a/testkit/build.sbt b/testkit/build.sbt index 0c2cf606..f9bdd649 100644 --- a/testkit/build.sbt +++ b/testkit/build.sbt @@ -10,18 +10,14 @@ val jacksonExclusions = Seq( ) val elastic = Seq( - "com.sksamuel.elastic4s" %% "elastic4s-core" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch"), - "com.sksamuel.elastic4s" %% "elastic4s-http" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch"), - "org.elasticsearch" % "elasticsearch" % Versions.elasticSearch exclude ("org.apache.logging.log4j", "log4j-api"), - "com.sksamuel.elastic4s" %% "elastic4s-testkit" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch"), - "com.sksamuel.elastic4s" %% "elastic4s-embedded" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch"), - "com.sksamuel.elastic4s" %% "elastic4s-http" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch"), + "com.sksamuel.elastic4s" %% "elastic4s-core" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), + "org.elasticsearch" % "elasticsearch" % Versions.elasticSearch exclude ("org.apache.logging.log4j", "log4j-api") exclude("org.slf4j", "slf4j-api") excludeAll(jacksonExclusions:_*), + "com.sksamuel.elastic4s" %% "elastic4s-testkit" % Versions.elastic4s exclude ("org.elasticsearch", "elasticsearch") exclude("org.slf4j", "slf4j-api"), "org.apache.logging.log4j" % "log4j-api" % Versions.log4j, // "org.apache.logging.log4j" % "log4j-slf4j-impl" % Versions.log4j, "org.apache.logging.log4j" % "log4j-core" % Versions.log4j, - "pl.allegro.tech" % "embedded-elasticsearch" % "2.10.0" excludeAll(jacksonExclusions:_*), "org.testcontainers" % "elasticsearch" % Versions.testContainers excludeAll(jacksonExclusions:_*) ) libraryDependencies ++= elastic :+ - "app.softnetwork.persistence" %% "persistence-core-testkit" % "0.6.2.1" \ No newline at end of file + "app.softnetwork.persistence" %% "persistence-core-testkit" % Versions.genericPersistence \ No newline at end of file diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala index fe288531..b51f24b4 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/MockElasticClientApi.scala @@ -31,7 +31,7 @@ trait MockElasticClientApi extends ElasticClientApi { override def createIndex(index: String, settings: String): Boolean = true - override def setMapping(index: String, _type: String, mapping: String): Boolean = true + override def setMapping(index: String, mapping: String): Boolean = true override def deleteIndex(index: String): Boolean = true @@ -97,10 +97,10 @@ trait MockElasticClientApi extends ElasticClientApi { Future.successful(true) } - override def index(index: String, _type: String, id: String, source: String): Boolean = + override def index(index: String, id: String, source: String): Boolean = throw new UnsupportedOperationException - override def indexAsync(index: String, _type: String, id: String, source: String)(implicit + override def indexAsync(index: String, id: String, source: String)(implicit ec: ExecutionContext ): Future[Boolean] = throw new UnsupportedOperationException @@ -127,7 +127,6 @@ trait MockElasticClientApi extends ElasticClientApi { override def update( index: String, - _type: String, id: String, source: String, upsert: Boolean @@ -138,13 +137,12 @@ trait MockElasticClientApi extends ElasticClientApi { override def updateAsync( index: String, - _type: String, id: String, source: String, upsert: Boolean )(implicit ec: ExecutionContext): Future[Boolean] = Future.successful(false) - override def delete(uuid: String, index: String, _type: String): Boolean = { + override def delete(uuid: String, index: String): Boolean = { if (elasticDocuments.get(uuid).isDefined) { elasticDocuments.delete(uuid) true @@ -153,10 +151,10 @@ trait MockElasticClientApi extends ElasticClientApi { } } - override def deleteAsync(uuid: String, index: String, _type: String)(implicit + override def deleteAsync(uuid: String, index: String)(implicit ec: ExecutionContext ): Future[Boolean] = { - Future.successful(delete(uuid, index, _type)) + Future.successful(delete(uuid, index)) } override def refresh(index: String): Boolean = true @@ -212,7 +210,7 @@ trait MockElasticClientApi extends ElasticClientApi { formats: Formats ): List[(U, List[I])] = List.empty - override def getMapping(index: String, _type: String): String = + override def getMapping(index: String): String = throw new UnsupportedOperationException override def aggregate(sqlQuery: SQLQuery)(implicit diff --git a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala index 3e7f36da..9e169c3d 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala @@ -2,8 +2,7 @@ package app.softnetwork.elastic.scalatest import org.scalatest.Suite import org.testcontainers.elasticsearch.ElasticsearchContainer - -import scala.util.{Failure, Success} +import org.testcontainers.utility.DockerImageName /** Created by smanciot on 28/06/2018. */ @@ -12,7 +11,9 @@ trait ElasticDockerTestKit extends ElasticTestKit { _: Suite => override lazy val elasticURL: String = s"http://${elasticContainer.getHttpHostAddress}" lazy val elasticContainer = new ElasticsearchContainer( - s"docker.elastic.co/elasticsearch/elasticsearch:$elasticVersion" + DockerImageName + .parse(s"elasticsearch:$elasticVersion") + .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch") ) override def start(): Unit = elasticContainer.start() diff --git a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticTestKit.scala index bbc15244..e3e67334 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticTestKit.scala @@ -1,11 +1,13 @@ package app.softnetwork.elastic.scalatest import app.softnetwork.concurrent.scalatest.CompletionTestKit -import com.sksamuel.elastic4s.{IndexAndTypes, Indexes} -import com.sksamuel.elastic4s.http.index.admin.RefreshIndexResponse -import com.sksamuel.elastic4s.http.{ElasticClient, ElasticDsl, ElasticProperties} +import com.sksamuel.elastic4s.http.JavaClient +import com.sksamuel.elastic4s.requests.indexes.admin.RefreshIndexResponse +import com.sksamuel.elastic4s.{ElasticClient, ElasticDsl, Indexes} import com.typesafe.config.{Config, ConfigFactory} +import org.apache.http.HttpHost import org.elasticsearch.ResourceAlreadyExistsException +import org.elasticsearch.client.RestClient import org.elasticsearch.transport.RemoteTransportException import org.scalatest.{BeforeAndAfterAll, Suite} import org.scalatest.matchers.{MatchResult, Matcher} @@ -20,7 +22,7 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft def log: Logger - def elasticVersion: String = "6.7.2" + def elasticVersion: String = "7.17.28" def elasticURL: String @@ -41,7 +43,15 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft lazy val clusterName: String = s"test-${UUID.randomUUID()}" - lazy val elasticClient: ElasticClient = ElasticClient(ElasticProperties(elasticURL)) + lazy val elasticClient: ElasticClient = ElasticClient( + new JavaClient( + RestClient + .builder( + HttpHost.create(elasticURL) + ) + .build() + ) + ) def start(): Unit = () @@ -66,56 +76,56 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft // Rewriting methods from IndexMatchers in elastic4s with the ElasticClient def haveCount(expectedCount: Int): Matcher[String] = - (left: String) => { - elasticClient.execute(search(left).size(0)) complete () match { + (index: String) => { + elasticClient.execute(search(index).size(0)) complete () match { case Success(s) => val count = s.result.totalHits MatchResult( count == expectedCount, - s"Index $left had count $count but expected $expectedCount", - s"Index $left had document count $expectedCount" + s"Index $index had count $count but expected $expectedCount", + s"Index $index had document count $expectedCount" ) case Failure(f) => throw f } } def containDoc(expectedId: String): Matcher[String] = - (left: String) => { - elasticClient.execute(get(expectedId).from(left)) complete () match { + (index: String) => { + elasticClient.execute(get(index, expectedId)) complete () match { case Success(s) => val exists = s.result.exists MatchResult( exists, - s"Index $left did not contain expected document $expectedId", - s"Index $left contained document $expectedId" + s"Index $index did not contain expected document $expectedId", + s"Index $index contained document $expectedId" ) case Failure(f) => throw f } } def beCreated(): Matcher[String] = - (left: String) => { - elasticClient.execute(indexExists(left)) complete () match { + (index: String) => { + elasticClient.execute(indexExists(index)) complete () match { case Success(s) => val exists = s.result.isExists MatchResult( exists, - s"Index $left did not exist", - s"Index $left exists" + s"Index $index did not exist", + s"Index $index exists" ) case Failure(f) => throw f } } def beEmpty(): Matcher[String] = - (left: String) => { - elasticClient.execute(search(left).size(0)) complete () match { + (index: String) => { + elasticClient.execute(search(index).size(0)) complete () match { case Success(s) => val count = s.result.totalHits MatchResult( count == 0, - s"Index $left was not empty", - s"Index $left was empty" + s"Index $index was not empty", + s"Index $index was empty" ) case Failure(f) => throw f } @@ -208,7 +218,7 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft blockUntil(s"Expected to find document $id") { () => elasticClient .execute { - get(id).from(index / _type) + get(index, id) } complete () match { case Success(s) => s.result.exists case _ => false @@ -227,24 +237,13 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft } } - def blockUntilCount(expected: Long, indexAndTypes: IndexAndTypes): Unit = { - blockUntil(s"Expected count of $expected") { () => - elasticClient.execute { - searchWithType(indexAndTypes).matchAllQuery().size(0) - } complete () match { - case Success(s) => expected <= s.result.totalHits - case Failure(f) => throw f - } - } - } - /** Will block until the given index and optional types have at least the given number of * documents. */ def blockUntilCount(expected: Long, index: String, types: String*): Unit = { blockUntil(s"Expected count of $expected") { () => elasticClient.execute { - searchWithType(index / types).matchAllQuery().size(0) + search(index).matchAllQuery().size(0) } complete () match { case Success(s) => expected <= s.result.totalHits case Failure(f) => throw f @@ -256,7 +255,7 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft blockUntil(s"Expected count of $expected") { () => elasticClient .execute { - searchWithType(index / types).size(0) + search(index).size(0) } complete () match { case Success(s) => expected == s.result.totalHits case Failure(f) => throw f @@ -303,7 +302,7 @@ trait ElasticTestKit extends ElasticDsl with CompletionTestKit with BeforeAndAft blockUntil(s"Expected document $id to have version $version") { () => elasticClient .execute { - get(id).from(index / _type) + get(index, id) } complete () match { case Success(s) => s.result.version == version case Failure(f) => throw f diff --git a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/EmbeddedElasticTestKit.scala b/testkit/src/main/scala/app/softnetwork/elastic/scalatest/EmbeddedElasticTestKit.scala deleted file mode 100644 index d0343344..00000000 --- a/testkit/src/main/scala/app/softnetwork/elastic/scalatest/EmbeddedElasticTestKit.scala +++ /dev/null @@ -1,34 +0,0 @@ -package app.softnetwork.elastic.scalatest - -import org.scalatest.Suite -import pl.allegro.tech.embeddedelasticsearch.EmbeddedElastic -import pl.allegro.tech.embeddedelasticsearch.PopularProperties._ - -import java.net.ServerSocket -import java.util.concurrent.TimeUnit - -trait EmbeddedElasticTestKit extends ElasticTestKit { _: Suite => - - override lazy val elasticURL: String = s"http://127.0.0.1:${embeddedElastic.getHttpPort}" - - override def stop(): Unit = embeddedElastic.stop() - - private[this] def dynamicPort: Int = { - val socket = new ServerSocket(0) - val port = socket.getLocalPort - socket.close() - port - } - - private[this] val embeddedElastic: EmbeddedElastic = EmbeddedElastic - .builder() - .withElasticVersion(elasticVersion) - .withSetting(HTTP_PORT, dynamicPort) - .withSetting(CLUSTER_NAME, clusterName) - .withCleanInstallationDirectoryOnStop(true) - .withEsJavaOpts("-Xms128m -Xmx512m") - .withStartTimeout(2, TimeUnit.MINUTES) - .build() - .start() - -} diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala index 17ff9524..b63e3494 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala @@ -4,24 +4,18 @@ import java.io.ByteArrayInputStream import java.util.concurrent.TimeUnit import java.util.UUID import akka.actor.ActorSystem -//import app.softnetwork.elastic.client.RestHighLevelClientProviders._ -import app.softnetwork.elastic.client.jest.JestClientCompanion import app.softnetwork.elastic.sql.SQLQuery import com.fasterxml.jackson.core.JsonParseException -import com.sksamuel.elastic4s.searches.queries.matches.MatchAllQuery -import io.searchbox.client.JestClient -import io.searchbox.indices.CreateIndex -import io.searchbox.indices.aliases.AliasExists -import io.searchbox.indices.mapping.PutMapping -import io.searchbox.indices.settings.GetSettings +import com.sksamuel.elastic4s.requests.searches.queries.matches.MatchAllQuery import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers import app.softnetwork.persistence._ import app.softnetwork.serialization._ import app.softnetwork.elastic.model._ import app.softnetwork.elastic.persistence.query.ElasticProvider -import app.softnetwork.elastic.scalatest.EmbeddedElasticTestKit +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit import app.softnetwork.persistence.person.model.Person +import com.typesafe.scalalogging.StrictLogging import org.json4s.Formats import org.slf4j.{Logger, LoggerFactory} @@ -32,7 +26,11 @@ import scala.util.{Failure, Success} /** Created by smanciot on 28/06/2018. */ -trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with Matchers { +trait ElasticClientSpec + extends AnyFlatSpecLike + with ElasticDockerTestKit + with Matchers + with StrictLogging { lazy val log: Logger = LoggerFactory getLogger getClass.getName @@ -46,13 +44,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with def sClient: ElasticProvider[Sample] with ElasticClientApi def bClient: ElasticProvider[Binary] with ElasticClientApi - lazy val jestClient: JestClient = { - val config = elasticConfig - new JestClientCompanion { - override def elasticConfig: ElasticConfig = ElasticConfig(config) - }.apply() - } - import scala.language.implicitConversions implicit def toSQLQuery(sqlQuery: String): SQLQuery = SQLQuery(sqlQuery) @@ -80,35 +71,36 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Adding an alias" should "work" in { pClient.addAlias("person", "person_alias") - val aliasExists = new AliasExists.Builder().build() - jestClient.execute(aliasExists).isSucceeded shouldBe true + doesAliasExists("person_alias") shouldBe true } - private def settings = - jestClient - .execute(new GetSettings.Builder().addIndex("person").build()) - .getJsonObject - .getAsJsonObject("person") - .getAsJsonObject("settings") + private def settings: Map[String, String] = { + elasticClient.execute { + getSettings("person") + } complete () match { + case Success(s) => s.result.settingsForIndex("person") + case Failure(f) => throw f + } + } "Toggle refresh" should "work" in { pClient.toggleRefresh("person", enable = false) - settings.getAsJsonObject("index").get("refresh_interval").getAsString shouldBe "-1" + settings.getOrElse("index.refresh_interval", "") shouldBe "-1" pClient.toggleRefresh("person", enable = true) - settings.getAsJsonObject("index").get("refresh_interval").getAsString shouldBe "1s" + settings.getOrElse("index.refresh_interval", "") shouldBe "1s" } "Updating number of replicas" should "work" in { pClient.setReplicas("person", 3) - settings.getAsJsonObject("index").get("number_of_replicas").getAsString shouldBe "3" + settings.getOrElse("index.number_of_replicas", "") shouldBe "3" pClient.setReplicas("person", 0) - settings.getAsJsonObject("index").get("number_of_replicas").getAsString shouldBe "0" + settings.getOrElse("index.number_of_replicas", "") shouldBe "0" } - val persons = List( + val persons: List[String] = List( """ { "uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21 12:00:00"} """, """ { "uuid": "A14", "name": "Moe Szyslak", "birthDate": "1967-11-21 12:00:00"} """, """ { "uuid": "A16", "name": "Barney Gumble", "birthDate": "1969-05-09 21:00:00"} """ @@ -117,14 +109,13 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with private val personsWithUpsert = persons :+ """ { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09 21:00:00"} """ - val children = List( + val children: List[String] = List( """ { "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09 21:00:00"} """, """ { "parentId": "A16", "name": "Josh Gumble", "birthDate": "1999-05-09 21:00:00"} """ ) "Bulk index valid json without id key and suffix key" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person1", "person", 2) - implicit val jclient: JestClient = jestClient val indices = pClient.bulk[String](persons.iterator, identity, None, None, None) indices should contain only "person1" @@ -148,16 +139,18 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with } "Bulk index valid json with an id key but no suffix key" should "work" in { - jestClient.execute(new CreateIndex.Builder("person2").build()) - val childMapping = new PutMapping.Builder( - "person2", - "child", - "{ \"child\" : { \"_parent\" : {\"type\": \"person\"}, \"properties\" : { \"name\" : {\"type\" : \"string\", \"index\" : \"not_analyzed\"} } } }" - ).build() - jestClient.execute(childMapping) + elasticClient.execute( + createIndex("person2").mapping( + properties( + objectField("child").copy(properties = Seq(textField("name").copy(index = Some(false)))) + ) + ) + ) complete () match { + case Success(_) => () + case Failure(f) => throw f + } implicit val bulkOptions: BulkOptions = BulkOptions("person2", "person", 1000) - implicit val jclient: JestClient = jestClient val indices = pClient.bulk[String](persons.iterator, identity, Some("uuid"), None, None) refresh(indices) @@ -197,7 +190,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Bulk index valid json with an id key and a suffix key" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person", "person", 1000) - implicit val jclient: JestClient = jestClient val indices = pClient.bulk[String](persons.iterator, identity, Some("uuid"), Some("birthDate"), None, None) refresh(indices) @@ -226,7 +218,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Bulk index invalid json with an id key and a suffix key" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person_error", "person", 1000) - implicit val jclient: JestClient = jestClient intercept[JsonParseException] { val invalidJson = persons :+ "fail" pClient.bulk[String](invalidJson.iterator, identity, None, None, None) @@ -235,7 +226,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Bulk upsert valid json with an id key but no suffix key" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person4", "person", 1000) - implicit val jclient: JestClient = jestClient val indices = pClient .bulk[String](personsWithUpsert.iterator, identity, Some("uuid"), None, None, Some(true)) @@ -265,7 +255,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Bulk upsert valid json with an id key and a suffix key" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person5", "person", 1000) - implicit val jclient: JestClient = jestClient val indices = pClient.bulk[String]( personsWithUpsert.iterator, identity, @@ -300,7 +289,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Count" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person6", "person", 1000) - implicit val jclient: JestClient = jestClient val indices = pClient .bulk[String](personsWithUpsert.iterator, identity, Some("uuid"), None, None, Some(true)) @@ -322,7 +310,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Search" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person7", "person", 1000) - implicit val jclient: JestClient = jestClient val indices = pClient .bulk[String](personsWithUpsert.iterator, identity, Some("uuid"), None, None, Some(true)) @@ -351,7 +338,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Get all" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person8", "person", 1000) - implicit val jclient: JestClient = jestClient val indices = pClient .bulk[String](personsWithUpsert.iterator, identity, Some("uuid"), None, None, Some(true)) @@ -371,7 +357,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with "Get" should "work" in { implicit val bulkOptions: BulkOptions = BulkOptions("person9", "person", 1000) - implicit val jclient: JestClient = jestClient val indices = pClient .bulk[String](personsWithUpsert.iterator, identity, Some("uuid"), None, None, Some(true)) @@ -391,7 +376,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with } "Index" should "work" in { - implicit val jclient: JestClient = jestClient val uuid = UUID.randomUUID().toString val sample = Sample(uuid) val result = sClient.index(sample) @@ -407,7 +391,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with } "Update" should "work" in { - implicit val jclient: JestClient = jestClient val uuid = UUID.randomUUID().toString val sample = Sample(uuid) val result = sClient.update(sample) @@ -423,13 +406,12 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with } "Delete" should "work" in { - implicit val jclient: JestClient = jestClient val uuid = UUID.randomUUID().toString val sample = Sample(uuid) val result = sClient.index(sample) result shouldBe true - val result2 = sClient.delete(sample.uuid, Some("sample"), Some("sample")) + val result2 = sClient.delete(sample.uuid, Some("sample")) result2 shouldBe true val result3 = sClient.get[Sample](uuid) @@ -437,7 +419,6 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with } "Index binary data" should "work" in { - implicit val jclient: JestClient = jestClient bClient.createIndex("binaries") shouldBe true val mapping = """{ @@ -461,7 +442,7 @@ trait ElasticClientSpec extends AnyFlatSpecLike with EmbeddedElasticTestKit with | } |} """.stripMargin - bClient.setMapping("binaries", "test", mapping) shouldBe true + bClient.setMapping("binaries", mapping) shouldBe true for (uuid <- Seq("png", "jpg", "pdf")) { val path = Paths.get(Thread.currentThread().getContextClassLoader.getResource(s"avatar.$uuid").getPath) diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/JestClientSpec.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/JestClientSpec.scala deleted file mode 100644 index c4641dbd..00000000 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/JestClientSpec.scala +++ /dev/null @@ -1,20 +0,0 @@ -package app.softnetwork.elastic.client - -import app.softnetwork.elastic.client.JestProviders.{BinaryProvider, PersonProvider, SampleProvider} -import app.softnetwork.elastic.model.{Binary, Sample} -import app.softnetwork.elastic.persistence.query.ElasticProvider -import app.softnetwork.persistence.person.model.Person - -class JestClientSpec extends ElasticClientSpec { - - lazy val pClient: ElasticProvider[Person] with ElasticClientApi = new PersonProvider( - elasticConfig - ) - lazy val sClient: ElasticProvider[Sample] with ElasticClientApi = new SampleProvider( - elasticConfig - ) - lazy val bClient: ElasticProvider[Binary] with ElasticClientApi = new BinaryProvider( - elasticConfig - ) - -} diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/JestProviders.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/JestProviders.scala deleted file mode 100644 index 5e7a58e2..00000000 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/JestProviders.scala +++ /dev/null @@ -1,38 +0,0 @@ -package app.softnetwork.elastic.client - -import app.softnetwork.elastic.client.jest.JestProvider -import app.softnetwork.elastic.model.{Binary, Sample} -import app.softnetwork.persistence.ManifestWrapper -import app.softnetwork.persistence.person.model.Person -import com.typesafe.config.Config -import io.searchbox.client.JestClient - -object JestProviders { - - class PersonProvider(es: Config) extends JestProvider[Person] with ManifestWrapper[Person] { - override protected val manifestWrapper: ManifestW = ManifestW() - - override lazy val config: Config = es - - implicit lazy val jestClient: JestClient = - apply(elasticConfig.credentials, elasticConfig.multithreaded) - } - - class SampleProvider(es: Config) extends JestProvider[Sample] with ManifestWrapper[Sample] { - override protected val manifestWrapper: ManifestW = ManifestW() - - override lazy val config: Config = es - - implicit lazy val jestClient: JestClient = - apply(elasticConfig.credentials, elasticConfig.multithreaded) - } - - class BinaryProvider(es: Config) extends JestProvider[Binary] with ManifestWrapper[Binary] { - override protected val manifestWrapper: ManifestW = ManifestW() - - override lazy val config: Config = es - - implicit lazy val jestClient: JestClient = - apply(elasticConfig.credentials, elasticConfig.multithreaded) - } -} diff --git a/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala b/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala index caf42fa2..442985ca 100644 --- a/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala +++ b/testkit/src/test/scala/app/softnetwork/elastic/client/RestHighLevelProviders.scala @@ -9,7 +9,9 @@ import org.elasticsearch.client.RestHighLevelClient object RestHighLevelProviders { - class PersonProvider(es: Config) extends RestHighLevelClientProvider[Person] with ManifestWrapper[Person] { + class PersonProvider(es: Config) + extends RestHighLevelClientProvider[Person] + with ManifestWrapper[Person] { override protected val manifestWrapper: ManifestW = ManifestW() override lazy val config: Config = es @@ -17,7 +19,9 @@ object RestHighLevelProviders { implicit lazy val jestClient: RestHighLevelClient = apply() } - class SampleProvider(es: Config) extends RestHighLevelClientProvider[Sample] with ManifestWrapper[Sample] { + class SampleProvider(es: Config) + extends RestHighLevelClientProvider[Sample] + with ManifestWrapper[Sample] { override protected val manifestWrapper: ManifestW = ManifestW() override lazy val config: Config = es @@ -25,7 +29,9 @@ object RestHighLevelProviders { implicit lazy val jestClient: RestHighLevelClient = apply() } - class BinaryProvider(es: Config) extends RestHighLevelClientProvider[Binary] with ManifestWrapper[Binary] { + class BinaryProvider(es: Config) + extends RestHighLevelClientProvider[Binary] + with ManifestWrapper[Binary] { override protected val manifestWrapper: ManifestW = ManifestW() override lazy val config: Config = es