Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breakers #302

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 76 additions & 68 deletions docs/configuration.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.google.protobuf.Value
import com.google.protobuf.util.Durations
import io.envoyproxy.controlplane.server.exception.RequestException
import io.grpc.Status
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.CircuitBreakerProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.EgressProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.util.StatusCodeFilterParser
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.util.StatusCodeFilterSettings
Expand Down Expand Up @@ -77,20 +79,7 @@ fun Value?.toOutgoing(properties: SnapshotProperties): Outgoing {
val allServiceDependenciesIdentifier = properties.outgoingPermissions.allServicesDependencies.identifier
val rawDependencies = this?.field("dependencies")?.list().orEmpty().map(::toRawDependency)
val allServicesDependencies = toAllServiceDependencies(rawDependencies, allServiceDependenciesIdentifier)
val defaultSettingsFromProperties = DependencySettings(
handleInternalRedirect = properties.egress.handleInternalRedirect,
timeoutPolicy = Outgoing.TimeoutPolicy(
idleTimeout = Durations.fromMillis(properties.egress.commonHttp.idleTimeout.toMillis()),
connectionIdleTimeout = Durations.fromMillis(properties.egress.commonHttp.connectionIdleTimeout.toMillis()),
requestTimeout = Durations.fromMillis(properties.egress.commonHttp.requestTimeout.toMillis())
),
retryPolicy = RetryPolicy(
numberRetries = properties.retryPolicy.numberOfRetries,
retryHostPredicate = properties.retryPolicy.retryHostPredicate,
hostSelectionRetryMaxAttempts = properties.retryPolicy.hostSelectionRetryMaxAttempts,
retryBackOff = properties.retryPolicy.retryBackOff
)
)
val defaultSettingsFromProperties = createDefaultOutgoingProperties(properties.egress)
val allServicesDefaultSettings = allServicesDependencies?.value.toSettings(defaultSettingsFromProperties)
val services = rawDependencies.filter { it.service != null && it.service != allServiceDependenciesIdentifier }
.map {
Expand All @@ -114,6 +103,47 @@ fun Value?.toOutgoing(properties: SnapshotProperties): Outgoing {
)
}

private fun createDefaultOutgoingProperties(egress: EgressProperties) : DependencySettings {
Ferdudas97 marked this conversation as resolved.
Show resolved Hide resolved
return DependencySettings(
handleInternalRedirect = egress.handleInternalRedirect,
timeoutPolicy = egress.commonHttp.let {
Outgoing.TimeoutPolicy(
idleTimeout = Durations.fromMillis(it.idleTimeout.toMillis()),
connectionIdleTimeout = Durations.fromMillis(it.connectionIdleTimeout.toMillis()),
requestTimeout = Durations.fromMillis(it.requestTimeout.toMillis())
)
},
retryPolicy = egress.retryPolicy.let { RetryPolicy(
numberRetries = it.numberOfRetries,
retryHostPredicate = it.retryHostPredicate,
hostSelectionRetryMaxAttempts = it.hostSelectionRetryMaxAttempts,
retryBackOff = it.retryBackOff
) },
circuitBreakers = egress.commonHttp.circuitBreakers.let { properties ->
CircuitBreakers(defaultThreshold = properties.defaultThreshold.toCircuitBreaker(),
highThreshold = properties.highThreshold.toCircuitBreaker())
}
)
}

fun CircuitBreakerProperties.toCircuitBreaker(): CircuitBreaker {
return CircuitBreaker(
priority = this.priority,
maxRequests = this.maxRequests,
maxPendingRequests = this.maxPendingRequests,
maxConnections = this.maxConnections,
maxRetries = this.maxRetries,
maxConnectionPools = this.maxConnectionPools,
trackRemaining = this.trackRemaining,
retryBudget = this.retryBudget?.let {
RetryBudget(
budgetPercent = it.budgetPercent,
minRetryConcurrency = it.minRetryConcurrency
)
}
)
}

@Suppress("ComplexCondition")
private fun toRawDependency(it: Value): RawDependency {
val service = it.field("service")?.stringValue
Expand Down Expand Up @@ -193,11 +223,13 @@ private fun Value?.toSettings(defaultSettings: DependencySettings): DependencySe
defaultSettings.retryPolicy
)
}
val circuitBreakers = this?.field("circuitBreakers")?.toCircuitBreakers(defaultSettings.circuitBreakers)

