Skip to content

Commit

Permalink
Merge pull request #270 from emeraldpay/feat/invalidate-on-429
Browse files Browse the repository at this point in the history
  • Loading branch information
splix authored Oct 14, 2023
2 parents 15c8209 + 58fe524 commit 4478e06
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 191 deletions.
9 changes: 7 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/quorum/CallQuorum.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
interface CallQuorum {

companion object {
fun isConnectionUnavailable(error: JsonRpcException): Boolean {

fun isConnectionUnavailable(statusCode: Int): Boolean {
//
// The problem is that some servers respond with 4xx/5xx in normal cases telling that the input data
// cannot be processed (ex. transaction is known already), in addition to the error message in the JSON RPC body.
Expand All @@ -44,7 +45,11 @@ interface CallQuorum {
//
// See https://github.com/emeraldpay/dshackle/issues/251 also
//
return error.statusCode != null && (error.statusCode == 429 || error.statusCode == 401 || error.statusCode in 502..504)
return statusCode == 429 || statusCode == 401 || statusCode in 502..504
}

fun isConnectionUnavailable(error: JsonRpcException): Boolean {
return error.statusCode != null && isConnectionUnavailable(error.statusCode)
}
}

Expand Down
26 changes: 26 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ package io.emeraldpay.dshackle.upstream
import io.emeraldpay.api.Chain
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.quorum.CallQuorum
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import org.springframework.context.Lifecycle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer

abstract class DefaultUpstream(
private val id: String,
Expand Down Expand Up @@ -100,6 +104,25 @@ abstract class DefaultUpstream(
}
}

//
// Can be used to temporarily disable the upstream until the specified time.
// For example if it produces an error indicating there is too many requests.
private val temporaryDisable = AtomicReference<Instant?>(null)

val watchHttpCodes = Consumer<Int> { code ->
if (CallQuorum.isConnectionUnavailable(code)) {
val pause = Instant.now() + Duration.ofMinutes(1)
temporaryDisable.updateAndGet { prev ->
if (prev == null || prev < pause) {
pause
} else {
prev
}
}
statusStream.tryEmitNext(UpstreamAvailability.UNAVAILABLE)
}
}

override fun isAvailable(): Boolean {
return getStatus() == UpstreamAvailability.OK
}
Expand All @@ -113,6 +136,9 @@ abstract class DefaultUpstream(
}

override fun getStatus(): UpstreamAvailability {
if (temporaryDisable.get()?.isAfter(Instant.now()) == true) {
return UpstreamAvailability.UNAVAILABLE
}
if (forked.get()) {
return UpstreamAvailability.IMMATURE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.DirectCallMethods
import io.emeraldpay.dshackle.upstream.rpcclient.WithHttpStatus
import org.slf4j.LoggerFactory
import org.springframework.context.Lifecycle
import reactor.core.Disposable
Expand Down Expand Up @@ -51,6 +52,12 @@ open class EthereumRpcUpstream(
private val head: Head = this.createHead()
private var validatorSubscription: Disposable? = null

init {
if (directReader is WithHttpStatus) {
directReader.onHttpError = this.watchHttpCodes
}
}

override fun setCaches(caches: Caches) {
if (head is CachesEnabled) {
head.setCaches(caches)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumWsIngressSubscription
import io.emeraldpay.dshackle.upstream.rpcclient.WithHttpStatus
import org.slf4j.LoggerFactory
import org.springframework.context.Lifecycle
import reactor.core.Disposable
Expand Down Expand Up @@ -54,6 +55,9 @@ class EthereumWsUpstream(
val wsSubscriptions = WsSubscriptionsImpl(pool)
head = EthereumWsHead(chain, getIngressReader(), wsSubscriptions)
subscriptions = EthereumWsIngressSubscription(wsSubscriptions)
if (directReader is WithHttpStatus) {
directReader.onHttpError = this.watchHttpCodes
}
}

override fun getHead(): Head {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class JsonRpcHttpClient(
private val metrics: RpcMetrics,
basicAuth: AuthConfig.ClientBasicAuth? = null,
tlsCAAuth: ByteArray? = null
) : StandardRpcReader {
) : StandardRpcReader, WithHttpStatus {

companion object {
private val log = LoggerFactory.getLogger(JsonRpcHttpClient::class.java)
Expand All @@ -56,6 +56,8 @@ class JsonRpcHttpClient(
private val parser = ResponseRpcParser()
private val httpClient: HttpClient

override var onHttpError: Consumer<Int>? = null

init {
var build = HttpClient.create()
.resolver(DefaultAddressResolverGroup.INSTANCE)
Expand Down Expand Up @@ -131,6 +133,10 @@ class JsonRpcHttpClient(
resp.flatMap {
if (it.hasError()) {
val statusCode = if (it.httpCode == 200) null else it.httpCode
if (statusCode != null) {
println("Error code $statusCode")
onHttpError?.accept(statusCode)
}
Mono.error(JsonRpcException(it.id, it.error!!, statusCode))
} else {
Mono.just(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream.rpcclient
import io.emeraldpay.dshackle.reader.StandardRpcReader
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import java.util.function.Consumer

/**
* An aggregating JSON RPC Client that wraps two actual readers, a Primary and a Secondary.
Expand All @@ -11,12 +12,27 @@ import reactor.core.publisher.Mono
class JsonRpcSwitchClient(
private val primary: StandardRpcReader,
private val secondary: StandardRpcReader,
) : StandardRpcReader {
) : StandardRpcReader, WithHttpStatus {

companion object {
private val log = LoggerFactory.getLogger(JsonRpcSwitchClient::class.java)
}

override var onHttpError: Consumer<Int>? = null

init {
if (primary is WithHttpStatus) {
primary.onHttpError = Consumer { code ->
onHttpError?.accept(code)
}
}
if (secondary is WithHttpStatus) {
secondary.onHttpError = Consumer { code ->
onHttpError?.accept(code)
}
}
}

override fun read(key: JsonRpcRequest): Mono<JsonRpcResponse> {
return primary.read(key)
.switchIfEmpty(Mono.error(IllegalStateException("No response from Primary Connection")))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.emeraldpay.dshackle.upstream.rpcclient

import java.util.function.Consumer

/**
* Interface for RPC clients that can notify about HTTP status codes
*/
interface WithHttpStatus {

/**
* Called for each HTTP error code (non-200)
*/
var onHttpError: Consumer<Int>?
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.emeraldpay.dshackle.test
package io.emeraldpay.dshackle.testgroovy

import io.emeraldpay.dshackle.upstream.ForkWatch
import io.emeraldpay.dshackle.upstream.NeverForkChoice
Expand Down

This file was deleted.

Loading

0 comments on commit 4478e06

Please sign in to comment.