Skip to content

Commit

Permalink
Limit on the number of HTTP requests processed from a connection in a…
Browse files Browse the repository at this point in the history
…n I/O cycle

Signed-off-by: Yan Avlasov <[email protected]>
  • Loading branch information
yanavlasov authored and pmerrison committed Oct 4, 2023
1 parent 9256f06 commit a4cdd07
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 15 deletions.
7 changes: 7 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ behavior_changes:
resets is applied. The connection is disconnected if more than 50% of resets are premature.
Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely disables
this check.
- area: http
change: |
Add runtime flag ``http.max_requests_per_io_cycle`` for setting the limit on the number of HTTP requests processed
from a single connection in a single I/O cycle. Requests over this limit are processed in subsequent I/O cycles. This
mitigates CPU starvation by connections that simultaneously send high number of requests by allowing requests from other
connections to make progress. This runtime value can be set to 1 in the presence of abusive HTTP/2 or HTTP/3 connections.
By default this limit is disabled.
minor_behavior_changes:
# *Changes that may cause incompatibilities for some users, but should not for most*
Expand Down
93 changes: 88 additions & 5 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey
"overload.premature_reset_total_stream_count";
const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
"overload.premature_reset_min_stream_lifetime_seconds";
// Runtime key for maximum number of requests that can be processed from a single connection per
// I/O cycle. Requests over this limit are deferred until the next I/O cycle.
const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle =
"http.max_requests_per_io_cycle";

bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
if (!headers) {
Expand Down Expand Up @@ -113,7 +117,9 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
time_source_(time_source), proxy_name_(StreamInfo::ProxyStatusUtils::makeProxyName(
/*node_id=*/local_info_.node().id(),
/*server_name=*/config_.serverName(),
/*proxy_status_config=*/config_.proxyStatusConfig())) {}
/*proxy_status_config=*/config_.proxyStatusConfig())),
max_requests_during_dispatch_(runtime_.snapshot().getInteger(
ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)) {}

const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
static const auto headers = createHeaderMap<ResponseHeaderMapImpl>(
Expand All @@ -123,6 +129,12 @@ const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {

void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
read_callbacks_ = &callbacks;
if (max_requests_during_dispatch_ != UINT32_MAX) {
deferred_request_processing_callback_ =
callbacks.connection().dispatcher().createSchedulableCallback(
[this]() -> void { onDeferredRequestProcessing(); });
}

stats_.named_.downstream_cx_total_.inc();
stats_.named_.downstream_cx_active_.inc();
if (read_callbacks_->connection().ssl()) {
Expand Down Expand Up @@ -439,6 +451,7 @@ void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
}

Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
requests_during_dispatch_count_ = 0;
if (!codec_) {
// Http3 codec should have been instantiated by now.
createCodec(data);
Expand Down Expand Up @@ -1322,7 +1335,12 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
traceRequest();
}

filter_manager_.decodeHeaders(*request_headers_, end_stream);
if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
filter_manager_.decodeHeaders(*request_headers_, end_stream);
} else {
state_.deferred_to_next_io_iteration_ = true;
state_.deferred_end_stream_ = end_stream;
}

// Reset it here for both global and overridden cases.
resetIdleTimer();
Expand Down Expand Up @@ -1389,8 +1407,15 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo
connection_manager_.read_callbacks_->connection().dispatcher());
maybeEndDecode(end_stream);
filter_manager_.streamInfo().addBytesReceived(data.length());

filter_manager_.decodeData(data, end_stream);
if (!state_.deferred_to_next_io_iteration_) {
filter_manager_.decodeData(data, end_stream);
} else {
if (!deferred_data_) {
deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
}
deferred_data_->move(data);
state_.deferred_end_stream_ = end_stream;
}
}

void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
Expand All @@ -1406,7 +1431,9 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
return;
}
maybeEndDecode(true);
filter_manager_.decodeTrailers(*request_trailers_);
if (!state_.deferred_to_next_io_iteration_) {
filter_manager_.decodeTrailers(*request_trailers_);
}
}

void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
Expand Down Expand Up @@ -2095,5 +2122,61 @@ void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, a
connection_manager_.doEndStream(*this);
}

bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
// TODO(yanavlasov): Merge this with the filter manager continueIteration() method
if (!state_.deferred_to_next_io_iteration_) {
return false;
}
state_.deferred_to_next_io_iteration_ = false;
bool end_stream =
state_.deferred_end_stream_ && deferred_data_ == nullptr && request_trailers_ == nullptr;
filter_manager_.decodeHeaders(*request_headers_, end_stream);
if (end_stream) {
return true;
}
if (deferred_data_ != nullptr) {
end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
filter_manager_.decodeData(*deferred_data_, end_stream);
}
if (request_trailers_ != nullptr) {
filter_manager_.decodeTrailers(*request_trailers_);
}
return true;
}

bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
// Do not defer this stream if stream deferral is disabled
if (deferred_request_processing_callback_ == nullptr) {
return false;
}
// Defer this stream if there are already deferred streams, so they are not
// processed out of order
if (deferred_request_processing_callback_->enabled()) {
return true;
}
++requests_during_dispatch_count_;
bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
if (defer) {
deferred_request_processing_callback_->scheduleCallbackNextIteration();
}
return defer;
}

void ConnectionManagerImpl::onDeferredRequestProcessing() {
requests_during_dispatch_count_ = 1; // 1 stream is always let through
// Streams are inserted at the head of the list. As such process deferred
// streams at the back of the list first.
for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) {
auto& stream_ptr = *reverse_iter;
// Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the
// stream from the list.
++reverse_iter;
bool was_deferred = stream_ptr->onDeferredRequestProcessing();
if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
break;
}
}
}

} // namespace Http
} // namespace Envoy
25 changes: 24 additions & 1 deletion source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// The minimum lifetime of a stream, in seconds, in order not to be considered
// prematurely closed.
static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey;
static const absl::string_view MaxRequestsPerIoCycle;

private:
struct ActiveStream;
Expand Down Expand Up @@ -352,7 +353,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
: codec_saw_local_complete_(false), codec_encode_complete_(false),
on_reset_stream_called_(false), is_zombie_stream_(false), saw_connection_close_(false),
successful_upgrade_(false), is_internally_destroyed_(false),
is_internally_created_(false), is_tunneling_(false), decorated_propagate_(true) {}
is_internally_created_(false), is_tunneling_(false), decorated_propagate_(true),
deferred_to_next_io_iteration_(false) {}

// It's possibly for the codec to see the completed response but not fully
// encode it.
Expand All @@ -378,6 +380,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
bool is_tunneling_ : 1;

bool decorated_propagate_ : 1;

// Indicates that sending headers to the filter manager is deferred to the
// next I/O cycle. If data or trailers are received when this flag is set
// they are deferred too.
// TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate
// structure, so it can be atomically created and cleared.
bool deferred_to_next_io_iteration_ : 1;
bool deferred_end_stream_ : 1;
};

bool canDestroyStream() const {
Expand Down Expand Up @@ -425,6 +435,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// HTTP connection manager configuration, then the entire connection is closed.
bool validateTrailers();

// Dispatch deferred headers, body and trailers to the filter manager.
// Return true if this stream was deferred and dispatched pending headers, body and trailers (if
// present). Return false if this stream was not deferred.
bool onDeferredRequestProcessing();

ConnectionManagerImpl& connection_manager_;
OptRef<const TracingConnectionManagerConfig> connection_manager_tracing_config_;
// TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in
Expand Down Expand Up @@ -511,6 +526,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
const Tracing::CustomTagMap* customTags() const override;
bool verbose() const override;
uint32_t maxPathTagLength() const override;

std::unique_ptr<Buffer::OwnedImpl> deferred_data_;
};

using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
Expand Down Expand Up @@ -566,6 +583,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// and at least half have been prematurely reset?
void maybeDrainDueToPrematureResets();

bool shouldDeferRequestProxyingToNextIoCycle();
void onDeferredRequestProcessing();

enum class DrainState { NotDraining, Draining, Closing };

ConnectionManagerConfig& config_;
Expand Down Expand Up @@ -610,6 +630,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// the definition given in `isPrematureRstStream()`.
uint64_t number_premature_stream_resets_{0};
const std::string proxy_name_; // for Proxy-Status.
uint32_t requests_during_dispatch_count_{0};
const uint32_t max_requests_during_dispatch_{UINT32_MAX};
Event::SchedulableCallbackPtr deferred_request_processing_callback_;
};

} // namespace Http
Expand Down
Loading

0 comments on commit a4cdd07

Please sign in to comment.