Skip to content

Commit

Permalink
refactor(stats): re-implement RotatingScope with thread local storage
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjian-pg committed May 23, 2024
1 parent e13f653 commit e35dd28
Showing 1 changed file with 50 additions and 27 deletions.
77 changes: 50 additions & 27 deletions source/extensions/filters/http/istio_stats/istio_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "envoy/registry/registry.h"
#include "envoy/server/factory_context.h"
#include "envoy/singleton/manager.h"
#include "envoy/thread_local/thread_local.h"
#include "extensions/common/metadata_object.h"
#include "parser/parser.h"
#include "source/common/grpc/common.h"
Expand Down Expand Up @@ -313,7 +314,7 @@ struct Context : public Singleton::Instance {

using ContextSharedPtr = std::shared_ptr<Context>;

SINGLETON_MANAGER_REGISTRATION(Context)
SINGLETON_MANAGER_REGISTRATION(istio_stats_filter_context)

using google::api::expr::runtime::CelValue;

Expand Down Expand Up @@ -418,21 +419,21 @@ struct MetricOverrides : public Logger::Loggable<Logger::Id::filter> {
// periodically to replace the current scope.
//
// The replaced stats scope is deleted gracefully after a minimum of 1s delay
// for two reasons:
//
// 1. Stats flushing is asynchronous and the data may be lost if not flushed
// before the deletion (see stats_flush_interval).
//
// 2. The implementation avoids locking by releasing a raw pointer to workers.
// When the rotation happens on the main, the raw pointer may still be in-use
// by workers for a short duration.
// because stats flushing is asynchronous and the data may be lost if not
// flushed before the deletion (see stats_flush_interval).
class RotatingScope : public Logger::Loggable<Logger::Id::filter> {
public:
RotatingScope(Server::Configuration::FactoryContext& factory_context, uint64_t rotate_interval_ms,
uint64_t delete_interval_ms)
: parent_scope_(factory_context.scope()), active_scope_(parent_scope_.createScope("")),
raw_scope_(active_scope_.get()), rotate_interval_ms_(rotate_interval_ms),
tls_scope_(factory_context.serverFactoryContext().threadLocal()),
rotate_interval_ms_(rotate_interval_ms),
delete_interval_ms_(delete_interval_ms) {

tls_scope_.set([&scope = *active_scope_](Event::Dispatcher&){
return std::make_shared<TlsCachedScope>(scope);
});

if (rotate_interval_ms_ > 0) {
ASSERT(delete_interval_ms_ < rotate_interval_ms_);
ASSERT(delete_interval_ms_ >= 1000);
Expand All @@ -452,36 +453,58 @@ class RotatingScope : public Logger::Loggable<Logger::Id::filter> {
delete_timer_.reset();
}
}
Stats::Scope* scope() { return raw_scope_.load(); }
Stats::Scope& scope() { return tls_scope_->_scope; }

private:
struct TlsCachedScope:ThreadLocal::ThreadLocalObject{
TlsCachedScope(Stats::Scope& scope):_scope(scope) {};
std::reference_wrapper<Stats::Scope> _scope;
};

void onRotate() {
ENVOY_LOG(info, "Rotating active Istio stats scope after {}ms.", rotate_interval_ms_);
draining_scope_ = active_scope_;
delete_timer_->enableTimer(std::chrono::milliseconds(delete_interval_ms_));
active_scope_ = parent_scope_.createScope("");
raw_scope_.store(active_scope_.get());
rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_));
tls_scope_.runOnAllThreads(
[&scope = *active_scope_](OptRef<TlsCachedScope> tls_cache) {
tls_cache->_scope = scope;
},
// Start the delete and rotate timers only after the new scope has been propagated to all worker threads.
// The RotatingScope instance can go away before the dispatcher has a chance to execute the callback,
// and the still_alive_ shared_ptr is deallocated when the current instance is deallocated.
// We rely on a weak_ptr to the still_alive_ flag to determine whether the instance is still valid.
[this, maybe_still_alive = std::weak_ptr<bool>(still_alive_)]() -> void {
if(!maybe_still_alive.expired()){
delete_timer_->enableTimer(std::chrono::milliseconds(delete_interval_ms_));
rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_));
}
});
}
// Logs the deletion and drops this object's reference to the draining scope.
// NOTE(review): presumably fired by delete_timer_; the timer wiring is outside this view.
void onDelete() {
  ENVOY_LOG(info, "Deleting draining Istio stats scope after {}ms.", delete_interval_ms_);
  // Assigning nullptr releases the held reference, same as reset().
  draining_scope_ = nullptr;
}

