diff --git a/api/filter/ratelimit.proto b/api/filter/ratelimit.proto new file mode 100644 index 000000000..b79cee3af --- /dev/null +++ b/api/filter/ratelimit.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; + +package filter; +option go_package = "kmesh.net/kmesh/api/filter;filter"; + +/* + TokenBucket defines parameters for a token bucket rate limiter. + https://www.envoyproxy.io/docs/envoy/latest/api-v3/type/v3/token_bucket.proto#envoy-v3-api-msg-type-v3-tokenbucket + { + "max_tokens": ..., + "tokens_per_fill": {...}, + "fill_interval": {...} + } +*/ +message TokenBucket { + // The maximum number of tokens in the bucket. + int64 max_tokens = 1; + + // The number of tokens added to the bucket during each fill interval. + int64 tokens_per_fill = 2; + + // The interval at which the bucket is refilled in nanoseconds. + int64 fill_interval = 3; +} + + +/* + LocalRateLimit defines parameters for local rate limiting. + https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto#envoy-v3-api-msg-extensions-filters-http-local-ratelimit-v3-localratelimit + { + "stat_prefix": ..., + "status": {...}, + "token_bucket": {...}, + "filter_enabled": {...}, + "filter_enforced": {...}, + "request_headers_to_add_when_not_enforced": [], + "response_headers_to_add": [], + "descriptors": [], + "stage": ..., + "local_rate_limit_per_downstream_connection": ..., + "local_cluster_rate_limit": {...}, + "enable_x_ratelimit_headers": ..., + "vh_rate_limits": ..., + "always_consume_default_token_bucket": {...}, + "rate_limited_as_resource_exhausted": ... + } +*/ +message LocalRateLimit { + reserved 1 to 2; + + // The token bucket configuration for the rate limiter. + TokenBucket token_bucket = 3; + + // Reserved field numbers for future use + reserved 4 to 15; +} + diff --git a/api/listener/listener_components.proto b/api/listener/listener_components.proto index 6b4533252..0a8553518 100644 --- a/api/listener/listener_components.proto +++ b/api/listener/listener_components.proto @@ -6,12 +6,14 @@ option go_package = "kmesh.net/kmesh/api/listener;listener"; import "api/core/address.proto"; import "api/filter/tcp_proxy.proto"; import "api/filter/http_connection_manager.proto"; +import "api/filter/ratelimit.proto"; message Filter { string name = 1; oneof config_type { filter.TcpProxy tcp_proxy = 2; filter.HttpConnectionManager http_connection_manager = 3; + filter.LocalRateLimit local_rate_limit = 4; } } diff --git a/api/v2-c/filter/ratelimit.pb-c.c b/api/v2-c/filter/ratelimit.pb-c.c new file mode 100644 index 000000000..05d6530ab --- /dev/null +++ b/api/v2-c/filter/ratelimit.pb-c.c @@ -0,0 +1,201 @@ +/* Generated by the protocol buffer compiler. DO NOT EDIT! */ +/* Generated from: api/filter/ratelimit.proto */ + +/* Do not generate deprecated warnings for self */ +#ifndef PROTOBUF_C__NO_DEPRECATED +#define PROTOBUF_C__NO_DEPRECATED +#endif + +#include "filter/ratelimit.pb-c.h" +void filter__token_bucket__init + (Filter__TokenBucket *message) +{ + static const Filter__TokenBucket init_value = FILTER__TOKEN_BUCKET__INIT; + *message = init_value; +} +size_t filter__token_bucket__get_packed_size + (const Filter__TokenBucket *message) +{ + assert(message->base.descriptor == &filter__token_bucket__descriptor); + return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message)); +} +size_t filter__token_bucket__pack + (const Filter__TokenBucket *message, + uint8_t *out) +{ + assert(message->base.descriptor == &filter__token_bucket__descriptor); + return protobuf_c_message_pack ((const ProtobufCMessage*)message, out); +} +size_t filter__token_bucket__pack_to_buffer + (const Filter__TokenBucket *message, + ProtobufCBuffer *buffer) +{ + assert(message->base.descriptor == &filter__token_bucket__descriptor); + return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer); +} +Filter__TokenBucket * + filter__token_bucket__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data) +{ + return (Filter__TokenBucket *) + protobuf_c_message_unpack (&filter__token_bucket__descriptor, + allocator, len, data); +} +void filter__token_bucket__free_unpacked + (Filter__TokenBucket *message, + ProtobufCAllocator *allocator) +{ + if(!message) + return; + assert(message->base.descriptor == &filter__token_bucket__descriptor); + protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); +} +void filter__local_rate_limit__init + (Filter__LocalRateLimit *message) +{ + static const Filter__LocalRateLimit init_value = FILTER__LOCAL_RATE_LIMIT__INIT; + *message = init_value; +} +size_t filter__local_rate_limit__get_packed_size + (const Filter__LocalRateLimit *message) +{ + assert(message->base.descriptor == &filter__local_rate_limit__descriptor); + return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message)); +} +size_t filter__local_rate_limit__pack + (const Filter__LocalRateLimit *message, + uint8_t *out) +{ + assert(message->base.descriptor == &filter__local_rate_limit__descriptor); + return protobuf_c_message_pack ((const ProtobufCMessage*)message, out); +} +size_t filter__local_rate_limit__pack_to_buffer + (const Filter__LocalRateLimit *message, + ProtobufCBuffer *buffer) +{ + assert(message->base.descriptor == &filter__local_rate_limit__descriptor); + return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer); +} +Filter__LocalRateLimit * + filter__local_rate_limit__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data) +{ + return (Filter__LocalRateLimit *) + protobuf_c_message_unpack (&filter__local_rate_limit__descriptor, + allocator, len, data); +} +void filter__local_rate_limit__free_unpacked + (Filter__LocalRateLimit *message, + ProtobufCAllocator *allocator) +{ + if(!message) + return; + assert(message->base.descriptor == &filter__local_rate_limit__descriptor); + protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); +} +static const ProtobufCFieldDescriptor filter__token_bucket__field_descriptors[3] = +{ + { + "max_tokens", + 1, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_INT64, + 0, /* quantifier_offset */ + offsetof(Filter__TokenBucket, max_tokens), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "tokens_per_fill", + 2, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_INT64, + 0, /* quantifier_offset */ + offsetof(Filter__TokenBucket, tokens_per_fill), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "fill_interval", + 3, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_INT64, + 0, /* quantifier_offset */ + offsetof(Filter__TokenBucket, fill_interval), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, +}; +static const unsigned filter__token_bucket__field_indices_by_name[] = { + 2, /* field[2] = fill_interval */ + 0, /* field[0] = max_tokens */ + 1, /* field[1] = tokens_per_fill */ +}; +static const ProtobufCIntRange filter__token_bucket__number_ranges[1 + 1] = +{ + { 1, 0 }, + { 0, 3 } +}; +const ProtobufCMessageDescriptor filter__token_bucket__descriptor = +{ + PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC, + "filter.TokenBucket", + "TokenBucket", + "Filter__TokenBucket", + "filter", + sizeof(Filter__TokenBucket), + 3, + filter__token_bucket__field_descriptors, + filter__token_bucket__field_indices_by_name, + 1, filter__token_bucket__number_ranges, + (ProtobufCMessageInit) filter__token_bucket__init, + NULL,NULL,NULL /* reserved[123] */ +}; +static const ProtobufCFieldDescriptor filter__local_rate_limit__field_descriptors[1] = +{ + { + "token_bucket", + 3, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_MESSAGE, + 0, /* quantifier_offset */ + offsetof(Filter__LocalRateLimit, token_bucket), + &filter__token_bucket__descriptor, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, +}; +static const unsigned filter__local_rate_limit__field_indices_by_name[] = { + 0, /* field[0] = token_bucket */ +}; +static const ProtobufCIntRange filter__local_rate_limit__number_ranges[1 + 1] = +{ + { 3, 0 }, + { 0, 1 } +}; +const ProtobufCMessageDescriptor filter__local_rate_limit__descriptor = +{ + PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC, + "filter.LocalRateLimit", + "LocalRateLimit", + "Filter__LocalRateLimit", + "filter", + sizeof(Filter__LocalRateLimit), + 1, + filter__local_rate_limit__field_descriptors, + filter__local_rate_limit__field_indices_by_name, + 1, filter__local_rate_limit__number_ranges, + (ProtobufCMessageInit) filter__local_rate_limit__init, + NULL,NULL,NULL /* reserved[123] */ +}; diff --git a/api/v2-c/filter/ratelimit.pb-c.h b/api/v2-c/filter/ratelimit.pb-c.h new file mode 100644 index 000000000..009f3501c --- /dev/null +++ b/api/v2-c/filter/ratelimit.pb-c.h @@ -0,0 +1,149 @@ +/* Generated by the protocol buffer compiler. DO NOT EDIT! */ +/* Generated from: api/filter/ratelimit.proto */ + +#ifndef PROTOBUF_C_api_2ffilter_2fratelimit_2eproto__INCLUDED +#define PROTOBUF_C_api_2ffilter_2fratelimit_2eproto__INCLUDED + +#include + +PROTOBUF_C__BEGIN_DECLS + +#if PROTOBUF_C_VERSION_NUMBER < 1003000 +# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. +#elif 1004001 < PROTOBUF_C_MIN_COMPILER_VERSION +# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. +#endif + + +typedef struct Filter__TokenBucket Filter__TokenBucket; +typedef struct Filter__LocalRateLimit Filter__LocalRateLimit; + + +/* --- enums --- */ + + +/* --- messages --- */ + +/* + *TokenBucket defines parameters for a token bucket rate limiter. + *https://www.envoyproxy.io/docs/envoy/latest/api-v3/type/v3/token_bucket.proto#envoy-v3-api-msg-type-v3-tokenbucket + *{ + *"max_tokens": ..., + *"tokens_per_fill": {...}, + *"fill_interval": {...} + *} + */ +struct Filter__TokenBucket +{ + ProtobufCMessage base; + /* + * The maximum number of tokens in the bucket. + */ + int64_t max_tokens; + /* + * The number of tokens added to the bucket during each fill interval. + */ + int64_t tokens_per_fill; + /* + * The interval at which the bucket is refilled in nanoseconds. + */ + int64_t fill_interval; +}; +#define FILTER__TOKEN_BUCKET__INIT \ + { PROTOBUF_C_MESSAGE_INIT (&filter__token_bucket__descriptor) \ + , 0, 0, 0 } + + +/* + *LocalRateLimit defines parameters for local rate limiting. + *https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto#envoy-v3-api-msg-extensions-filters-http-local-ratelimit-v3-localratelimit + *{ + *"stat_prefix": ..., + *"status": {...}, + *"token_bucket": {...}, + *"filter_enabled": {...}, + *"filter_enforced": {...}, + *"request_headers_to_add_when_not_enforced": [], + *"response_headers_to_add": [], + *"descriptors": [], + *"stage": ..., + *"local_rate_limit_per_downstream_connection": ..., + *"local_cluster_rate_limit": {...}, + *"enable_x_ratelimit_headers": ..., + *"vh_rate_limits": ..., + *"always_consume_default_token_bucket": {...}, + *"rate_limited_as_resource_exhausted": ... + *} + */ +struct Filter__LocalRateLimit +{ + ProtobufCMessage base; + /* + * The token bucket configuration for the rate limiter. + */ + Filter__TokenBucket *token_bucket; +}; +#define FILTER__LOCAL_RATE_LIMIT__INIT \ + { PROTOBUF_C_MESSAGE_INIT (&filter__local_rate_limit__descriptor) \ + , NULL } + + +/* Filter__TokenBucket methods */ +void filter__token_bucket__init + (Filter__TokenBucket *message); +size_t filter__token_bucket__get_packed_size + (const Filter__TokenBucket *message); +size_t filter__token_bucket__pack + (const Filter__TokenBucket *message, + uint8_t *out); +size_t filter__token_bucket__pack_to_buffer + (const Filter__TokenBucket *message, + ProtobufCBuffer *buffer); +Filter__TokenBucket * + filter__token_bucket__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data); +void filter__token_bucket__free_unpacked + (Filter__TokenBucket *message, + ProtobufCAllocator *allocator); +/* Filter__LocalRateLimit methods */ +void filter__local_rate_limit__init + (Filter__LocalRateLimit *message); +size_t filter__local_rate_limit__get_packed_size + (const Filter__LocalRateLimit *message); +size_t filter__local_rate_limit__pack + (const Filter__LocalRateLimit *message, + uint8_t *out); +size_t filter__local_rate_limit__pack_to_buffer + (const Filter__LocalRateLimit *message, + ProtobufCBuffer *buffer); +Filter__LocalRateLimit * + filter__local_rate_limit__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data); +void filter__local_rate_limit__free_unpacked + (Filter__LocalRateLimit *message, + ProtobufCAllocator *allocator); +/* --- per-message closures --- */ + +typedef void (*Filter__TokenBucket_Closure) + (const Filter__TokenBucket *message, + void *closure_data); +typedef void (*Filter__LocalRateLimit_Closure) + (const Filter__LocalRateLimit *message, + void *closure_data); + +/* --- services --- */ + + +/* --- descriptors --- */ + +extern const ProtobufCMessageDescriptor filter__token_bucket__descriptor; +extern const ProtobufCMessageDescriptor filter__local_rate_limit__descriptor; + +PROTOBUF_C__END_DECLS + + +#endif /* PROTOBUF_C_api_2ffilter_2fratelimit_2eproto__INCLUDED */ diff --git a/api/v2-c/listener/listener_components.pb-c.c b/api/v2-c/listener/listener_components.pb-c.c index 8c5102f35..90b664e8a 100644 --- a/api/v2-c/listener/listener_components.pb-c.c +++ b/api/v2-c/listener/listener_components.pb-c.c @@ -142,7 +142,7 @@ void listener__filter_chain__free_unpacked assert(message->base.descriptor == &listener__filter_chain__descriptor); protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); } -static const ProtobufCFieldDescriptor listener__filter__field_descriptors[3] = +static const ProtobufCFieldDescriptor listener__filter__field_descriptors[4] = { { "name", @@ -180,16 +180,29 @@ static const ProtobufCFieldDescriptor listener__filter__field_descriptors[3] = 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, + { + "local_rate_limit", + 4, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_MESSAGE, + offsetof(Listener__Filter, config_type_case), + offsetof(Listener__Filter, local_rate_limit), + &filter__local_rate_limit__descriptor, + NULL, + 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, }; static const unsigned listener__filter__field_indices_by_name[] = { 2, /* field[2] = http_connection_manager */ + 3, /* field[3] = local_rate_limit */ 0, /* field[0] = name */ 1, /* field[1] = tcp_proxy */ }; static const ProtobufCIntRange listener__filter__number_ranges[1 + 1] = { { 1, 0 }, - { 0, 3 } + { 0, 4 } }; const ProtobufCMessageDescriptor listener__filter__descriptor = { @@ -199,7 +212,7 @@ const ProtobufCMessageDescriptor listener__filter__descriptor = "Listener__Filter", "listener", sizeof(Listener__Filter), - 3, + 4, listener__filter__field_descriptors, listener__filter__field_indices_by_name, 1, listener__filter__number_ranges, diff --git a/api/v2-c/listener/listener_components.pb-c.h b/api/v2-c/listener/listener_components.pb-c.h index 72e7f00b1..78ddcc633 100644 --- a/api/v2-c/listener/listener_components.pb-c.h +++ b/api/v2-c/listener/listener_components.pb-c.h @@ -17,6 +17,7 @@ PROTOBUF_C__BEGIN_DECLS #include "core/address.pb-c.h" #include "filter/tcp_proxy.pb-c.h" #include "filter/http_connection_manager.pb-c.h" +#include "filter/ratelimit.pb-c.h" typedef struct Listener__Filter Listener__Filter; typedef struct Listener__FilterChainMatch Listener__FilterChainMatch; @@ -31,7 +32,8 @@ typedef struct Listener__FilterChain Listener__FilterChain; typedef enum { LISTENER__FILTER__CONFIG_TYPE__NOT_SET = 0, LISTENER__FILTER__CONFIG_TYPE_TCP_PROXY = 2, - LISTENER__FILTER__CONFIG_TYPE_HTTP_CONNECTION_MANAGER = 3 + LISTENER__FILTER__CONFIG_TYPE_HTTP_CONNECTION_MANAGER = 3, + LISTENER__FILTER__CONFIG_TYPE_LOCAL_RATE_LIMIT = 4 PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(LISTENER__FILTER__CONFIG_TYPE__CASE) } Listener__Filter__ConfigTypeCase; @@ -43,6 +45,7 @@ struct Listener__Filter union { Filter__TcpProxy *tcp_proxy; Filter__HttpConnectionManager *http_connection_manager; + Filter__LocalRateLimit *local_rate_limit; }; }; #define LISTENER__FILTER__INIT \ diff --git a/api/v2/filter/ratelimit.pb.go b/api/v2/filter/ratelimit.pb.go new file mode 100644 index 000000000..d7c595e23 --- /dev/null +++ b/api/v2/filter/ratelimit.pb.go @@ -0,0 +1,262 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v3.17.3 +// source: api/filter/ratelimit.proto + +package filter + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// TokenBucket defines parameters for a token bucket rate limiter. +// https://www.envoyproxy.io/docs/envoy/latest/api-v3/type/v3/token_bucket.proto#envoy-v3-api-msg-type-v3-tokenbucket +// { +// "max_tokens": ..., +// "tokens_per_fill": {...}, +// "fill_interval": {...} +// } +type TokenBucket struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The maximum number of tokens in the bucket. + MaxTokens int64 `protobuf:"varint,1,opt,name=max_tokens,json=maxTokens,proto3" json:"max_tokens,omitempty"` + // The number of tokens added to the bucket during each fill interval. + TokensPerFill int64 `protobuf:"varint,2,opt,name=tokens_per_fill,json=tokensPerFill,proto3" json:"tokens_per_fill,omitempty"` + // The interval at which the bucket is refilled in nanoseconds. + FillInterval int64 `protobuf:"varint,3,opt,name=fill_interval,json=fillInterval,proto3" json:"fill_interval,omitempty"` +} + +func (x *TokenBucket) Reset() { + *x = TokenBucket{} + if protoimpl.UnsafeEnabled { + mi := &file_api_filter_ratelimit_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TokenBucket) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TokenBucket) ProtoMessage() {} + +func (x *TokenBucket) ProtoReflect() protoreflect.Message { + mi := &file_api_filter_ratelimit_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TokenBucket.ProtoReflect.Descriptor instead. +func (*TokenBucket) Descriptor() ([]byte, []int) { + return file_api_filter_ratelimit_proto_rawDescGZIP(), []int{0} +} + +func (x *TokenBucket) GetMaxTokens() int64 { + if x != nil { + return x.MaxTokens + } + return 0 +} + +func (x *TokenBucket) GetTokensPerFill() int64 { + if x != nil { + return x.TokensPerFill + } + return 0 +} + +func (x *TokenBucket) GetFillInterval() int64 { + if x != nil { + return x.FillInterval + } + return 0 +} + +// LocalRateLimit defines parameters for local rate limiting. +// https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto#envoy-v3-api-msg-extensions-filters-http-local-ratelimit-v3-localratelimit +// { +// "stat_prefix": ..., +// "status": {...}, +// "token_bucket": {...}, +// "filter_enabled": {...}, +// "filter_enforced": {...}, +// "request_headers_to_add_when_not_enforced": [], +// "response_headers_to_add": [], +// "descriptors": [], +// "stage": ..., +// "local_rate_limit_per_downstream_connection": ..., +// "local_cluster_rate_limit": {...}, +// "enable_x_ratelimit_headers": ..., +// "vh_rate_limits": ..., +// "always_consume_default_token_bucket": {...}, +// "rate_limited_as_resource_exhausted": ... +// } +type LocalRateLimit struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The token bucket configuration for the rate limiter. + TokenBucket *TokenBucket `protobuf:"bytes,3,opt,name=token_bucket,json=tokenBucket,proto3" json:"token_bucket,omitempty"` +} + +func (x *LocalRateLimit) Reset() { + *x = LocalRateLimit{} + if protoimpl.UnsafeEnabled { + mi := &file_api_filter_ratelimit_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LocalRateLimit) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LocalRateLimit) ProtoMessage() {} + +func (x *LocalRateLimit) ProtoReflect() protoreflect.Message { + mi := &file_api_filter_ratelimit_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LocalRateLimit.ProtoReflect.Descriptor instead. +func (*LocalRateLimit) Descriptor() ([]byte, []int) { + return file_api_filter_ratelimit_proto_rawDescGZIP(), []int{1} +} + +func (x *LocalRateLimit) GetTokenBucket() *TokenBucket { + if x != nil { + return x.TokenBucket + } + return nil +} + +var File_api_filter_ratelimit_proto protoreflect.FileDescriptor + +var file_api_filter_ratelimit_proto_rawDesc = []byte{ + 0x0a, 0x1a, 0x61, 0x70, 0x69, 0x2f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2f, 0x72, 0x61, 0x74, + 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x66, 0x69, + 0x6c, 0x74, 0x65, 0x72, 0x22, 0x79, 0x0a, 0x0b, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x42, 0x75, 0x63, + 0x6b, 0x65, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x61, 0x78, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x6d, 0x61, 0x78, 0x54, 0x6f, 0x6b, 0x65, + 0x6e, 0x73, 0x12, 0x26, 0x0a, 0x0f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x5f, 0x70, 0x65, 0x72, + 0x5f, 0x66, 0x69, 0x6c, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x74, 0x6f, 0x6b, + 0x65, 0x6e, 0x73, 0x50, 0x65, 0x72, 0x46, 0x69, 0x6c, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x66, 0x69, + 0x6c, 0x6c, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0c, 0x66, 0x69, 0x6c, 0x6c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x22, + 0x54, 0x0a, 0x0e, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, + 0x74, 0x12, 0x36, 0x0a, 0x0c, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x5f, 0x62, 0x75, 0x63, 0x6b, 0x65, + 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, + 0x2e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x52, 0x0b, 0x74, 0x6f, + 0x6b, 0x65, 0x6e, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x4a, 0x04, 0x08, 0x01, 0x10, 0x03, 0x4a, + 0x04, 0x08, 0x04, 0x10, 0x10, 0x42, 0x23, 0x5a, 0x21, 0x6b, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x6e, + 0x65, 0x74, 0x2f, 0x6b, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x66, 0x69, 0x6c, + 0x74, 0x65, 0x72, 0x3b, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_api_filter_ratelimit_proto_rawDescOnce sync.Once + file_api_filter_ratelimit_proto_rawDescData = file_api_filter_ratelimit_proto_rawDesc +) + +func file_api_filter_ratelimit_proto_rawDescGZIP() []byte { + file_api_filter_ratelimit_proto_rawDescOnce.Do(func() { + file_api_filter_ratelimit_proto_rawDescData = protoimpl.X.CompressGZIP(file_api_filter_ratelimit_proto_rawDescData) + }) + return file_api_filter_ratelimit_proto_rawDescData +} + +var file_api_filter_ratelimit_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_api_filter_ratelimit_proto_goTypes = []interface{}{ + (*TokenBucket)(nil), // 0: filter.TokenBucket + (*LocalRateLimit)(nil), // 1: filter.LocalRateLimit +} +var file_api_filter_ratelimit_proto_depIdxs = []int32{ + 0, // 0: filter.LocalRateLimit.token_bucket:type_name -> filter.TokenBucket + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_api_filter_ratelimit_proto_init() } +func file_api_filter_ratelimit_proto_init() { + if File_api_filter_ratelimit_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_api_filter_ratelimit_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TokenBucket); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_filter_ratelimit_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LocalRateLimit); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_api_filter_ratelimit_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_api_filter_ratelimit_proto_goTypes, + DependencyIndexes: file_api_filter_ratelimit_proto_depIdxs, + MessageInfos: file_api_filter_ratelimit_proto_msgTypes, + }.Build() + File_api_filter_ratelimit_proto = out.File + file_api_filter_ratelimit_proto_rawDesc = nil + file_api_filter_ratelimit_proto_goTypes = nil + file_api_filter_ratelimit_proto_depIdxs = nil +} diff --git a/api/v2/listener/listener_components.pb.go b/api/v2/listener/listener_components.pb.go index a3f21b2a7..1c8a63302 100644 --- a/api/v2/listener/listener_components.pb.go +++ b/api/v2/listener/listener_components.pb.go @@ -32,6 +32,7 @@ type Filter struct { // // *Filter_TcpProxy // *Filter_HttpConnectionManager + // *Filter_LocalRateLimit ConfigType isFilter_ConfigType `protobuf_oneof:"config_type"` } @@ -95,6 +96,13 @@ func (x *Filter) GetHttpConnectionManager() *filter.HttpConnectionManager { return nil } +func (x *Filter) GetLocalRateLimit() *filter.LocalRateLimit { + if x, ok := x.GetConfigType().(*Filter_LocalRateLimit); ok { + return x.LocalRateLimit + } + return nil +} + type isFilter_ConfigType interface { isFilter_ConfigType() } @@ -107,10 +115,16 @@ type Filter_HttpConnectionManager struct { HttpConnectionManager *filter.HttpConnectionManager `protobuf:"bytes,3,opt,name=http_connection_manager,json=httpConnectionManager,proto3,oneof"` } +type Filter_LocalRateLimit struct { + LocalRateLimit *filter.LocalRateLimit `protobuf:"bytes,4,opt,name=local_rate_limit,json=localRateLimit,proto3,oneof"` +} + func (*Filter_TcpProxy) isFilter_ConfigType() {} func (*Filter_HttpConnectionManager) isFilter_ConfigType() {} +func (*Filter_LocalRateLimit) isFilter_ConfigType() {} + type FilterChainMatch struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -257,17 +271,23 @@ var file_api_listener_listener_components_proto_rawDesc = []byte{ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x28, 0x61, 0x70, 0x69, 0x2f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2f, 0x68, 0x74, 0x74, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x22, 0xb5, 0x01, 0x0a, 0x06, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, - 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, - 0x2f, 0x0a, 0x09, 0x74, 0x63, 0x70, 0x5f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x54, 0x63, 0x70, 0x50, - 0x72, 0x6f, 0x78, 0x79, 0x48, 0x00, 0x52, 0x08, 0x74, 0x63, 0x70, 0x50, 0x72, 0x6f, 0x78, 0x79, - 0x12, 0x57, 0x0a, 0x17, 0x68, 0x74, 0x74, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x5f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x48, 0x74, 0x74, 0x70, 0x43, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, - 0x48, 0x00, 0x52, 0x15, 0x68, 0x74, 0x74, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x42, 0x0d, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, + 0x1a, 0x1a, 0x61, 0x70, 0x69, 0x2f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2f, 0x72, 0x61, 0x74, + 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xf9, 0x01, 0x0a, + 0x06, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x74, + 0x63, 0x70, 0x5f, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, + 0x2e, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x54, 0x63, 0x70, 0x50, 0x72, 0x6f, 0x78, 0x79, + 0x48, 0x00, 0x52, 0x08, 0x74, 0x63, 0x70, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x57, 0x0a, 0x17, + 0x68, 0x74, 0x74, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, + 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, + 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x48, 0x74, 0x74, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x48, 0x00, 0x52, 0x15, + 0x68, 0x74, 0x74, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x61, + 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x42, 0x0a, 0x10, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x5f, 0x72, + 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x16, 0x2e, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x52, 0x61, + 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x48, 0x00, 0x52, 0x0e, 0x6c, 0x6f, 0x63, 0x61, 0x6c, + 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x42, 0x0d, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x22, 0xd7, 0x01, 0x0a, 0x10, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x12, 0x34, 0x0a, 0x0d, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x03, @@ -316,19 +336,21 @@ var file_api_listener_listener_components_proto_goTypes = []interface{}{ (*FilterChain)(nil), // 2: listener.FilterChain (*filter.TcpProxy)(nil), // 3: filter.TcpProxy (*filter.HttpConnectionManager)(nil), // 4: filter.HttpConnectionManager - (*core.CidrRange)(nil), // 5: core.CidrRange + (*filter.LocalRateLimit)(nil), // 5: filter.LocalRateLimit + (*core.CidrRange)(nil), // 6: core.CidrRange } var file_api_listener_listener_components_proto_depIdxs = []int32{ 3, // 0: listener.Filter.tcp_proxy:type_name -> filter.TcpProxy 4, // 1: listener.Filter.http_connection_manager:type_name -> filter.HttpConnectionManager - 5, // 2: listener.FilterChainMatch.prefix_ranges:type_name -> core.CidrRange - 1, // 3: listener.FilterChain.filter_chain_match:type_name -> listener.FilterChainMatch - 0, // 4: listener.FilterChain.filters:type_name -> listener.Filter - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 5, // 2: listener.Filter.local_rate_limit:type_name -> filter.LocalRateLimit + 6, // 3: listener.FilterChainMatch.prefix_ranges:type_name -> core.CidrRange + 1, // 4: listener.FilterChain.filter_chain_match:type_name -> listener.FilterChainMatch + 0, // 5: listener.FilterChain.filters:type_name -> listener.Filter + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_api_listener_listener_components_proto_init() } @@ -377,6 +399,7 @@ func file_api_listener_listener_components_proto_init() { file_api_listener_listener_components_proto_msgTypes[0].OneofWrappers = []interface{}{ (*Filter_TcpProxy)(nil), (*Filter_HttpConnectionManager)(nil), + (*Filter_LocalRateLimit)(nil), } type x struct{} out := protoimpl.TypeBuilder{ diff --git a/bpf/kmesh/ads/include/filter.h b/bpf/kmesh/ads/include/filter.h index 7a69df914..1ce68fd54 100644 --- a/bpf/kmesh/ads/include/filter.h +++ b/bpf/kmesh/ads/include/filter.h @@ -4,6 +4,7 @@ #ifndef __KMESH_FILTER_H__ #define __KMESH_FILTER_H__ +#include "local_ratelimit.h" #include "tcp_proxy.h" #include "tail_call.h" #include "bpf_log.h" @@ -181,6 +182,10 @@ int filter_chain_manager(ctx_buff_t *ctx) if (filter_chain == NULL) { return KMESH_TAIL_CALL_RET(-1); } + + /* ratelimit check */ + Local_rate_limit__check_and_take(filter_chain, &addr, ctx); + /* filter match */ ret = filter_chain_filter_match(filter_chain, &addr, ctx, &filter, &filter_idx); if (ret != 0) { diff --git a/bpf/kmesh/ads/include/local_ratelimit.h b/bpf/kmesh/ads/include/local_ratelimit.h new file mode 100644 index 000000000..87c6ac756 --- /dev/null +++ b/bpf/kmesh/ads/include/local_ratelimit.h @@ -0,0 +1,195 @@ +/* SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */ +/* Copyright Authors of Kmesh */ + +#ifndef __KMESH_LOCAL_RATE_LIMIT_H__ +#define __KMESH_LOCAL_RATE_LIMIT_H__ + +#include "bpf_log.h" +#include "bpf_common.h" +#include "kmesh_common.h" +#include "listener/listener.pb-c.h" + +struct ratelimit_key { + union { + struct { + __u32 ipv4; /* Destination IPv4 address. */ + __u32 port; /* Destination port. */ + __u32 family; /* Address family (e.g., AF_INET) */ + } sk_skb; + } key; +}; + +struct ratelimit_value { + __u64 last_topup; /* Timestamp of the last token refill (nanoseconds) */ + __u64 tokens; /* Current number of available tokens */ +}; + +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, struct ratelimit_key); + __type(value, struct ratelimit_value); + __uint(max_entries, 1000); + __uint(map_flags, BPF_F_NO_PREALLOC); +} ratelimit_map SEC(".maps"); + +struct ratelimit_settings { + __u64 bucket_size; /* Maximum capacity of the token bucket */ + __u64 tokens_per_topup; /* Number of tokens added per refill */ + __u64 topup_interval_ns; /* Interval between token refills (nanoseconds) */ +}; + +/** + * Enforces rate limiting for a given key using token bucket algorithm. + * + * @param key Pointer to the rate limit key (e.g., IP and port). + * @param settings Pointer to rate limit settings (bucket size, refill rate, interval). + * @return 0 if allowed (token consumed), -1 if rate limit exceeded. + */ +static inline int rate_limit__check_and_take(struct ratelimit_key *key, const struct ratelimit_settings *settings) +{ + struct ratelimit_value *value; + struct ratelimit_value new_value; + __u64 now = bpf_ktime_get_ns(); + __u64 topup; + + value = bpf_map_lookup_elem(&ratelimit_map, key); + if (!value) { + new_value.last_topup = now; + new_value.tokens = settings->bucket_size; + bpf_map_update_elem(&ratelimit_map, key, &new_value, BPF_ANY); + return 0; + } + + topup = (now - value->last_topup) / settings->topup_interval_ns; + if (topup > 0) { + value->tokens += topup * settings->tokens_per_topup; + if (value->tokens > settings->bucket_size) { + value->tokens = settings->bucket_size; + } + value->last_topup += topup * settings->topup_interval_ns; + } + + if (value->tokens == 0) { + return -1; + } + + value->tokens--; + return 0; +} + +static inline int Local_rate_limit__filter__match(const Listener__Filter *filter); + +static inline int +Local_rate_limit__filter_chain__match(const Listener__FilterChain *filter_chain, Listener__Filter **filter_ptr); + +/** + * Applies local rate limiting based on the filter chain and address. + * + * @param filter_chain Pointer to the filter chain. + * @param addr Pointer to the address structure. + * @param ctx Pointer to the context buffer. + * @return 0 if allowed, -1 if rate limit exceeded or runtime error. + */ +static inline int +Local_rate_limit__check_and_take(const Listener__FilterChain *filter_chain, address_t *addr, const ctx_buff_t *ctx) +{ + int ret = 0; + Listener__Filter *filter = NULL; + Filter__LocalRateLimit *rate_limit = NULL; + Filter__TokenBucket *token_bucket = NULL; + + ret = Local_rate_limit__filter_chain__match(filter_chain, &filter); + if (ret) { + BPF_LOG(INFO, FILTERCHAIN, "no local rate limit filter matched\n"); + return 0; + } + BPF_LOG(INFO, FILTER, "local rate limit rule matched\n"); + + rate_limit = kmesh_get_ptr_val(filter->local_rate_limit); + if (!rate_limit) { + BPF_LOG(ERR, FILTERCHAIN, "get rate_limit failed\n"); + return KMESH_TAIL_CALL_RET(-1); + } + token_bucket = kmesh_get_ptr_val(rate_limit->token_bucket); + if (!token_bucket) { + BPF_LOG(ERR, FILTERCHAIN, "get token_bucket failed\n"); + return KMESH_TAIL_CALL_RET(-1); + } + + struct ratelimit_key key = { + .key.sk_skb.ipv4 = addr->ipv4, + .key.sk_skb.port = addr->port, + }; + + struct ratelimit_settings settings = { + .bucket_size = token_bucket->max_tokens, + .tokens_per_topup = token_bucket->tokens_per_fill, + .topup_interval_ns = token_bucket->fill_interval, + }; + + if (rate_limit__check_and_take(&key, &settings)) { + BPF_LOG(INFO, FILTERCHAIN, "rate limit exceeded\n"); +// TODO: implement rate limit exceeded action after #570 merged +#define MARK_REJECTED(ctx) + MARK_REJECTED(ctx); + return -1; + } + BPF_LOG(INFO, FILTERCHAIN, "rate limit passed\n"); + return 0; +} + +static inline int Local_rate_limit__filter__match(const Listener__Filter *filter) +{ + if (!filter) { + BPF_LOG(ERR, FILTER, "filter is NULL\n"); + return 0; + } + + if (filter->config_type_case != LISTENER__FILTER__CONFIG_TYPE_LOCAL_RATE_LIMIT) { + return 0; + } + return 1; +} + +static inline int +Local_rate_limit__filter_chain__match(const Listener__FilterChain *filter_chain, Listener__Filter **filter_ptr) +{ + void *ptrs = NULL; + Listener__Filter *filter = NULL; + + if (!filter_ptr) { + BPF_LOG(ERR, FILTERCHAIN, "invalid params\n"); + return -1; + } + + if (filter_chain->n_filters == 0 || filter_chain->n_filters > KMESH_PER_FILTER_NUM) { + BPF_LOG(ERR, FILTERCHAIN, "nfilter num(%d) invalid\n", filter_chain->n_filters); + return -1; + } + + ptrs = kmesh_get_ptr_val(filter_chain->filters); + if (!ptrs) { + BPF_LOG(ERR, FILTER, "failed to get filter ptrs\n"); + return -1; + } + +#pragma unroll + for (unsigned int i = 0; i < KMESH_PER_FILTER_NUM; i++) { + if (i >= filter_chain->n_filters) { + break; + } + + filter = (Listener__Filter *)kmesh_get_ptr_val((void *)*((__u64 *)ptrs + i)); + if (!filter) { + continue; + } + + if (Local_rate_limit__filter__match(filter)) { + *filter_ptr = filter; + return 0; + } + } + return -1; +} + +#endif \ No newline at end of file diff --git a/go.mod b/go.mod index a4b46fac8..5b2541c43 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/agiledragon/gomonkey/v2 v2.12.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/cilium/ebpf v0.16.0 + github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b github.com/containernetworking/cni v1.2.3 github.com/containernetworking/plugins v1.5.1 github.com/envoyproxy/go-control-plane v0.12.1-0.20240621013728-1eb8caab5155 @@ -60,7 +61,6 @@ require ( github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/cheggaaa/pb/v3 v3.1.5 // indirect - github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect github.com/containerd/stargz-snapshotter/estargz v0.15.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect diff --git a/pkg/controller/ads/cache.go b/pkg/controller/ads/cache.go index c40767a29..de4637ecf 100644 --- a/pkg/controller/ads/cache.go +++ b/pkg/controller/ads/cache.go @@ -36,6 +36,7 @@ import ( listener_v2 "kmesh.net/kmesh/api/v2/listener" route_v2 "kmesh.net/kmesh/api/v2/route" cache_v2 "kmesh.net/kmesh/pkg/cache/v2" + "kmesh.net/kmesh/pkg/controller/ads/extensions" "kmesh.net/kmesh/pkg/nets" ) @@ -292,6 +293,10 @@ func newApiFilterAndRouteName(filter *config_listener_v3.Filter) (*listener_v2.F } } apiFilter.ConfigType = &apiFilterHttp + case extensions.LocalRateLimit: + if apiFilter.ConfigType, err = extensions.NewLocalRateLimit(filter); err != nil { + return nil, "" + } default: } case *config_listener_v3.Filter_ConfigDiscovery: diff --git a/pkg/controller/ads/extensions/local_ratelimit.go b/pkg/controller/ads/extensions/local_ratelimit.go new file mode 100644 index 000000000..8d0522de3 --- /dev/null +++ b/pkg/controller/ads/extensions/local_ratelimit.go @@ -0,0 +1,56 @@ +package extensions + +import ( + "fmt" + "time" + + v1 "github.com/cncf/xds/go/udpa/type/v1" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "kmesh.net/kmesh/api/v2/filter" + "kmesh.net/kmesh/api/v2/listener" +) + +const LocalRateLimit = "envoy.filters.tcp.local_ratelimit" + +// NewLocalRateLimit constructs a new LocalRateLimit filter wrapper. +func NewLocalRateLimit(filter *listenerv3.Filter) (*listener.Filter_LocalRateLimit, error) { + localRateLimit, err := newLocalRateLimit(filter) + if err != nil { + return nil, err + } + + return &listener.Filter_LocalRateLimit{ + LocalRateLimit: localRateLimit, + }, nil +} + +// newLocalRateLimit creates a new LocalRateLimit filter. +func newLocalRateLimit(Filter *listenerv3.Filter) (*filter.LocalRateLimit, error) { + unstructured, err := unmarshalToTypedStruct(Filter) + if err != nil { + return nil, err + } + + bucket := unstructured.GetValue().GetFields()["token_bucket"].GetStructValue().GetFields() + interval, err := time.ParseDuration(bucket["fill_interval"].GetStringValue()) + if err != nil { + return nil, fmt.Errorf("failed to convert fill_interval") + } + return &filter.LocalRateLimit{TokenBucket: &filter.TokenBucket{ + MaxTokens: int64(bucket["max_tokens"].GetNumberValue()), + TokensPerFill: int64(bucket["tokens_per_fill"].GetNumberValue()), + FillInterval: interval.Nanoseconds(), + }}, nil +} + +// unmarshalToTypedStruct unmarshal a protobuf Any message to a TypedStruct. +func unmarshalToTypedStruct(filter *listenerv3.Filter) (*v1.TypedStruct, error) { + typed := &v1.TypedStruct{} + if err := anypb.UnmarshalTo(filter.GetTypedConfig(), typed, proto.UnmarshalOptions{}); err != nil { + return nil, fmt.Errorf("failed to unmarshal TypedConfig %w", err) + } + return typed, nil +}