From 5042439da5bdf47acd184412993463f4e7f4fde6 Mon Sep 17 00:00:00 2001 From: Shivani Bhardwaj Date: Tue, 20 Aug 2024 16:20:20 +0530 Subject: [PATCH 1/2] flow: add support to define elephant flows 1. Add user defined elephant flow definition based on rate of bytes easily configurable in suricata.yaml. 2. Add an elephant flow counter. Feature 5647 --- etc/schema.json | 4 ++++ src/decode.c | 1 + src/decode.h | 1 + src/flow.c | 30 ++++++++++++++++++++++++++++++ src/flow.h | 5 ++++- suricata.yaml.in | 7 +++++++ 6 files changed, 47 insertions(+), 1 deletion(-) diff --git a/etc/schema.json b/etc/schema.json index 08959e3c7a1d..d6ba53e236f7 100644 --- a/etc/schema.json +++ b/etc/schema.json @@ -6083,6 +6083,10 @@ "Number of times retrieval of flow from hash was attempted but was unsuccessful", "type": "integer" }, + "elephant_flows": { + "description": "Total number of elephant flows seen", + "type": "integer" + }, "icmpv4": { "description": "Number of ICMPv4 flows", "type": "integer" diff --git a/src/decode.c b/src/decode.c index 0ed546def225..24619043da46 100644 --- a/src/decode.c +++ b/src/decode.c @@ -663,6 +663,7 @@ void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv) dtv->counter_flow_get_used_eval_reject = StatsRegisterCounter("flow.get_used_eval_reject", tv); dtv->counter_flow_get_used_eval_busy = StatsRegisterCounter("flow.get_used_eval_busy", tv); dtv->counter_flow_get_used_failed = StatsRegisterCounter("flow.get_used_failed", tv); + dtv->counter_flow_elephant = StatsRegisterCounter("flow.elephant_flows", tv); dtv->counter_flow_spare_sync_avg = StatsRegisterAvgCounter("flow.wrk.spare_sync_avg", tv); dtv->counter_flow_spare_sync = StatsRegisterCounter("flow.wrk.spare_sync", tv); diff --git a/src/decode.h b/src/decode.h index f36c41a8422e..084e9aaf2dfe 100644 --- a/src/decode.h +++ b/src/decode.h @@ -1004,6 +1004,7 @@ typedef struct DecodeThreadVars_ uint16_t counter_flow_get_used_eval_reject; uint16_t counter_flow_get_used_eval_busy; uint16_t counter_flow_get_used_failed; + uint16_t counter_flow_elephant; uint16_t counter_flow_spare_sync; uint16_t counter_flow_spare_sync_empty; diff --git a/src/flow.c b/src/flow.c index aea79d23bf08..a81f4603b60b 100644 --- a/src/flow.c +++ b/src/flow.c @@ -210,6 +210,23 @@ int FlowChangeProto(Flow *f) return 0; } +static void FlowIsElephant(Flow *f, ThreadVars *tv, DecodeThreadVars *dtv) +{ + if (f->elephant) + return; /* Flow is a known elephant flow, no need to calculate again */ + uint32_t age = SCTIME_SECS(f->lastts) - SCTIME_SECS(f->startts); + if (age == 0) + return; + uint64_t rate_byte_influx = (f->tosrcbytecnt + f->todstbytecnt) / age; + if (rate_byte_influx >= flow_config.elephant_flow_rate) { + f->elephant = true; + StatsIncr(tv, dtv->counter_flow_elephant); + /* Let Flow Manager take care of this elephant flow */ + } + + /* Nothing should be reset as an elephant flow should not be considered a normal flow */ +} + static inline void FlowSwapFlags(Flow *f) { SWAP_FLAGS(f->flags, FLOW_TO_SRC_SEEN, FLOW_TO_DST_SEEN); @@ -422,6 +439,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars if (pkt_dir == TOSERVER) { f->todstpktcnt++; f->todstbytecnt += GET_PKT_LEN(p); + FlowIsElephant(f, tv, dtv); p->flowflags = FLOW_PKT_TOSERVER; if (!(f->flags & FLOW_TO_DST_SEEN)) { if (FlowUpdateSeenFlag(p)) { @@ -446,6 +464,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars } else { f->tosrcpktcnt++; f->tosrcbytecnt += GET_PKT_LEN(p); + FlowIsElephant(f, tv, dtv); p->flowflags = FLOW_PKT_TOCLIENT; if (!(f->flags & FLOW_TO_SRC_SEEN)) { if (FlowUpdateSeenFlag(p)) { @@ -546,6 +565,7 @@ void FlowInitConfig(bool quiet) flow_config.hash_size = FLOW_DEFAULT_HASHSIZE; flow_config.prealloc = FLOW_DEFAULT_PREALLOC; SC_ATOMIC_SET(flow_config.memcap, FLOW_DEFAULT_MEMCAP); + flow_config.elephant_flow_rate = UINT32_MAX; /* If we have specific config, overwrite the defaults with them, * otherwise, leave the default values */ @@ -609,6 +629,16 @@ void FlowInitConfig(bool quiet) } } + if ((ConfGet("elephant-flow.rate", &conf_val)) == 1) { + if (conf_val == NULL) { + FatalError("Invalid value for elephant_flow: NULL"); + } + + if (StringParseUint32(&configval, 10, strlen(conf_val), conf_val) > 0) { + flow_config.elephant_flow_rate = configval; + } + } + flow_config.memcap_policy = ExceptionPolicyParse("flow.memcap-policy", false); SCLogDebug("Flow config from suricata.yaml: memcap: %"PRIu64", hash-size: " diff --git a/src/flow.h b/src/flow.h index 554f9fca4a32..d3c3af84cffb 100644 --- a/src/flow.h +++ b/src/flow.h @@ -297,6 +297,8 @@ typedef struct FlowCnf_ uint32_t timeout_new; uint32_t timeout_est; + uint32_t elephant_flow_rate; + uint32_t emergency_recovery; enum ExceptionPolicy memcap_policy; @@ -416,6 +418,8 @@ typedef struct Flow_ FlowStateType flow_state; + bool elephant; + /** flow tenant id, used to setup flow timeout and stream pseudo * packets with the correct tenant id set */ uint32_t tenant_id; @@ -498,7 +502,6 @@ typedef struct Flow_ uint32_t tosrcpktcnt; uint64_t todstbytecnt; uint64_t tosrcbytecnt; - Storage storage[]; } Flow; diff --git a/suricata.yaml.in b/suricata.yaml.in index 82a72bad0f3a..9553d1d5e187 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -1470,6 +1470,13 @@ flow: #managers: 1 # default to one flow manager #recyclers: 1 # default to one flow recycler thread +# Mark a flow as an elephant flow if the rate of influx of bytes is equal to +# or exceeds the given number. The rate is calculated by dividing the total +# number of bytes a flow has seen in either direction divided by the time a +# flow has been alive. +elephant-flow: + rate: 1000000 + # This option controls the use of VLAN ids in the flow (and defrag) # hashing. Normally this should be enabled, but in some (broken) # setups where both sides of a flow are not tagged with the same VLAN From 4584f91ad163c0ee808fc4d0655385d109dab2be Mon Sep 17 00:00:00 2001 From: Shivani Bhardwaj Date: Thu, 5 Dec 2024 19:10:14 +0530 Subject: [PATCH 2/2] flow: move fields around to fill memory holes This brings us down from 2 holes of 2 bytes and 4 bytes to one hole of 5 bytes. There may be more room for improvement. --- src/flow.h | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/flow.h b/src/flow.h index d3c3af84cffb..230d2b4027c6 100644 --- a/src/flow.h +++ b/src/flow.h @@ -420,6 +420,9 @@ typedef struct Flow_ bool elephant; + uint8_t flow_end_flags; + /* coccinelle: Flow:flow_end_flags:FLOW_END_FLAG_ */ + /** flow tenant id, used to setup flow timeout and stream pseudo * packets with the correct tenant id set */ uint32_t tenant_id; @@ -446,16 +449,6 @@ typedef struct Flow_ #error Enable FLOWLOCK_RWLOCK or FLOWLOCK_MUTEX #endif - /** protocol specific data pointer, e.g. for TcpSession */ - void *protoctx; - - /** mapping to Flow's protocol specific protocols for timeouts - and state and free functions. */ - uint8_t protomap; - - uint8_t flow_end_flags; - /* coccinelle: Flow:flow_end_flags:FLOW_END_FLAG_ */ - AppProto alproto; /**< \brief application level protocol */ AppProto alproto_ts; AppProto alproto_tc; @@ -467,14 +460,21 @@ typedef struct Flow_ * STARTTLS. */ AppProto alproto_expect; + uint8_t min_ttl_toserver; + uint8_t max_ttl_toserver; + /** detection engine ctx version used to inspect this flow. Set at initial * inspection. If it doesn't match the currently in use de_ctx, the * stored sgh ptrs are reset. */ uint32_t de_ctx_version; - /** ttl tracking */ - uint8_t min_ttl_toserver; - uint8_t max_ttl_toserver; + /** protocol specific data pointer, e.g. for TcpSession */ + void *protoctx; + + /** mapping to Flow's protocol specific protocols for timeouts + and state and free functions. */ + uint8_t protomap; + uint8_t min_ttl_toclient; uint8_t max_ttl_toclient;