Skip to content

Commit b67f462

Browse files
committed
fix: resolve rollover issue with index and ism policy improvements
This commit addresses the issue where rollover was not resolving as intended, leading to a single index accumulating all documents without purging. BREAKING CHANGE: The index name has been changed from "actions" to "events", requiring manual deletion of the old "actions" index. Key changes: 1. The index configuration now uses 3 shards and 1 replica to distribute the load evenly across the cluster's 3 data nodes. The index name has been changed to `events` to enforce this setting. 2. An index template with a `rollover_alias` has been created to ensure rollover operations work correctly and don't hang. 3. Updated ISM policy: - Rollover now occurs daily, allowing for more granular purging after 14 days. - Introduced a warm state for indexes older than 7 days an action reduces the number of replicas to 0.
1 parent c15357a commit b67f462

File tree

15 files changed

+262
-68
lines changed

15 files changed

+262
-68
lines changed

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.springframework.data.elasticsearch.annotations.FieldType
2727
* @property data Additional data associated with the event.
2828
* @property session Session data associated with the event, potentially updated later.
2929
*/
30-
@Document(indexName = "actions", createIndex = false)
30+
@Document(indexName = "events", createIndex = false)
3131
data class EventRequest(
3232
@Id
3333
@JsonIgnore

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import org.springframework.core.io.ResourceLoader
99
import org.springframework.http.HttpHeaders
1010
import org.springframework.http.HttpStatusCode
1111
import org.springframework.http.MediaType
12-
import org.springframework.http.ResponseEntity
1312
import org.springframework.stereotype.Component
1413
import org.springframework.web.reactive.function.client.WebClient
1514
import reactor.core.publisher.Mono
@@ -24,7 +23,7 @@ import reactor.core.publisher.Mono
2423
* @property resourceLoader Resource loader used to access the alias configuration JSON file.
2524
*/
2625
@Component
27-
@Order(3)
26+
@Order(4)
2827
class AliasSetupTask(
2928
@Qualifier("openSearchWebClient")
3029
private val webClient: WebClient,
@@ -39,7 +38,7 @@ class AliasSetupTask(
3938
/**
4039
* Name of the filtered alias.
4140
*/
42-
private const val ALIAS_NAME = "filtered_actions"
41+
private const val ALIAS_NAME = "user_events"
4342

4443
/**
4544
* Path to check for the existence of the alias in OpenSearch.
@@ -61,7 +60,7 @@ class AliasSetupTask(
6160
*/
6261
override fun run(): Mono<*> = checkAndCreateAlias()
6362

64-
private fun checkAndCreateAlias(): Mono<ResponseEntity<Void>> =
63+
private fun checkAndCreateAlias(): Mono<*> =
6564
webClient
6665
.get()
6766
.uri(ALIAS_CHECK_PATH)
@@ -74,7 +73,7 @@ class AliasSetupTask(
7473
Mono.empty()
7574
}.toBodilessEntity()
7675

77-
private fun createAlias(): Mono<ResponseEntity<Void>> {
76+
private fun createAlias(): Mono<*> {
7877
val indexTemplateJson = resourceLoader.loadResourceContent("classpath:opensearch/alias.json")
7978
return webClient
8079
.put()

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import org.springframework.core.io.ResourceLoader
99
import org.springframework.http.HttpHeaders
1010
import org.springframework.http.HttpStatusCode
1111
import org.springframework.http.MediaType
12-
import org.springframework.http.ResponseEntity
1312
import org.springframework.stereotype.Component
1413
import org.springframework.web.reactive.function.client.WebClient
1514
import reactor.core.publisher.Mono
@@ -39,7 +38,7 @@ class ISMPolicySetupTask(
3938
/**
4039
* Path for the Index State Management (ISM) policy in OpenSearch.
4140
*/
42-
private const val ISM_POLICY_PATH = "/_plugins/_ism/policies/actions_policy"
41+
private const val ISM_POLICY_PATH = "/_plugins/_ism/policies/events_policy"
4342
}
4443

4544
/**
@@ -51,7 +50,7 @@ class ISMPolicySetupTask(
5150
*/
5251
override fun run(): Mono<*> = checkAndApplyISMPolicy()
5352

54-
private fun checkAndApplyISMPolicy(): Mono<ResponseEntity<Void>> =
53+
private fun checkAndApplyISMPolicy(): Mono<*> =
5554
webClient
5655
.get()
5756
.uri(ISM_POLICY_PATH)
@@ -64,7 +63,7 @@ class ISMPolicySetupTask(
6463
Mono.empty()
6564
}.toBodilessEntity()
6665

67-
private fun applyISMPolicy(): Mono<ResponseEntity<Void>> {
66+
private fun applyISMPolicy(): Mono<*> {
6867
val ismPolicyJson = resourceLoader.loadResourceContent("classpath:opensearch/ism_policy.json")
6968

7069
return webClient

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import org.springframework.core.io.ResourceLoader
99
import org.springframework.http.HttpHeaders
1010
import org.springframework.http.HttpStatusCode
1111
import org.springframework.http.MediaType
12-
import org.springframework.http.ResponseEntity
1312
import org.springframework.stereotype.Component
1413
import org.springframework.web.reactive.function.client.WebClient
1514
import reactor.core.publisher.Mono
@@ -25,7 +24,7 @@ import reactor.core.publisher.Mono
2524
* @property resourceLoader Resource loader used to access the index template JSON file.
2625
*/
2726
@Component
28-
@Order(2)
27+
@Order(3)
2928
class IndexSetupTask(
3029
@Qualifier("openSearchWebClient")
3130
private val webClient: WebClient,
@@ -40,7 +39,12 @@ class IndexSetupTask(
4039
/**
4140
* Path for creating the OpenSearch index.
4241
*/
43-
private const val INDEX_CREATION_PATH = "/actions-000001"
42+
private const val INDEX_CREATION_PATH = "/events-000001"
43+
44+
/**
45+
* Path to check if the index exists.
46+
*/
47+
private const val INDEX_CHECK_PATH = "/events"
4448
}
4549

4650
/**
@@ -52,10 +56,10 @@ class IndexSetupTask(
5256
*/
5357
override fun run(): Mono<*> = checkAndCreateIndex()
5458

55-
private fun checkAndCreateIndex(): Mono<ResponseEntity<Void>> =
59+
private fun checkAndCreateIndex(): Mono<*> =
5660
webClient
5761
.head()
58-
.uri(INDEX_CREATION_PATH)
62+
.uri(INDEX_CHECK_PATH)
5963
.retrieve()
6064
.onStatus(HttpStatusCode::is4xxClientError) {
6165
logger.info("Index does not exist, creating index...")
@@ -65,13 +69,13 @@ class IndexSetupTask(
6569
Mono.empty()
6670
}.toBodilessEntity()
6771

68-
private fun createIndex(): Mono<ResponseEntity<Void>> {
69-
val indexTemplateJson = resourceLoader.loadResourceContent("classpath:opensearch/index_template.json")
72+
private fun createIndex(): Mono<*> {
73+
val indexJson = resourceLoader.loadResourceContent("classpath:opensearch/index.json")
7074
return webClient
7175
.put()
7276
.uri(INDEX_CREATION_PATH)
7377
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
74-
.bodyValue(indexTemplateJson)
78+
.bodyValue(indexJson)
7579
.retrieve()
7680
.toBodilessEntity()
7781
.doOnSuccess { logger.info("Index created successfully") }
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package ch.srgssr.pillarbox.monitoring.event.setup
2+
3+
import ch.srgssr.pillarbox.monitoring.io.loadResourceContent
4+
import ch.srgssr.pillarbox.monitoring.log.error
5+
import ch.srgssr.pillarbox.monitoring.log.logger
6+
import org.springframework.beans.factory.annotation.Qualifier
7+
import org.springframework.core.annotation.Order
8+
import org.springframework.core.io.ResourceLoader
9+
import org.springframework.http.HttpHeaders
10+
import org.springframework.http.HttpStatusCode
11+
import org.springframework.http.MediaType
12+
import org.springframework.stereotype.Component
13+
import org.springframework.web.reactive.function.client.WebClient
14+
import reactor.core.publisher.Mono
15+
16+
/**
17+
* Task responsible for setting up the OpenSearch index template.
18+
*
19+
* This task creates the index template stored in `resources/opensearch/index_template.json`.
20+
*
21+
* @property webClient WebClient instance used to interact with the OpenSearch API.
22+
* @property resourceLoader Resource loader used to access the index template JSON file.
23+
*/
24+
@Component
25+
@Order(2)
26+
class IndexTemplateSetupTask(
27+
@Qualifier("openSearchWebClient")
28+
private val webClient: WebClient,
29+
private val resourceLoader: ResourceLoader,
30+
) : OpenSearchSetupTask {
31+
private companion object {
32+
/**
33+
* Logger instance for logging within this task.
34+
*/
35+
private val logger = logger()
36+
37+
/**
38+
* Path for creating the index template.
39+
*/
40+
private const val INDEX_TEMPLATE_CREATION_PATH = "/_index_template/events_template"
41+
}
42+
43+
/**
44+
* Runs the index template setup task.
45+
*
46+
* Checks if the index template exists: if not, creates the index template.
47+
*
48+
* @return Mono indicating the completion of the task.
49+
*/
50+
override fun run(): Mono<*> = checkAndCreateTemplate()
51+
52+
private fun checkAndCreateTemplate(): Mono<*> =
53+
webClient
54+
.get()
55+
.uri(INDEX_TEMPLATE_CREATION_PATH)
56+
.retrieve()
57+
.onStatus(HttpStatusCode::is4xxClientError) {
58+
logger.info("Index template does not exist, creating it...")
59+
createTemplate().then(Mono.empty())
60+
}.onStatus(HttpStatusCode::is2xxSuccessful) {
61+
logger.info("Index template already exists, skipping creation.")
62+
Mono.empty()
63+
}.toBodilessEntity()
64+
65+
private fun createTemplate(): Mono<*> {
66+
val indexTemplateJson = resourceLoader.loadResourceContent("classpath:opensearch/index_template.json")
67+
return webClient
68+
.put()
69+
.uri(INDEX_TEMPLATE_CREATION_PATH)
70+
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
71+
.bodyValue(indexTemplateJson)
72+
.retrieve()
73+
.toBodilessEntity()
74+
.doOnSuccess { logger.info("Index template created successfully") }
75+
.doOnError { e -> logger.error { "Failed to create index template: ${e.message}" } }
76+
}
77+
}

src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package ch.srgssr.pillarbox.monitoring.event.setup
33
import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties
44
import ch.srgssr.pillarbox.monitoring.log.logger
55
import org.springframework.beans.factory.annotation.Qualifier
6-
import org.springframework.http.ResponseEntity
76
import org.springframework.stereotype.Service
87
import org.springframework.web.reactive.function.client.WebClient
98
import reactor.core.publisher.Flux
@@ -52,7 +51,7 @@ class OpenSearchSetupService(
5251
.then(runSetupTasks())
5352
.doOnSuccess { logger.info("All setup tasks are completed, starting SSE client...") }
5453

55-
private fun checkOpenSearchHealth(): Mono<ResponseEntity<Void>> =
54+
private fun checkOpenSearchHealth(): Mono<*> =
5655
webClient
5756
.get()
5857
.uri("/")

src/main/resources/opensearch/alias.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
"actions": [
33
{
44
"add": {
5-
"index": "actions-*",
6-
"alias": "filtered_actions",
5+
"index": "events*",
6+
"alias": "user_events",
77
"filter": {
88
"bool": {
99
"must_not": [
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"aliases": {
3+
"events": {
4+
"is_write_index": true
5+
}
6+
}
7+
}
Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
{
2-
"settings": {
3-
"number_of_shards": 1
4-
},
5-
"mappings": {
6-
"properties": {
7-
"@timestamp": {
8-
"type": "date",
9-
"format": "epoch_millis"
2+
"index_patterns": ["events*"],
3+
"priority": 100,
4+
"template": {
5+
"settings": {
6+
"number_of_shards": 3,
7+
"number_of_replicas": 1,
8+
"plugins.index_state_management.rollover_alias": "events"
9+
},
10+
"mappings": {
11+
"properties": {
12+
"@timestamp": {
13+
"type": "date",
14+
"format": "epoch_millis"
15+
}
1016
}
1117
}
12-
},
13-
"aliases": {
14-
"actions": {
15-
"is_write_index": true
16-
}
1718
}
1819
}

src/main/resources/opensearch/ism_policy.json

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,34 @@
11
{
22
"policy": {
3-
"description": "Policy for managing actions indices",
3+
"description": "Hot warm delete policy for events",
44
"default_state": "hot",
5+
"schema_version": 1,
56
"states": [
67
{
78
"name": "hot",
89
"actions": [
910
{
1011
"rollover": {
11-
"min_index_age": "14d"
12+
"min_index_age": "1d",
13+
"copy_alias": true
14+
}
15+
}
16+
],
17+
"transitions": [
18+
{
19+
"state_name": "warm",
20+
"conditions": {
21+
"min_index_age": "7d"
22+
}
23+
}
24+
]
25+
},
26+
{
27+
"name": "warm",
28+
"actions": [
29+
{
30+
"replica_count": {
31+
"number_of_replicas": 0
1232
}
1333
}
1434
],
@@ -27,14 +47,12 @@
2747
{
2848
"delete": {}
2949
}
30-
],
31-
"transitions": []
50+
]
3251
}
52+
3353
],
3454
"ism_template": {
35-
"index_patterns": [
36-
"actions*"
37-
],
55+
"index_patterns": ["events*"],
3856
"priority": 100
3957
}
4058
}

0 commit comments

Comments
 (0)