val shouldAllBeDefault = handleInternalRedirect == null &&
rewriteHostHeader == null &&
timeoutPolicy == null &&
retryPolicy == null
retryPolicy == null &&
circuitBreakers == null

return if (shouldAllBeDefault) {
defaultSettings
Expand All @@ -206,11 +238,44 @@ private fun Value?.toSettings(defaultSettings: DependencySettings): DependencySe
handleInternalRedirect = handleInternalRedirect ?: defaultSettings.handleInternalRedirect,
timeoutPolicy = timeoutPolicy ?: defaultSettings.timeoutPolicy,
rewriteHostHeader = rewriteHostHeader ?: defaultSettings.rewriteHostHeader,
retryPolicy = retryPolicy ?: defaultSettings.retryPolicy
retryPolicy = retryPolicy ?: defaultSettings.retryPolicy,
circuitBreakers = circuitBreakers ?: defaultSettings.circuitBreakers
)
}
}

private fun Value?.toCircuitBreakers(defaultCircuitBreakers: CircuitBreakers): CircuitBreakers {
return CircuitBreakers(
defaultThreshold = this?.field("defaultThreshold")?.toCircuitBreaker(defaultCircuitBreakers.defaultThreshold)
?: defaultCircuitBreakers.defaultThreshold,
highThreshold = this?.field("highThreshold")?.toCircuitBreaker(defaultCircuitBreakers.highThreshold)
?: defaultCircuitBreakers.highThreshold
)
}

private fun Value?.toCircuitBreaker(defaultCircuitBreaker: CircuitBreaker?): CircuitBreaker {
return CircuitBreaker(priority = this?.field("priority")?.stringValue?.let { RoutingPriority.fromString(it) }
?: defaultCircuitBreaker?.priority,
maxRequests = this?.field("maxRequests")?.numberValue?.toInt() ?: defaultCircuitBreaker?.maxRequests,
maxPendingRequests = this?.field("maxPendingRequests")?.numberValue?.toInt()
?: defaultCircuitBreaker?.maxPendingRequests,
maxConnections = this?.field("maxConnections")?.numberValue?.toInt() ?: defaultCircuitBreaker?.maxConnections,
maxRetries = this?.field("maxRetries")?.numberValue?.toInt() ?: defaultCircuitBreaker?.maxRetries,
maxConnectionPools = this?.field("maxConnectionPools")?.numberValue?.toInt()
?: defaultCircuitBreaker?.maxConnectionPools,
trackRemaining = this?.field("trackRemaining")?.boolValue ?: defaultCircuitBreaker?.trackRemaining,
retryBudget = this?.field("retryBudget")?.toRetryBudget(defaultCircuitBreaker?.retryBudget)
?: defaultCircuitBreaker?.retryBudget
)
}
private fun Value?.toRetryBudget(defaultRetryBudget: RetryBudget?): RetryBudget {
return RetryBudget(
budgetPercent = this?.field("budgetPercent")?.numberValue ?: defaultRetryBudget?.budgetPercent,
minRetryConcurrency = this?.field("minRetryConcurrency")?.numberValue?.toInt()
?: defaultRetryBudget?.minRetryConcurrency
)
}

