diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/api/IndexLifecycleManagementApi.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/api/IndexLifecycleManagementApi.scala index 4aac12da93..b567c76891 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/api/IndexLifecycleManagementApi.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/api/IndexLifecycleManagementApi.scala @@ -1,6 +1,7 @@ package com.sksamuel.elastic4s.api -import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.{StartIlmRequest, GetIlmStatusRequest, StopIlmRequest} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicy +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement._ trait IndexLifecycleManagementApi { def getIlmStatus: GetIlmStatusRequest = GetIlmStatusRequest() @@ -8,4 +9,13 @@ trait IndexLifecycleManagementApi { def startIlm(): StartIlmRequest = StartIlmRequest() def stopIlm(): StopIlmRequest = StopIlmRequest() + + def createIndexLifecyclePolicy(policy: IndexLifecyclePolicy): CreateLifecyclePolicyRequest = + CreateLifecyclePolicyRequest(policy) + + def getIndexLifecyclePolicy(policyName: String): GetIndexLifecyclePolicyRequest = + GetIndexLifecyclePolicyRequest(policyName) + + def deleteIndexLifecyclePolicy(policyName: String): DeleteIndexLifecyclePolicyRequest = + DeleteIndexLifecyclePolicyRequest(policyName) } diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/CreateLifecyclePolicyRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/CreateLifecyclePolicyRequest.scala new file mode 100644 index 0000000000..d80afb19d4 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/CreateLifecyclePolicyRequest.scala @@ -0,0 +1,21 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement + +import com.sksamuel.elastic4s.ext.OptionImplicits.RichOptionImplicits +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicy + +import scala.concurrent.duration.Duration + +case class CreateLifecyclePolicyRequest( + policy: IndexLifecyclePolicy, + masterTimeout: Option[String] = None, + timeout: Option[String] = None +) { + def masterTimeout(timeout: Duration): CreateLifecyclePolicyRequest = + copy(masterTimeout = s"${timeout.toNanos}nanos".some) + def masterTimeout(timeout: String): CreateLifecyclePolicyRequest = copy(masterTimeout = timeout.some) + + def timeout(timeout: Duration): CreateLifecyclePolicyRequest = copy(timeout = s"${timeout.toNanos}nanos".some) + def timeout(timeout: String): CreateLifecyclePolicyRequest = copy(timeout = timeout.some) + + def policy(policy: IndexLifecyclePolicy): CreateLifecyclePolicyRequest = copy(policy = policy) +} diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/CreateLifecyclePolicyResponse.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/CreateLifecyclePolicyResponse.scala new file mode 100644 index 0000000000..23c96ae3eb --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/CreateLifecyclePolicyResponse.scala @@ -0,0 +1,3 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement + +case class CreateLifecyclePolicyResponse(acknowledged: Boolean) diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/DeleteIndexLifecyclePolicyRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/DeleteIndexLifecyclePolicyRequest.scala new file mode 100644 index 0000000000..db98581619 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/DeleteIndexLifecyclePolicyRequest.scala @@ -0,0 +1,17 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement + +import scala.concurrent.duration.Duration + +case class DeleteIndexLifecyclePolicyRequest( + policyName: String, + masterTimeout: Option[String] = None, + timeout: Option[String] = None +) { + def masterTimeout(timeout: Duration): DeleteIndexLifecyclePolicyRequest = + copy(masterTimeout = Some(s"${timeout.toNanos}nanos")) + def masterTimeout(timeout: String): DeleteIndexLifecyclePolicyRequest = + copy(masterTimeout = Some(timeout)) + + def timeout(timeout: Duration): DeleteIndexLifecyclePolicyRequest = copy(timeout = Some(s"${timeout.toNanos}nanos")) + def timeout(timeout: String): DeleteIndexLifecyclePolicyRequest = copy(timeout = Some(timeout)) +} diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/DeleteIndexLifecyclePolicyResponse.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/DeleteIndexLifecyclePolicyResponse.scala new file mode 100644 index 0000000000..c3394bdde2 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/DeleteIndexLifecyclePolicyResponse.scala @@ -0,0 +1,3 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement + +case class DeleteIndexLifecyclePolicyResponse(acknowledged: Boolean) diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/GetIndexLifecyclePolicyRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/GetIndexLifecyclePolicyRequest.scala new file mode 100644 index 0000000000..d81d5dd5f9 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/GetIndexLifecyclePolicyRequest.scala @@ -0,0 +1,16 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement + +import scala.concurrent.duration.Duration + +case class GetIndexLifecyclePolicyRequest( + policyName: String, + masterTimeout: Option[String] = None, + timeout: Option[String] = None +) { + def masterTimeout(timeout: Duration): GetIndexLifecyclePolicyRequest = + copy(masterTimeout = Some(s"${timeout.toNanos}nanos")) + def masterTimeout(timeout: String): GetIndexLifecyclePolicyRequest = copy(masterTimeout = Some(timeout)) + + def timeout(timeout: Duration): GetIndexLifecyclePolicyRequest = copy(timeout = Some(s"${timeout.toNanos}nanos")) + def timeout(timeout: String): GetIndexLifecyclePolicyRequest = copy(timeout = Some(timeout)) +} diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/GetIndexLifecyclePolicyResponse.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/GetIndexLifecyclePolicyResponse.scala new file mode 100644 index 0000000000..68f35ff564 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/GetIndexLifecyclePolicyResponse.scala @@ -0,0 +1,46 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.JsonNode +import com.sksamuel.elastic4s.JacksonSupport +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicy + +import scala.collection.JavaConverters._ + +case class GetIndexLifecyclePolicyResponse( + version: Int, + modifiedDate: Long, + policy: IndexLifecyclePolicy, + inUseBy: Option[InUseBy] +) + +object GetIndexLifecyclePolicyResponse { + def deserialize(node: JsonNode): GetIndexLifecyclePolicyResponse = { + val policyProperty = node.properties().iterator().next() + val policyNode = policyProperty.getValue + val policyName = policyProperty.getKey + val version = Option(policyNode.get("version")).map(_.asInt(1)).getOrElse(1) + val modifiedDate = Option(policyNode.get("modified_date")).map(_.asLong(0L)).getOrElse(0L) + val policy = IndexLifecyclePolicy.deserialize(policyNode.get("policy")) + val inUseBy = Option(policyNode.get("in_use_by")).map(InUseBy.deserialize) + GetIndexLifecyclePolicyResponse(version, modifiedDate, policy.copy(name = policyName), inUseBy) + } + +} + +case class InUseBy(indices: List[String], data_streams: List[String], composable_templates: List[String]) + +object InUseBy { + def deserialize(node: JsonNode): InUseBy = { + val indices = + Option(node.get("indices")).getOrElse(JacksonSupport.mapper.createObjectNode()) + .values().asScala.map(_.asText()).toList + val dataStreams = + Option(node.get("data_streams")).getOrElse(JacksonSupport.mapper.createObjectNode()) + .values().asScala.map(_.asText()).toList + val composableTemplates = + Option(node.get("composable_templates")).getOrElse(JacksonSupport.mapper.createObjectNode()) + .values().asScala.map(_.asText()).toList + InUseBy(indices, dataStreams, composableTemplates) + } +} diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicy.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicy.scala new file mode 100644 index 0000000000..35f30ee194 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicy.scala @@ -0,0 +1,49 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy + +import com.fasterxml.jackson.databind.JsonNode +import com.sksamuel.elastic4s.JacksonSupport +import com.sksamuel.elastic4s.json.{JsonValue, StringValue} + +import scala.collection.JavaConverters._ + +case class IndexLifecyclePolicy( + name: String, + phases: List[IndexLifecyclePolicyPhase], + meta: Map[String, JsonValue] +) { + def withPhases(addPhases: IndexLifecyclePolicyPhase*): IndexLifecyclePolicy = + copy(phases = addPhases.toList ::: phases) + def withMeta(meta: (String, JsonValue)*): IndexLifecyclePolicy = + copy(meta = this.meta ++ meta.toMap) + + override def equals(obj: Any): Boolean = obj match { + case that: IndexLifecyclePolicy => + this.name == that.name && + this.phases.sortBy(_.phaseName) == that.phases.sortBy(_.phaseName) && + this.meta == that.meta + case _ => false + } +} + +object IndexLifecyclePolicy { + def apply(name: String): IndexLifecyclePolicy = IndexLifecyclePolicy(name, Nil, Map.empty) + + def deserialize(node: JsonNode): IndexLifecyclePolicy = { + + val phases = Option(node.get("phases")) + .getOrElse(JacksonSupport.mapper.createObjectNode()) + .properties() + .asScala + .map(entry => IndexLifecyclePolicyPhase.deserialize(entry.getKey, entry.getValue)) + .toList + + val meta = Option(node.get("_meta")) + .getOrElse(JacksonSupport.mapper.createObjectNode()) + .properties() + .asScala + .map(entry => entry.getKey -> StringValue(entry.getValue.asText())) + .toMap + + IndexLifecyclePolicy("", phases, meta) + } +} diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicyAction.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicyAction.scala new file mode 100644 index 0000000000..a59602ca36 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicyAction.scala @@ -0,0 +1,42 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy + +import com.fasterxml.jackson.databind.JsonNode +import com.sksamuel.elastic4s.json.{JsonValue, StringValue} + +import scala.collection.JavaConverters._ + +case class IndexLifecyclePolicyAction( + actionName: String, + settings: Map[String, JsonValue] +) { + + def withSettings(settings: (String, JsonValue)*): IndexLifecyclePolicyAction = + copy(settings = this.settings ++ settings.toMap) +} + +object IndexLifecyclePolicyAction { + def apply(actionName: String): IndexLifecyclePolicyAction = IndexLifecyclePolicyAction(actionName, Map.empty) + + def deserialize(actionName: String, node: JsonNode): IndexLifecyclePolicyAction = { + val settings = node.properties().asScala.map { entry => + entry.getKey -> StringValue(entry.getValue.asText()) + }.toMap + + IndexLifecyclePolicyAction(actionName, settings) + } + + val ForceMergeAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("forcemerge") + val DeleteAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("delete") + val AllocateAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("allocate") + val DownsampleAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("downsample") + val FreezeAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("freeze") + val MigrateAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("migrate") + val ReadonlyAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("readonly") + val RolloverAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("rollover") + val SetPriorityAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("set_priority") + val SearchableSnapshotAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("searchable_snapshot") + val ShrinkAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("shrink") + val UnfollowAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("unfollow") + val WaitForSnapshotAction: IndexLifecyclePolicyAction = IndexLifecyclePolicyAction("wait_for_snapshot") + +} diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicyPhase.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicyPhase.scala new file mode 100644 index 0000000000..e7b83b1731 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/policy/IndexLifecyclePolicyPhase.scala @@ -0,0 +1,52 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy + +import com.fasterxml.jackson.databind.JsonNode +import com.sksamuel.elastic4s.JacksonSupport +import com.sksamuel.elastic4s.json.{JsonValue, StringValue} + +import scala.collection.JavaConverters._ + +case class IndexLifecyclePolicyPhase( + phaseName: String, + settings: Map[String, JsonValue], + actions: List[IndexLifecyclePolicyAction] +) { + def withSettings(settings: (String, JsonValue)*): IndexLifecyclePolicyPhase = + copy(settings = this.settings ++ settings.toMap) + + def withActions(actions: IndexLifecyclePolicyAction*): IndexLifecyclePolicyPhase = + copy(actions = actions.toList ::: this.actions) + + override def equals(obj: Any): Boolean = obj match { + case that: IndexLifecyclePolicyPhase => + this.phaseName == that.phaseName && + this.settings == that.settings && + this.actions.sortBy(_.actionName) == that.actions.sortBy(_.actionName) + case _ => false + } +} + +object IndexLifecyclePolicyPhase { + def apply(phaseName: String): IndexLifecyclePolicyPhase = IndexLifecyclePolicyPhase(phaseName, Map.empty, Nil) + + def deserialize(name: String, node: JsonNode): IndexLifecyclePolicyPhase = { + val actions = Option(node.get("actions")) + .getOrElse(JacksonSupport.mapper.createObjectNode()) + .properties() + .asScala + .map(entry => IndexLifecyclePolicyAction.deserialize(entry.getKey, entry.getValue)) + .toList + + val settings = node.properties().asScala.filterNot(_.getKey == "actions").map { entry => + entry.getKey -> StringValue(entry.getValue.asText()) + }.toMap + + IndexLifecyclePolicyPhase(name, settings, actions) + } + + val DeletePhase: IndexLifecyclePolicyPhase = IndexLifecyclePolicyPhase("delete") + val WarmPhase: IndexLifecyclePolicyPhase = IndexLifecyclePolicyPhase("warm") + val ColdPhase: IndexLifecyclePolicyPhase = IndexLifecyclePolicyPhase("cold") + val FrozenPhase: IndexLifecyclePolicyPhase = IndexLifecyclePolicyPhase("frozen") + val HotPhase: IndexLifecyclePolicyPhase = IndexLifecyclePolicyPhase("hot") +} diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/ElasticPolicyActionContentBuilder.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/ElasticPolicyActionContentBuilder.scala new file mode 100644 index 0000000000..e2502a0f40 --- /dev/null +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/ElasticPolicyActionContentBuilder.scala @@ -0,0 +1,12 @@ +package com.sksamuel.elastic4s.handlers.indexlifecyclemanagement + +import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicyAction + +object ElasticPolicyActionContentBuilder { + def apply(action: IndexLifecyclePolicyAction): XContentBuilder = { + val builder = XContentFactory.jsonBuilder() + action.settings.foreach { case (name, value) => builder.field(name, value) } + builder + } +} diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/ElasticPolicyPhaseContentBuilder.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/ElasticPolicyPhaseContentBuilder.scala new file mode 100644 index 0000000000..1a7f165d89 --- /dev/null +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/ElasticPolicyPhaseContentBuilder.scala @@ -0,0 +1,15 @@ +package com.sksamuel.elastic4s.handlers.indexlifecyclemanagement + +import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicyPhase + +object ElasticPolicyPhaseContentBuilder { + def apply(phase: IndexLifecyclePolicyPhase): XContentBuilder = { + val builder = XContentFactory.jsonBuilder() + phase.settings.foreach { case (name, value) => builder.field(name, value) } + builder.startObject("actions") + phase.actions.map(action => builder.rawField(action.actionName, ElasticPolicyActionContentBuilder(action))) + builder.endObject() + builder.endObject() + } +} diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecycleManagementHandlers.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecycleManagementHandlers.scala new file mode 100644 index 0000000000..2e8fb0db21 --- /dev/null +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecycleManagementHandlers.scala @@ -0,0 +1,143 @@ +package com.sksamuel.elastic4s.handlers.indexlifecyclemanagement + +import com.sksamuel.elastic4s.HttpEntity.ByteArrayEntity +import com.sksamuel.elastic4s.handlers.ElasticErrorParser +import com.sksamuel.elastic4s._ +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement._ + +import java.nio.charset.StandardCharsets +import scala.util.Try + +trait IndexLifecycleManagementHandlers { + implicit object IndexLifecycleStatusHandler extends Handler[GetIlmStatusRequest, GetIlmStatusResponse] { + override def responseHandler: ResponseHandler[GetIlmStatusResponse] = (response: HttpResponse) => { + response.statusCode match { + case 200 | 201 => Right(ResponseHandler.fromResponse[GetIlmStatusResponse](response)) + case 400 => Left(ElasticErrorParser.parse(response)) + case _ => sys.error("Invalid response") + } + } + + override def build(request: GetIlmStatusRequest): ElasticRequest = { + val endpoint = "/_ilm/status" + + ElasticRequest("GET", endpoint) + } + } + + implicit object IndexLifecycleStartHandler extends Handler[StartIlmRequest, StartIlmResponse] { + override def responseHandler: ResponseHandler[StartIlmResponse] = (response: HttpResponse) => { + response.statusCode match { + case 200 | 201 => Right(ResponseHandler.fromResponse[StartIlmResponse](response)) + case 400 => Left(ElasticErrorParser.parse(response)) + case _ => sys.error("Invalid response") + } + } + + override def build(request: StartIlmRequest): ElasticRequest = { + val endpoint = "/_ilm/start" + + val params = scala.collection.mutable.Map.empty[String, String] + request.masterTimeout.foreach(params.put("master_timeout", _)) + request.timeout.foreach(params.put("timeout", _)) + + ElasticRequest("POST", endpoint, params.toMap) + } + } + + implicit object IndexLifecycleStopHandler extends Handler[StopIlmRequest, StopIlmResponse] { + override def responseHandler: ResponseHandler[StopIlmResponse] = (response: HttpResponse) => { + response.statusCode match { + case 200 | 201 => Right(ResponseHandler.fromResponse[StopIlmResponse](response)) + case 400 => Left(ElasticErrorParser.parse(response)) + case _ => sys.error("Invalid response") + } + } + + override def build(request: StopIlmRequest): ElasticRequest = { + val endpoint = "/_ilm/stop" + + val params = scala.collection.mutable.Map.empty[String, String] + request.masterTimeout.foreach(params.put("master_timeout", _)) + request.timeout.foreach(params.put("timeout", _)) + + ElasticRequest("POST", endpoint, params.toMap) + } + } + + private val policyPath = (policyName: String) => s"/_ilm/policy/$policyName" + + implicit object CreateElasticPolicyHandler + extends Handler[CreateLifecyclePolicyRequest, CreateLifecyclePolicyResponse] { + + override def build(request: CreateLifecyclePolicyRequest): ElasticRequest = { + val endpoint = policyPath(request.policy.name) + val params = List( + request.masterTimeout.map("master_timeout" -> _), + request.timeout.map("timeout" -> _) + ).flatten.toMap + + val body = IndexLifecyclePolicyContentBuilder(request.policy).string + val entity = ByteArrayEntity(body.getBytes(StandardCharsets.UTF_8), Some("application/json")) + + ElasticRequest("PUT", endpoint, params, entity) + } + } + + implicit object GetElasticPolicyHandler + extends Handler[GetIndexLifecyclePolicyRequest, Option[GetIndexLifecyclePolicyResponse]] { + + override def build(request: GetIndexLifecyclePolicyRequest): ElasticRequest = { + val endpoint = policyPath(request.policyName) + val params = List( + request.masterTimeout.map("master_timeout" -> _), + request.timeout.map("timeout" -> _) + ).flatten.toMap + + ElasticRequest("GET", endpoint, params) + } + + override def responseHandler: ResponseHandler[Option[GetIndexLifecyclePolicyResponse]] = + (response: HttpResponse) => { + response.statusCode match { + case 200 | 201 => + response.entity.map(_.content).map { responseString => + Try(JacksonSupport.mapper.readTree(responseString)) + .flatMap { node => Try(GetIndexLifecyclePolicyResponse.deserialize(node)) } + .fold( + err => Left(ElasticError.fromThrowable(err)), + res => Right(Some(res)) + ) + }.getOrElse(Right(None)) + case 404 => + Right(None) + case _ => + Left(ElasticErrorParser.parse(response)) + } + } + } + + implicit object DeleteIndexLifecyclePolicyHandler + extends Handler[DeleteIndexLifecyclePolicyRequest, DeleteIndexLifecyclePolicyResponse] { + + override def build(request: DeleteIndexLifecyclePolicyRequest): ElasticRequest = { + val endpoint = policyPath(request.policyName) + val params = List( + request.masterTimeout.map("master_timeout" -> _), + request.timeout.map("timeout" -> _) + ).flatten.toMap + + ElasticRequest("DELETE", endpoint, params) + } + + override def responseHandler: ResponseHandler[DeleteIndexLifecyclePolicyResponse] = { (response: HttpResponse) => + super.responseHandler.handle(response) match { + case Left(error: ElasticError) if error.`type` == "resource_not_found_exception" => + Right(DeleteIndexLifecyclePolicyResponse(false)) + case Left(error) => Left(error) + case Right(response) => Right(response) + } + + } + } +} diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecycleManagmentHandlers.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecycleManagmentHandlers.scala deleted file mode 100644 index 3800464c13..0000000000 --- a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecycleManagmentHandlers.scala +++ /dev/null @@ -1,69 +0,0 @@ -package com.sksamuel.elastic4s.handlers.indexlifecyclemanagement - -import com.sksamuel.elastic4s.handlers.ElasticErrorParser -import com.sksamuel.elastic4s._ -import com.sksamuel.elastic4s.requests.indexlifecyclemanagement._ - -trait IndexLifecycleManagementHandlers { - implicit object IndexLifecycleStatusHandler extends Handler[GetIlmStatusRequest, GetIlmStatusResponse] { - override def responseHandler: ResponseHandler[GetIlmStatusResponse] = new ResponseHandler[GetIlmStatusResponse] { - override def handle(response: HttpResponse): Either[ElasticError, GetIlmStatusResponse] = { - response.statusCode match { - case 200 | 201 => Right(ResponseHandler.fromResponse[GetIlmStatusResponse](response)) - case 400 => Left(ElasticErrorParser.parse(response)) - case _ => sys.error("Invalid response") - } - } - } - - override def build(request: GetIlmStatusRequest): ElasticRequest = { - val endpoint = "/_ilm/status" - - ElasticRequest("GET", endpoint) - } - } - - implicit object IndexLifecycleStartHandler extends Handler[StartIlmRequest, StartIlmResponse] { - override def responseHandler: ResponseHandler[StartIlmResponse] = new ResponseHandler[StartIlmResponse] { - override def handle(response: HttpResponse): Either[ElasticError, StartIlmResponse] = { - response.statusCode match { - case 200 | 201 => Right(ResponseHandler.fromResponse[StartIlmResponse](response)) - case 400 => Left(ElasticErrorParser.parse(response)) - case _ => sys.error("Invalid response") - } - } - } - - override def build(request: StartIlmRequest): ElasticRequest = { - val endpoint = "/_ilm/start" - - val params = scala.collection.mutable.Map.empty[String, String] - request.masterTimeout.foreach(params.put("master_timeout", _)) - request.timeout.foreach(params.put("timeout", _)) - - ElasticRequest("POST", endpoint, params.toMap) - } - } - - implicit object IndexLifecycleStopHandler extends Handler[StopIlmRequest, StopIlmResponse] { - override def responseHandler: ResponseHandler[StopIlmResponse] = new ResponseHandler[StopIlmResponse] { - override def handle(response: HttpResponse): Either[ElasticError, StopIlmResponse] = { - response.statusCode match { - case 200 | 201 => Right(ResponseHandler.fromResponse[StopIlmResponse](response)) - case 400 => Left(ElasticErrorParser.parse(response)) - case _ => sys.error("Invalid response") - } - } - } - - override def build(request: StopIlmRequest): ElasticRequest = { - val endpoint = "/_ilm/stop" - - val params = scala.collection.mutable.Map.empty[String, String] - request.masterTimeout.foreach(params.put("master_timeout", _)) - request.timeout.foreach(params.put("timeout", _)) - - ElasticRequest("POST", endpoint, params.toMap) - } - } -} diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecyclePolicyContentBuilder.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecyclePolicyContentBuilder.scala new file mode 100644 index 0000000000..886880736c --- /dev/null +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/indexlifecyclemanagement/IndexLifecyclePolicyContentBuilder.scala @@ -0,0 +1,20 @@ +package com.sksamuel.elastic4s.handlers.indexlifecyclemanagement + +import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicy + +object IndexLifecyclePolicyContentBuilder { + def apply(policy: IndexLifecyclePolicy): XContentBuilder = { + val builder = XContentFactory.jsonBuilder() + builder.startObject("policy") + if (policy.meta.nonEmpty) { + builder.startObject("_meta") + policy.meta.foreach { case (k, v) => builder.field(k, v) } + builder.endObject() + } + builder.startObject("phases") + policy.phases.foreach(phase => builder.rawField(phase.phaseName, ElasticPolicyPhaseContentBuilder(phase))) + builder.endObject() + builder.endObject() + } +} diff --git a/elastic4s-tests/src/test/resources/json/indexlifecyclemanagement/test_policy_create.json b/elastic4s-tests/src/test/resources/json/indexlifecyclemanagement/test_policy_create.json new file mode 100644 index 0000000000..86e506187a --- /dev/null +++ b/elastic4s-tests/src/test/resources/json/indexlifecyclemanagement/test_policy_create.json @@ -0,0 +1,27 @@ +{ + "policy": { + "_meta": { + "description": "used for nginx log", + "project": { + "name": "myProject", + "department": "myDepartment" + } + }, + "phases": { + "warm": { + "min_age": "10d", + "actions": { + "forcemerge": { + "max_num_segments": 1 + } + } + }, + "delete": { + "min_age": "30d", + "actions": { + "delete": {} + } + } + } + } +} diff --git a/elastic4s-tests/src/test/resources/json/indexlifecyclemanagement/test_policy_get.json b/elastic4s-tests/src/test/resources/json/indexlifecyclemanagement/test_policy_get.json new file mode 100644 index 0000000000..d3d5c22a31 --- /dev/null +++ b/elastic4s-tests/src/test/resources/json/indexlifecyclemanagement/test_policy_get.json @@ -0,0 +1,31 @@ +{ + "my_policy": { + "version": 1, + "modified_date": 82392349, + "policy": { + "phases": { + "warm": { + "min_age": "10d", + "actions": { + "forcemerge": { + "max_num_segments": 1 + } + } + }, + "delete": { + "min_age": "30d", + "actions": { + "delete": { + "delete_searchable_snapshot": true + } + } + } + } + }, + "in_use_by" : { + "indices" : [], + "data_streams" : [], + "composable_templates" : [] + } + } +} diff --git a/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/politics/IndexLifecyclePolicyApiTest.scala b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/politics/IndexLifecyclePolicyApiTest.scala new file mode 100644 index 0000000000..a921306a6c --- /dev/null +++ b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/politics/IndexLifecyclePolicyApiTest.scala @@ -0,0 +1,96 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement.politics + +import com.sksamuel.elastic4s.ElasticDsl +import com.sksamuel.elastic4s.json.{RawValue, StringValue} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.{ + IndexLifecyclePolicy, + IndexLifecyclePolicyAction +} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicyPhase.DeletePhase +import com.sksamuel.elastic4s.testkit.DockerTests +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class IndexLifecyclePolicyApiTest extends AnyFlatSpec with Matchers with ElasticDsl with DockerTests with Eventually + with BeforeAndAfterAll { + "index lifecycle policy delete" should "be success on empty elasticsearch" in { + client.execute( + deleteIndexLifecyclePolicy("my_policy") + ).await.result.acknowledged shouldBe false + } + + "index lifecycle policy" should "be created" in { + client.execute( + deleteIndexLifecyclePolicy("my_policy") + ).await.result + + val policy = IndexLifecyclePolicy("my_policy") + .withPhases( + DeletePhase + .withSettings("min_age" -> StringValue("30d")) + .withActions(IndexLifecyclePolicyAction.DeleteAction) + ) + + val result = + client.execute( + createIndexLifecyclePolicy(policy) + ).await.result + + result.acknowledged shouldBe true + } + + "index lifecycle policy" should "be found" in { + client.execute( + deleteIndexLifecyclePolicy("my_policy") + ).await + + val policy = IndexLifecyclePolicy("my_policy") + .withPhases( + DeletePhase + .withSettings("min_age" -> StringValue("30d")) + .withActions( + IndexLifecyclePolicyAction.DeleteAction + .withSettings("delete_searchable_snapshot" -> StringValue("true")) + ) + ) + + client.execute( + createIndexLifecyclePolicy(policy) + ).await + + val policyFromElastic = client.execute( + getIndexLifecyclePolicy("my_policy") + ).await.result + + policyFromElastic.map(_.policy) shouldBe Some(policy) + } + + "index lifecycle policy" should "be deleted" in { + client.execute( + deleteIndexLifecyclePolicy("my_policy") + ).await + + val policy = IndexLifecyclePolicy("my_policy") + + client.execute( + createIndexLifecyclePolicy(policy) + ).await + + client.execute( + deleteIndexLifecyclePolicy("my_policy") + ).await + + client.execute( + getIndexLifecyclePolicy("my_policy") + ).await.result shouldBe None + } + + override def afterAll(): Unit = { + client.execute( + deleteIndexLifecyclePolicy("my_policy") + ).await + } + +} diff --git a/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/politics/IndexLifecyclePolicyModelsTest.scala b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/politics/IndexLifecyclePolicyModelsTest.scala new file mode 100644 index 0000000000..e2ff8f4c53 --- /dev/null +++ b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/indexlifecyclemanagement/politics/IndexLifecyclePolicyModelsTest.scala @@ -0,0 +1,81 @@ +package com.sksamuel.elastic4s.requests.indexlifecyclemanagement.politics + +import com.sksamuel.elastic4s.handlers.indexlifecyclemanagement.IndexLifecyclePolicyContentBuilder +import com.sksamuel.elastic4s.json.{IntValue, ObjectValue, RawValue, StringValue} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicy +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicyAction.{ + DeleteAction, + ForceMergeAction +} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.policy.IndexLifecyclePolicyPhase.{ + DeletePhase, + WarmPhase +} +import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.{GetIndexLifecyclePolicyResponse, InUseBy} +import com.sksamuel.elastic4s.{JacksonSupport, JsonSugar} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar + +import scala.io.Source + +class IndexLifecyclePolicyModelsTest extends AnyFlatSpec with MockitoSugar with JsonSugar with Matchers { + "the create lifecycle policy dsl" should "generate the correct json" in { + + val policy = IndexLifecyclePolicy("testPolicy") + .withMeta( + "description" -> StringValue("used for nginx log"), + "project" -> + ObjectValue.empty + .putValue("name", StringValue("myProject")) + .putValue("department", StringValue("myDepartment")) + ) + .withPhases( + WarmPhase + .withSettings("min_age" -> StringValue("10d")) + .withActions( + ForceMergeAction.withSettings("max_num_segments" -> IntValue(1)) + ), + DeletePhase + .withSettings("min_age" -> StringValue("30d")) + .withActions( + DeleteAction + ) + ) + + IndexLifecyclePolicyContentBuilder(policy).string should matchJsonResource( + "/json/indexlifecyclemanagement/test_policy_create.json" + ) + } + + "the create lifecycle policy dsl" should "parse json correctly" in { + val resourceName = "/json/indexlifecyclemanagement/test_policy_get.json" + val patternValue = GetIndexLifecyclePolicyResponse( + version = 1, + modifiedDate = 82392349, + policy = IndexLifecyclePolicy("my_policy") + .withPhases( + WarmPhase + .withSettings("min_age" -> StringValue("10d")) + .withActions(ForceMergeAction.withSettings("max_num_segments" -> StringValue("1"))), + DeletePhase + .withSettings("min_age" -> StringValue("30d")) + .withActions(DeleteAction.withSettings("delete_searchable_snapshot" -> StringValue("true"))) + ), + inUseBy = Some(InUseBy(Nil, Nil, Nil)) + ) + + val jsonResourceStream = getClass.getResourceAsStream(resourceName) + withClue(s"expected JSON resource [$resourceName] ") { + jsonResourceStream should not be null + } + val source = Source.fromInputStream(jsonResourceStream) + val jsonReference = + try + source.mkString + finally + source.close() + + GetIndexLifecyclePolicyResponse.deserialize(JacksonSupport.mapper.readTree(jsonReference)) shouldBe patternValue + } +}