Skip to content

Commit

Permalink
xds-failover: plumbing the API and adding integration test (#34761)
Browse files Browse the repository at this point in the history
Adding the ability to use xDS-Failover into the API (ADS config).

Introducing the xds-failover integration tests.
This is all guarded behind the envoy.restart_features.xds_failover_support guard.
In the future, once we test xDS-Failover out and ensure it works as expected, the runtime guard will be removed.

Risk Level: low if not configured, medium if it is.
Testing: Added unit tests and integration tests
Docs Changes: N/A (once xDS-failover behavior will be finalized).
Release Notes: N/A (once xDS-failover behavior will be finalized).
Platform Specific Features: N/A.

Signed-off-by: Adi Suissa-Peleg <[email protected]>
  • Loading branch information
adisuissa authored Jun 28, 2024
1 parent 9ce333b commit 3feff04
Show file tree
Hide file tree
Showing 16 changed files with 929 additions and 73 deletions.
59 changes: 44 additions & 15 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "source/common/common/assert.h"
#include "source/common/protobuf/utility.h"

#include "absl/status/status.h"

namespace Envoy {
namespace Config {

Expand Down Expand Up @@ -68,11 +70,13 @@ namespace {
/**
* Check the grpc_services and cluster_names for API config sanity. Throws on error.
* @param api_config_source the config source to validate.
* @param max_grpc_services the maximal number of grpc services allowed.
* @return an invalid status when an API config has the wrong number of gRPC
* services or cluster names, depending on expectations set by its API type.
*/
absl::Status
checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_config_source) {
checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_config_source,
int max_grpc_services) {
const bool is_grpc =
(api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC ||
api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC);
Expand All @@ -89,10 +93,10 @@ checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_co
fmt::format("{}::(DELTA_)GRPC must not have a cluster name specified: {}",
api_config_source.GetTypeName(), api_config_source.DebugString()));
}
if (api_config_source.grpc_services().size() > 1) {
return absl::InvalidArgumentError(
fmt::format("{}::(DELTA_)GRPC must have a single gRPC service specified: {}",
api_config_source.GetTypeName(), api_config_source.DebugString()));
if (api_config_source.grpc_services_size() > max_grpc_services) {
return absl::InvalidArgumentError(fmt::format(
"{}::(DELTA_)GRPC must have no more than {} gRPC services specified: {}",
api_config_source.GetTypeName(), max_grpc_services, api_config_source.DebugString()));
}
} else {
if (!api_config_source.grpc_services().empty()) {
Expand Down Expand Up @@ -133,7 +137,9 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC) {
return absl::OkStatus();
}
RETURN_IF_NOT_OK(checkApiConfigSourceNames(api_config_source));
RETURN_IF_NOT_OK(checkApiConfigSourceNames(
api_config_source,
Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") ? 2 : 1));

const bool is_grpc =
(api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC);
Expand All @@ -153,6 +159,14 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
primary_clusters, api_config_source.grpc_services()[0].envoy_grpc().cluster_name(),
api_config_source.GetTypeName()));
}
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") &&
api_config_source.grpc_services_size() == 2 &&
api_config_source.grpc_services()[1].has_envoy_grpc()) {
// If an Envoy failover gRPC exists, we validate its cluster name.
RETURN_IF_NOT_OK(Utility::validateClusterName(
primary_clusters, api_config_source.grpc_services()[1].envoy_grpc().cluster_name(),
api_config_source.GetTypeName()));
}
}
// Otherwise, there is no cluster name to validate.
return absl::OkStatus();
Expand All @@ -161,15 +175,23 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
absl::optional<std::string>
Utility::getGrpcControlPlane(const envoy::config::core::v3::ApiConfigSource& api_config_source) {
if (api_config_source.grpc_services_size() > 0) {
// Only checking for the first entry in grpc_services, because Envoy's xDS implementation
// currently only considers the first gRPC endpoint and ignores any other xDS management servers
// specified in an ApiConfigSource.
std::string res = "";
// In case more than one grpc service is defined, concatenate the names for
// a unique GrpcControlPlane identifier.
if (api_config_source.grpc_services(0).has_envoy_grpc()) {
return api_config_source.grpc_services(0).envoy_grpc().cluster_name();
res = api_config_source.grpc_services(0).envoy_grpc().cluster_name();
} else if (api_config_source.grpc_services(0).has_google_grpc()) {
res = api_config_source.grpc_services(0).google_grpc().target_uri();
}
if (api_config_source.grpc_services(0).has_google_grpc()) {
return api_config_source.grpc_services(0).google_grpc().target_uri();
// Concatenate the failover gRPC service.
if (api_config_source.grpc_services_size() == 2) {
if (api_config_source.grpc_services(1).has_envoy_grpc()) {
absl::StrAppend(&res, ",", api_config_source.grpc_services(1).envoy_grpc().cluster_name());
} else if (api_config_source.grpc_services(1).has_google_grpc()) {
absl::StrAppend(&res, ",", api_config_source.grpc_services(1).google_grpc().target_uri());
}
}
return res;
}
return absl::nullopt;
}
Expand Down Expand Up @@ -204,8 +226,10 @@ Utility::parseRateLimitSettings(const envoy::config::core::v3::ApiConfigSource&
absl::StatusOr<Grpc::AsyncClientFactoryPtr> Utility::factoryForGrpcApiConfigSource(
Grpc::AsyncClientManager& async_client_manager,
const envoy::config::core::v3::ApiConfigSource& api_config_source, Stats::Scope& scope,
bool skip_cluster_check) {
RETURN_IF_NOT_OK(checkApiConfigSourceNames(api_config_source));
bool skip_cluster_check, int grpc_service_idx) {
RETURN_IF_NOT_OK(checkApiConfigSourceNames(
api_config_source,
Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") ? 2 : 1));

if (api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::GRPC &&
api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
Expand All @@ -214,8 +238,13 @@ absl::StatusOr<Grpc::AsyncClientFactoryPtr> Utility::factoryForGrpcApiConfigSour
api_config_source.DebugString()));
}

if (grpc_service_idx >= api_config_source.grpc_services_size()) {
// No returned factory in case there's no entry.
return nullptr;
}

envoy::config::core::v3::GrpcService grpc_service;
grpc_service.MergeFrom(api_config_source.grpc_services(0));
grpc_service.MergeFrom(api_config_source.grpc_services(grpc_service_idx));

return async_client_manager.factoryForGrpcService(grpc_service, scope, skip_cluster_check);
}
Expand Down
7 changes: 5 additions & 2 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,15 @@ class Utility {
* @param async_client_manager gRPC async client manager.
* @param api_config_source envoy::config::core::v3::ApiConfigSource. Must have config type GRPC.
* @param skip_cluster_check whether to skip cluster validation.
* @return Grpc::AsyncClientFactoryPtr gRPC async client factory.
* @param grpc_service_idx index of the grpc service in the api_config_source. If there's no entry
* in the given index, a nullptr factory will be returned.
* @return Grpc::AsyncClientFactoryPtr gRPC async client factory, or nullptr if there's no
* grpc_service in the given index.
*/
static absl::StatusOr<Grpc::AsyncClientFactoryPtr>
factoryForGrpcApiConfigSource(Grpc::AsyncClientManager& async_client_manager,
const envoy::config::core::v3::ApiConfigSource& api_config_source,
Stats::Scope& scope, bool skip_cluster_check);
Stats::Scope& scope, bool skip_cluster_check, int grpc_service_idx);

/**
* Translate opaque config from google.protobuf.Any to defined proto message.
Expand Down
61 changes: 43 additions & 18 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,25 @@ getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostCons

bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
absl::string_view cluster_name) {
bool blocking_ads_cluster = false;
if (bootstrap.dynamic_resources().has_ads_config()) {
const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
// We only care about EnvoyGrpc, not GoogleGrpc, because we only need to delay ADS mux
// initialization if it uses an Envoy cluster that needs to be initialized first. We don't
// depend on the same cluster initialization when opening a gRPC stream for GoogleGrpc.
return (ads_config_source.grpc_services_size() > 0 &&
ads_config_source.grpc_services(0).has_envoy_grpc() &&
ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
blocking_ads_cluster =
(ads_config_source.grpc_services_size() > 0 &&
ads_config_source.grpc_services(0).has_envoy_grpc() &&
ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
// Validate the failover server if there is one.
blocking_ads_cluster |=
(ads_config_source.grpc_services_size() == 2 &&
ads_config_source.grpc_services(1).has_envoy_grpc() &&
ads_config_source.grpc_services(1).envoy_grpc().cluster_name() == cluster_name);
}
}
return false;
return blocking_ads_cluster;
}

} // namespace
Expand Down Expand Up @@ -447,14 +456,22 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
if (!factory) {
return absl::InvalidArgumentError(fmt::format("{} not found", name));
}
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false);
RETURN_IF_STATUS_NOT_OK(factory_or_error);
ads_mux_ = factory->create(factory_or_error.value()->createUncachedRawAsyncClient(), nullptr,
dispatcher_, random_, *stats_.rootScope(),
dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 0);
RETURN_IF_STATUS_NOT_OK(factory_primary_or_error);
Grpc::AsyncClientFactoryPtr factory_failover = nullptr;
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 1);
RETURN_IF_STATUS_NOT_OK(factory_failover_or_error);
factory_failover = std::move(factory_failover_or_error.value());
}
ads_mux_ = factory->create(
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr,
dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
} else {
absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
RETURN_IF_NOT_OK(status);
Expand All @@ -470,12 +487,20 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
if (!factory) {
return absl::InvalidArgumentError(fmt::format("{} not found", name));
}
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false);
RETURN_IF_STATUS_NOT_OK(factory_or_error);
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 0);
RETURN_IF_STATUS_NOT_OK(factory_primary_or_error);
Grpc::AsyncClientFactoryPtr factory_failover = nullptr;
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 1);
RETURN_IF_STATUS_NOT_OK(factory_failover_or_error);
factory_failover = std::move(factory_failover_or_error.value());
}
ads_mux_ = factory->create(
factory_or_error.value()->createUncachedRawAsyncClient(), nullptr, dispatcher_, random_,
*stats_.rootScope(), dyn_resources.ads_config(), local_info_,
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr,
dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, use_eds_cache);
}
Expand Down Expand Up @@ -568,7 +593,7 @@ absl::Status ClusterManagerImpl::initializeSecondaryClusters(
absl::Status status = Config::Utility::checkTransportVersion(load_stats_config);
RETURN_IF_NOT_OK(status);
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, load_stats_config, *stats_.rootScope(), false);
*async_client_manager_, load_stats_config, *stats_.rootScope(), false, 0);
RETURN_IF_STATUS_NOT_OK(factory_or_error);
load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
local_info_, *this, *stats_.rootScope(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create(
THROW_IF_STATUS_NOT_OK(strategy_or_error, throw);
JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw);
GrpcMuxContext grpc_mux_context{
factory_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_*/ nullptr,
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr,
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ template <class RequestType, class ResponseType>
class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
public Logger::Loggable<Logger::Id::config> {
public:
static constexpr uint32_t DefaultFailoverBackoffMilliseconds = 500;

// A GrpcStream creator function that receives the stream callbacks and returns a
// GrpcStream object. This is introduced to facilitate dependency injection for
// testing and will be used to create the primary and failover streams.
Expand Down
24 changes: 22 additions & 2 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,28 @@ GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) {
grpc_mux_context.rate_limit_settings_);
},
/*failover_stream_creator=*/
// TODO(adisuissa): implement when failover is fully plumbed.
absl::nullopt,
grpc_mux_context.failover_async_client_
? absl::make_optional(
[&grpc_mux_context](
GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>*
callbacks)
-> GrpcStreamInterfacePtr<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse> {
return std::make_unique<
GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>>(
callbacks, std::move(grpc_mux_context.failover_async_client_),
grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
grpc_mux_context.scope_,
// TODO(adisuissa): the backoff strategy for the failover should
// be the same as the primary source.
std::make_unique<FixedBackOffStrategy>(
GrpcMuxFailover<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>::
DefaultFailoverBackoffMilliseconds),
grpc_mux_context.rate_limit_settings_);
})
: absl::nullopt,
/*grpc_mux_callbacks=*/*this,
/*dispatch=*/grpc_mux_context.dispatcher_);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat
THROW_IF_STATUS_NOT_OK(strategy_or_error, throw);
JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

auto factory_or_error = Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw);
GrpcMuxContext grpc_mux_context{
/*async_client_=*/factory_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr,
/*async_client_=*/factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/sotwGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
Expand Down Expand Up @@ -75,15 +75,15 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti
THROW_IF_STATUS_NOT_OK(strategy_or_error, throw);
JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

auto factory_or_error = Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw);
GrpcMuxContext grpc_mux_context{
/*async_client_=*/factory_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr,
/*async_client_=*/factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
Expand Down
Loading

0 comments on commit 3feff04

Please sign in to comment.