private fun mapProtoToRetryPolicy(value: Value, defaultRetryPolicy: RetryPolicy): RetryPolicy {
return RetryPolicy(
retryOn = value.field("retryOn")?.listValue?.valuesList?.map { it.stringValue },
Expand Down Expand Up @@ -537,9 +602,42 @@ data class DependencySettings(
val handleInternalRedirect: Boolean = false,
val timeoutPolicy: Outgoing.TimeoutPolicy = Outgoing.TimeoutPolicy(),
val rewriteHostHeader: Boolean = false,
val retryPolicy: RetryPolicy = RetryPolicy()
val retryPolicy: RetryPolicy = RetryPolicy(),
val circuitBreakers: CircuitBreakers = CircuitBreakers()
)

data class CircuitBreakers(
val defaultThreshold: CircuitBreaker? = null,
val highThreshold: CircuitBreaker? = null
)

data class CircuitBreaker(
val priority: RoutingPriority? = null,
val maxRequests: Int? = null,
val maxPendingRequests: Int? = null,
val maxConnections: Int? = null,
val maxRetries: Int? = null,
val maxConnectionPools: Int? = null,
val trackRemaining: Boolean? = null,
val retryBudget: RetryBudget? = null
)

data class RetryBudget(val budgetPercent: Double? = null, val minRetryConcurrency: Int? = null)

enum class RoutingPriority {
DEFAULT, HIGH, UNRECOGNIZED;

companion object {
fun fromString(value: String): RoutingPriority {
return when (value.toUpperCase()) {
"DEFAULT" -> DEFAULT
"HIGH" -> HIGH
else -> UNRECOGNIZED
}
}
}
}

data class RetryPolicy(
val retryOn: List<String>? = null,
val hostSelectionRetryMaxAttempts: Long? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.OAuth
import pl.allegro.tech.servicemesh.envoycontrol.groups.PathMatchingType
import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryBackOff
import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryHostPredicate
import pl.allegro.tech.servicemesh.envoycontrol.groups.RoutingPriority
import java.net.URI
import java.time.Duration

Expand All @@ -35,7 +36,6 @@ class SnapshotProperties {
var jwt = JwtFilterProperties()
var requireServiceName = false
var rateLimit = RateLimitProperties()
var retryPolicy = RetryPolicyProperties()
}

class MetricsProperties {
Expand Down Expand Up @@ -251,6 +251,7 @@ class EgressProperties {
var hostHeaderRewriting = HostHeaderRewritingProperties()
var headersToRemove = mutableListOf<String>()
var domains = mutableListOf<String>()
var retryPolicy = RetryPolicyProperties()
}

class IngressProperties {
Expand All @@ -265,19 +266,27 @@ class CommonHttpProperties {
var idleTimeout: Duration = Duration.ofSeconds(120)
var connectionIdleTimeout: Duration = Duration.ofSeconds(120)
var requestTimeout: Duration = Duration.ofSeconds(120)
var circuitBreakers: CircuitBreakers = CircuitBreakers()
var circuitBreakers: CircuitBreakersProperties = CircuitBreakersProperties()
}

class CircuitBreakersProperties {
var highThreshold = CircuitBreakerProperties(RoutingPriority.HIGH)
var defaultThreshold = CircuitBreakerProperties(RoutingPriority.DEFAULT)
}

class CircuitBreakers {
var highThreshold = Threshold("HIGH")
var defaultThreshold = Threshold("DEFAULT")
class CircuitBreakerProperties(var priority: RoutingPriority = RoutingPriority.DEFAULT) {
var maxRequests: Int = 1024
var maxPendingRequests: Int = 1024
var maxConnections: Int = 1024
var maxRetries: Int = 3
var maxConnectionPools: Int? = null
var trackRemaining: Boolean = false
var retryBudget: RetryBudgetProperties? = RetryBudgetProperties()
}

class Threshold(var priority: String) {
var maxConnections = 1024
var maxPendingRequests = 1024
var maxRequests = 1024
var maxRetries = 3
class RetryBudgetProperties {
malfoj89 marked this conversation as resolved.
Show resolved Hide resolved
var budgetPercent: Double = 20.0
var minRetryConcurrency: Int = 3
}

class Http2Properties {
Expand Down Expand Up @@ -365,5 +374,15 @@ data class RetryPolicyProperties(
)
)

// data class CircuitBreakerProperties(
Ferdudas97 marked this conversation as resolved.
Show resolved Hide resolved
// var priority: RoutingPriority = RoutingPriority.DEFAULT,
// var maxRequests: Int = 1024,
// var maxPendingRequest: Int = 1024,
// var maxConnections: Int = 1024,
// var maxRetries: Int = 3,
// var maxConnectionsPool: Int? = null,
// var trackRemaining: Boolean = false,
// var retryBudget: RetryBudget = RetryBudget(budgetPercent = 20.0, minRetryConcurrency = 3)
// )
typealias ProviderName = String
typealias TokenField = String
Loading