Stats::Scope& parent_scope_;
Stats::ScopeSharedPtr active_scope_;
std::atomic<Stats::Scope*> raw_scope_;
Stats::ScopeSharedPtr draining_scope_{nullptr};
ThreadLocal::TypedSlot<TlsCachedScope> tls_scope_;
const uint64_t rotate_interval_ms_;
const uint64_t delete_interval_ms_;
Event::TimerPtr rotate_timer_{nullptr};
Event::TimerPtr delete_timer_{nullptr};

// A sentinel shared_ptr used for keeping track of whether the RotatingScope is still alive.
// It is only held by a weak reference in the callback that will be invoked after the new active
// scope has been propagated to all worker threads.
std::shared_ptr<bool> still_alive_{std::make_shared<bool>(true)};
};

struct Config : public Logger::Loggable<Logger::Id::filter> {
Config(const stats::PluginConfig& proto_config,
Server::Configuration::FactoryContext& factory_context)
: context_(factory_context.serverFactoryContext().singletonManager().getTyped<Context>(
SINGLETON_MANAGER_REGISTERED_NAME(Context),
SINGLETON_MANAGER_REGISTERED_NAME(istio_stats_filter_context),
[&factory_context] {
return std::make_shared<Context>(
factory_context.serverFactoryContext().scope().symbolTable(),
Expand Down Expand Up @@ -514,7 +537,7 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
break;
}
if (proto_config.metrics_size() > 0 || proto_config.definitions_size() > 0) {
metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope()->symbolTable());
metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope().symbolTable());
for (const auto& definition : proto_config.definitions()) {
const auto& it = context_->all_metrics_.find(definition.name());
if (it != context_->all_metrics_.end()) {
Expand Down Expand Up @@ -698,12 +721,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
return;
}
auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_);
Stats::Utility::counterFromStatNames(*parent_.scope(),
Stats::Utility::counterFromStatNames(parent_.scope(),
{parent_.context_->stat_namespace_, metric}, new_tags)
.add(amount);
return;
}
Stats::Utility::counterFromStatNames(*parent_.scope(),
Stats::Utility::counterFromStatNames(parent_.scope(),
{parent_.context_->stat_namespace_, metric}, tags)
.add(amount);
}
Expand All @@ -717,12 +740,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
}
auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_);
Stats::Utility::histogramFromStatNames(
*parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, new_tags)
parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, new_tags)
.recordValue(value);
return;
}
Stats::Utility::histogramFromStatNames(
*parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, tags)
parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, tags)
.recordValue(value);
}

Expand All @@ -735,17 +758,17 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
switch (metric.type_) {
case MetricOverrides::MetricType::Counter:
Stats::Utility::counterFromStatNames(
*parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, tags)
parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, tags)
.add(amount);
break;
case MetricOverrides::MetricType::Histogram:
Stats::Utility::histogramFromStatNames(
*parent_.scope(), {parent_.context_->stat_namespace_, metric.name_},
parent_.scope(), {parent_.context_->stat_namespace_, metric.name_},
Stats::Histogram::Unit::Bytes, tags)
.recordValue(amount);
break;
case MetricOverrides::MetricType::Gauge:
Stats::Utility::gaugeFromStatNames(*parent_.scope(),
Stats::Utility::gaugeFromStatNames(parent_.scope(),
{parent_.context_->stat_namespace_, metric.name_},
Stats::Gauge::ImportMode::Accumulate, tags)
.set(amount);
Expand All @@ -769,14 +792,14 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
tags.push_back({context_->tag_, context_->istio_version_.empty() ? context_->unknown_
: context_->istio_version_});

Stats::Utility::gaugeFromStatNames(*scope(),
Stats::Utility::gaugeFromStatNames(scope(),
{context_->stat_namespace_, context_->istio_build_},
Stats::Gauge::ImportMode::Accumulate, tags)
.set(1);
}

Reporter reporter() const { return reporter_; }
Stats::Scope* scope() { return scope_.scope(); }
Stats::Scope& scope() { return scope_.scope(); }

ContextSharedPtr context_;
RotatingScope scope_;
Expand All @@ -795,7 +818,7 @@ class IstioStatsFilter : public Http::PassThroughFilter,
public Network::ConnectionCallbacks {
public:
IstioStatsFilter(ConfigSharedPtr config)
: config_(config), context_(*config->context_), pool_(config->scope()->symbolTable()),
: config_(config), context_(*config->context_), pool_(config->scope().symbolTable()),
stream_(*config_, pool_) {
tags_.reserve(25);
switch (config_->reporter()) {
Expand Down

0 comments on commit e35dd28

Please sign in to comment.