Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Support SNS as a destination #348

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestSearchEmai
import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestSearchEmailGroupAction
import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestSearchMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.script.TriggerScript
import com.amazon.opendistroforelasticsearch.alerting.settings.AWSSettings
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings
import com.amazon.opendistroforelasticsearch.alerting.settings.DestinationSettings
import com.amazon.opendistroforelasticsearch.alerting.transport.TransportAcknowledgeAlertAction
Expand Down Expand Up @@ -255,6 +256,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
DestinationSettings.EMAIL_USERNAME,
DestinationSettings.EMAIL_PASSWORD,
DestinationSettings.ALLOW_LIST,
AWSSettings.SNS_IAM_USER_ACCESS_KEY,
AWSSettings.SNS_IAM_USER_SECRET_KEY,
DestinationSettings.HOST_DENY_LIST
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import com.amazon.opendistroforelasticsearch.alerting.model.action.Action.Compan
import com.amazon.opendistroforelasticsearch.alerting.model.destination.DestinationContextFactory
import com.amazon.opendistroforelasticsearch.alerting.script.TriggerExecutionContext
import com.amazon.opendistroforelasticsearch.alerting.script.TriggerScript
import com.amazon.opendistroforelasticsearch.alerting.settings.AWSSettings
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
Expand Down Expand Up @@ -126,6 +127,7 @@ class MonitorRunner(

@Volatile private var destinationSettings = loadDestinationSettings(settings)
@Volatile private var destinationContextFactory = DestinationContextFactory(client, xContentRegistry, destinationSettings)
@Volatile private var awsSettings = AWSSettings.parse(settings)
lezzago marked this conversation as resolved.
Show resolved Hide resolved

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) {
Expand All @@ -143,6 +145,9 @@ class MonitorRunner(
fun reloadDestinationSettings(settings: Settings) {
destinationSettings = loadDestinationSettings(settings)

// Update awsSettings for SNS destination type
awsSettings = AWSSettings.parse(settings)

// Update destinationContextFactory as well since destinationSettings has been updated
destinationContextFactory.updateDestinationSettings(destinationSettings)
}
Expand Down Expand Up @@ -538,6 +543,7 @@ class MonitorRunner(

val destinationCtx = destinationContextFactory.getDestinationContext(destination)
actionOutput[MESSAGE_ID] = destination.publish(
awsSettings,
actionOutput[SUBJECT],
actionOutput[MESSAGE]!!,
destinationCtx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import com.amazon.opendistroforelasticsearch.alerting.destination.message.BaseMe
import com.amazon.opendistroforelasticsearch.alerting.destination.message.ChimeMessage
import com.amazon.opendistroforelasticsearch.alerting.destination.message.CustomWebhookMessage
import com.amazon.opendistroforelasticsearch.alerting.destination.message.EmailMessage
import com.amazon.opendistroforelasticsearch.alerting.destination.message.SNSMessage
import com.amazon.opendistroforelasticsearch.alerting.destination.message.SlackMessage
import com.amazon.opendistroforelasticsearch.alerting.destination.response.DestinationResponse
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.instant
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalUserField
import com.amazon.opendistroforelasticsearch.alerting.model.destination.email.Email
import com.amazon.opendistroforelasticsearch.alerting.settings.AWSSettings
import com.amazon.opendistroforelasticsearch.alerting.util.DestinationType
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import com.amazon.opendistroforelasticsearch.alerting.util.isHostInDenylist
Expand Down Expand Up @@ -57,6 +59,7 @@ data class Destination(
val lastUpdateTime: Instant,
val chime: Chime?,
val slack: Slack?,
val sns: SNS?,
val customWebhook: CustomWebhook?,
val email: Email?
) : ToXContent {
Expand Down Expand Up @@ -97,6 +100,8 @@ data class Destination(
chime?.writeTo(out)
out.writeBoolean(slack != null)
slack?.writeTo(out)
out.writeBoolean(sns != null)
sns?.writeTo(out)
out.writeBoolean(customWebhook != null)
customWebhook?.writeTo(out)
out.writeBoolean(email != null)
Expand All @@ -119,6 +124,7 @@ data class Destination(
const val LAST_UPDATE_TIME_FIELD = "last_update_time"
const val CHIME = "chime"
const val SLACK = "slack"
const val SNS_TYPE = "sns"
const val CUSTOMWEBHOOK = "custom_webhook"
const val EMAIL = "email"

Expand All @@ -142,6 +148,7 @@ data class Destination(
var user: User? = null
lateinit var type: String
var slack: Slack? = null
var sns: SNS? = null
var chime: Chime? = null
var customWebhook: CustomWebhook? = null
var email: Email? = null
Expand Down Expand Up @@ -170,6 +177,9 @@ data class Destination(
SLACK -> {
slack = Slack.parse(xcp)
}
SNS_TYPE -> {
sns = SNS.parse(xcp)
}
CUSTOMWEBHOOK -> {
customWebhook = CustomWebhook.parse(xcp)
}
Expand Down Expand Up @@ -198,6 +208,7 @@ data class Destination(
lastUpdateTime ?: Instant.now(),
chime,
slack,
sns,
customWebhook,
email)
}
Expand Down Expand Up @@ -230,6 +241,7 @@ data class Destination(
lastUpdateTime = sin.readInstant(),
chime = Chime.readFrom(sin),
slack = Slack.readFrom(sin),
sns = SNS.readFrom(sin),
customWebhook = CustomWebhook.readFrom(sin),
email = Email.readFrom(sin)
)
Expand All @@ -238,12 +250,12 @@ data class Destination(

@Throws(IOException::class)
fun publish(
awsSettings: AWSSettings,
compiledSubject: String?,
compiledMessage: String,
destinationCtx: DestinationContext,
denyHostRanges: List<String>
): String {

val destinationMessage: BaseMessage
val responseContent: String
val responseStatusCode: Int
Expand All @@ -262,6 +274,16 @@ data class Destination(
.withMessage(messageContent)
.build()
}
DestinationType.SNS -> {
destinationMessage = SNSMessage.Builder(name)
.withRoleArn(sns?.roleARN)
.withTopicArn(sns?.topicARN)
.withIAMAccessKey(awsSettings.iamUserAccessKey)
.withIAMSecretKey(awsSettings.iamUserSecretKey)
.withSubject(compiledSubject)
.withMessage(compiledMessage)
.build()
}
DestinationType.CUSTOM_WEBHOOK -> {
destinationMessage = CustomWebhookMessage.Builder(name)
.withUrl(customWebhook?.url)
Expand Down Expand Up @@ -292,7 +314,9 @@ data class Destination(
}
}

validateDestinationUri(destinationMessage, denyHostRanges)
if (type !== DestinationType.SNS) {
validateDestinationUri(destinationMessage, denyHostRanges)
}
val response = Notification.publish(destinationMessage) as DestinationResponse
responseContent = response.responseContent
responseStatusCode = response.statusCode
Expand All @@ -306,6 +330,7 @@ data class Destination(
when (type) {
DestinationType.CHIME -> content = chime?.convertToMap()?.get(type.value)
DestinationType.SLACK -> content = slack?.convertToMap()?.get(type.value)
DestinationType.SNS -> content = sns?.convertToMap()?.get(type.value)
DestinationType.CUSTOM_WEBHOOK -> content = customWebhook?.convertToMap()?.get(type.value)
DestinationType.EMAIL -> content = email?.convertToMap()?.get(type.value)
DestinationType.TEST_ACTION -> content = "dummy"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

package com.amazon.opendistroforelasticsearch.alerting.model.destination

import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalStringField
import com.amazon.opendistroforelasticsearch.alerting.util.DestinationType
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
Expand All @@ -23,24 +27,32 @@ import java.io.IOException
import java.lang.IllegalStateException
import java.util.regex.Pattern

data class SNS(val topicARN: String, val roleARN: String) : ToXContent {
data class SNS(val topicARN: String, val roleARN: String?) : ToXContent {

init {
require(SNS_ARN_REGEX.matcher(topicARN).find()) { "Invalid AWS SNS topic ARN: $topicARN" }
require(IAM_ARN_REGEX.matcher(roleARN).find()) { "Invalid AWS role ARN: $roleARN " }
if (roleARN != null) {
require(IAM_ARN_REGEX.matcher(roleARN).find()) { "Invalid AWS role ARN: $roleARN " }
}
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject(SNS_TYPE)
.field(TOPIC_ARN_FIELD, topicARN)
.field(ROLE_ARN_FIELD, roleARN)
.optionalStringField(ROLE_ARN_FIELD, roleARN)
.endObject()
}

@Throws(IOException::class)
fun writeTo(out: StreamOutput) {
out.writeString(topicARN)
out.writeOptionalString(roleARN)
}

companion object {

private val SNS_ARN_REGEX = Pattern.compile("^arn:aws(-[^:]+)?:sns:([a-zA-Z0-9-]+):([0-9]{12}):([a-zA-Z0-9-_]+)$")
private val IAM_ARN_REGEX = Pattern.compile("^arn:aws(-[^:]+)?:iam::([0-9]{12}):([a-zA-Z0-9-/_]+)$")
private val IAM_ARN_REGEX = Pattern.compile("^arn:aws(-[^:]+)?:iam::([0-9]{12}):([a-zA-Z_0-9+=,.@\\-_/]+)$")

const val TOPIC_ARN_FIELD = "topic_arn"
const val ROLE_ARN_FIELD = "role_arn"
Expand All @@ -50,7 +62,7 @@ data class SNS(val topicARN: String, val roleARN: String) : ToXContent {
@Throws(IOException::class)
fun parse(xcp: XContentParser): SNS {
lateinit var topicARN: String
lateinit var roleARN: String
var roleARN: String? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -64,8 +76,21 @@ data class SNS(val topicARN: String, val roleARN: String) : ToXContent {
}
}
}
return SNS(requireNotNull(topicARN) { "SNS Action topic_arn is null" },
requireNotNull(roleARN) { "SNS Action role_arn is null" })
if (DestinationType.snsUseIamRole) {
requireNotNull(roleARN) { "SNS Action role_arn is null" }
}
return SNS(requireNotNull(topicARN) { "SNS Action topic_arn is null" }, roleARN)
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): SNS? {
return if (sin.readBoolean()) {
SNS(
topicARN = sin.readString(),
roleARN = sin.readOptionalString()
)
} else null
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.alerting.settings

import com.amazon.opendistroforelasticsearch.alerting.util.DestinationType
import org.elasticsearch.common.settings.SecureSetting
import org.elasticsearch.common.settings.SecureString
import org.elasticsearch.common.settings.Settings
import java.io.IOException

/**
* Settings specific to AWS resources. This class is separated from the other settings classes to store anything specific to
* AWS resources.
*/
data class AWSSettings(
val iamUserAccessKey: SecureString,
val iamUserSecretKey: SecureString
) {
companion object {
val SNS_IAM_USER_ACCESS_KEY = SecureSetting.secureString(
"opendistro.alerting.destination.sns.access.key",
null
)

val SNS_IAM_USER_SECRET_KEY = SecureSetting.secureString(
"opendistro.alerting.destination.sns.secret.key",
null
)

@JvmStatic
@Throws(IOException::class)
fun parse(settings: Settings): AWSSettings {
DestinationType.snsUseIamRole = SNS_IAM_USER_ACCESS_KEY.get(settings) == null
return AWSSettings(
SNS_IAM_USER_ACCESS_KEY.get(settings),
SNS_IAM_USER_SECRET_KEY.get(settings)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ package com.amazon.opendistroforelasticsearch.alerting.util
enum class DestinationType(val value: String) {
CHIME("chime"),
SLACK("slack"),
SNS("sns"),
CUSTOM_WEBHOOK("custom_webhook"),
EMAIL("email"),
TEST_ACTION("test_action");

override fun toString(): String {
return value
}

companion object {
var snsUseIamRole = false
Copy link
Contributor

@qreshi qreshi Feb 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a little strange to me. I'm not sure if we should maintain state on using IAM role in the DestinationType enum class.

It looks like this is mostly used for null assertion in the SNS data class. Perhaps you can remove this variable and instead validate if either roleARN or access/secret is set somewhere. It looks like you do something like that in SNSMessage already, that might be sufficient.

And ideally, we should allow the access/secret settings to be changed using the reload API and if we support that then you can have scenarios where:

  1. access/secret settings are originally set
  2. SNS Destinations are created (allowing roleARN to be null since snsUseIamRole = false)
  3. The access/secret settings are removed, checking roleARN on any subsequent SNS Destination but now you have one that's already created with a null roleARN and no access/secret settings so it bypassed this check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need this in case the customer uses the indexDestination API to create the SNS destination and we need some validation for the role arn.
Also I have updated the code such that snsUseIamRole can change whenever the settings are reloaded.

}
}
24 changes: 24 additions & 0 deletions alerting/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,28 @@ grant {

permission java.net.SocketPermission "*", "connect,resolve";
permission java.net.NetPermission "getProxySelector";

// needed because of problems in ClientConfiguration
// TODO: get these fixed in aws sdk
permission java.lang.RuntimePermission "accessDeclaredMembers";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need accessDeclaredMembers? Could you please double check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is required. The AWS SDK uses jackson databind, which needs accessDeclaredMembers.

permission java.lang.RuntimePermission "getClassLoader";
permission java.net.SocketPermission "*", "connect";
// Needed because of problems in AmazonSNS:
// When no region is set on a STSClient instance, the
// AWS SDK loads all known partitions from a JSON file and
// uses a Jackson's ObjectMapper for that: this one, in
// version 2.5.3 with the default binding options, tries
// to suppress access checks of ctor/field/method and thus
// requires this special permission. AWS must be fixed to
// uses Jackson correctly and have the correct modifiers
// on binded classes.
// TODO: get these fixed in aws sdk
// See https://github.com/aws/aws-sdk-java/issues/766
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";

// Below is specific for notification SNS client
permission javax.management.MBeanServerPermission "createMBeanServer";
permission javax.management.MBeanServerPermission "findMBeanServer";
permission javax.management.MBeanPermission "com.amazonaws.metrics.*", "*";
permission javax.management.MBeanTrustPermission "register";
};
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
lastUpdateTime = Instant.now(),
chime = null,
slack = null,
sns = null,
customWebhook = null,
email = null)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {
lastUpdateTime = Instant.now(),
chime = null,
slack = null,
sns = null,
customWebhook = null,
email = email
))
Expand All @@ -664,6 +665,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {
lastUpdateTime = Instant.now(),
chime = null,
slack = null,
sns = null,
customWebhook = customWebhook,
email = null
))
Expand All @@ -690,6 +692,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {
lastUpdateTime = Instant.now(),
chime = null,
slack = null,
sns = null,
customWebhook = customWebhook,
email = null
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class GetDestinationsResponseTests : ESTestCase() {
null,
slack,
null,
null,
null)

val req = GetDestinationsResponse(RestStatus.OK, 1, listOf(destination))
Expand Down
Loading