Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark flow elephant/v3 #12237

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions etc/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -6083,6 +6083,10 @@
"Number of times retrieval of flow from hash was attempted but was unsuccessful",
"type": "integer"
},
"elephant_flows": {
"description": "Total number of elephant flows seen",
"type": "integer"
},
"icmpv4": {
"description": "Number of ICMPv4 flows",
"type": "integer"
Expand Down
1 change: 1 addition & 0 deletions src/decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
dtv->counter_flow_get_used_eval_reject = StatsRegisterCounter("flow.get_used_eval_reject", tv);
dtv->counter_flow_get_used_eval_busy = StatsRegisterCounter("flow.get_used_eval_busy", tv);
dtv->counter_flow_get_used_failed = StatsRegisterCounter("flow.get_used_failed", tv);
dtv->counter_flow_elephant = StatsRegisterCounter("flow.elephant_flows", tv);

dtv->counter_flow_spare_sync_avg = StatsRegisterAvgCounter("flow.wrk.spare_sync_avg", tv);
dtv->counter_flow_spare_sync = StatsRegisterCounter("flow.wrk.spare_sync", tv);
Expand Down
1 change: 1 addition & 0 deletions src/decode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,7 @@ typedef struct DecodeThreadVars_
uint16_t counter_flow_get_used_eval_reject;
uint16_t counter_flow_get_used_eval_busy;
uint16_t counter_flow_get_used_failed;
uint16_t counter_flow_elephant;

uint16_t counter_flow_spare_sync;
uint16_t counter_flow_spare_sync_empty;
Expand Down
30 changes: 30 additions & 0 deletions src/flow.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ int FlowChangeProto(Flow *f)
return 0;
}

static void FlowIsElephant(Flow *f, ThreadVars *tv, DecodeThreadVars *dtv)
{
if (f->elephant)
return; /* Flow is a known elephant flow, no need to calculate again */
uint32_t age = SCTIME_SECS(f->lastts) - SCTIME_SECS(f->startts);
if (age == 0)
return;
uint64_t rate_byte_influx = (f->tosrcbytecnt + f->todstbytecnt) / age;
if (rate_byte_influx >= flow_config.elephant_flow_rate) {
f->elephant = true;
StatsIncr(tv, dtv->counter_flow_elephant);
/* Let Flow Manager take care of this elephant flow */
}

/* Nothing should be reset as an elephant flow should not be considered a normal flow */
}

static inline void FlowSwapFlags(Flow *f)
{
SWAP_FLAGS(f->flags, FLOW_TO_SRC_SEEN, FLOW_TO_DST_SEEN);
Expand Down Expand Up @@ -422,6 +439,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars
if (pkt_dir == TOSERVER) {
f->todstpktcnt++;
f->todstbytecnt += GET_PKT_LEN(p);
FlowIsElephant(f, tv, dtv);
p->flowflags = FLOW_PKT_TOSERVER;
if (!(f->flags & FLOW_TO_DST_SEEN)) {
if (FlowUpdateSeenFlag(p)) {
Expand All @@ -446,6 +464,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars
} else {
f->tosrcpktcnt++;
f->tosrcbytecnt += GET_PKT_LEN(p);
FlowIsElephant(f, tv, dtv);
p->flowflags = FLOW_PKT_TOCLIENT;
if (!(f->flags & FLOW_TO_SRC_SEEN)) {
if (FlowUpdateSeenFlag(p)) {
Expand Down Expand Up @@ -546,6 +565,7 @@ void FlowInitConfig(bool quiet)
flow_config.hash_size = FLOW_DEFAULT_HASHSIZE;
flow_config.prealloc = FLOW_DEFAULT_PREALLOC;
SC_ATOMIC_SET(flow_config.memcap, FLOW_DEFAULT_MEMCAP);
flow_config.elephant_flow_rate = UINT32_MAX;

/* If we have specific config, overwrite the defaults with them,
* otherwise, leave the default values */
Expand Down Expand Up @@ -609,6 +629,16 @@ void FlowInitConfig(bool quiet)
}
}

if ((ConfGet("elephant-flow.rate", &conf_val)) == 1) {
if (conf_val == NULL) {
FatalError("Invalid value for elephant_flow: NULL");
}

if (StringParseUint32(&configval, 10, strlen(conf_val), conf_val) > 0) {
flow_config.elephant_flow_rate = configval;
}
}

flow_config.memcap_policy = ExceptionPolicyParse("flow.memcap-policy", false);

SCLogDebug("Flow config from suricata.yaml: memcap: %"PRIu64", hash-size: "
Expand Down
31 changes: 17 additions & 14 deletions src/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ typedef struct FlowCnf_
uint32_t timeout_new;
uint32_t timeout_est;

uint32_t elephant_flow_rate;

uint32_t emergency_recovery;

enum ExceptionPolicy memcap_policy;
Expand Down Expand Up @@ -416,6 +418,11 @@ typedef struct Flow_

FlowStateType flow_state;

bool elephant;

uint8_t flow_end_flags;
/* coccinelle: Flow:flow_end_flags:FLOW_END_FLAG_ */

/** flow tenant id, used to setup flow timeout and stream pseudo
* packets with the correct tenant id set */
uint32_t tenant_id;
Expand All @@ -442,16 +449,6 @@ typedef struct Flow_
#error Enable FLOWLOCK_RWLOCK or FLOWLOCK_MUTEX
#endif

/** protocol specific data pointer, e.g. for TcpSession */
void *protoctx;

/** mapping to Flow's protocol specific protocols for timeouts
and state and free functions. */
uint8_t protomap;

uint8_t flow_end_flags;
/* coccinelle: Flow:flow_end_flags:FLOW_END_FLAG_ */

AppProto alproto; /**< \brief application level protocol */
AppProto alproto_ts;
AppProto alproto_tc;
Expand All @@ -463,14 +460,21 @@ typedef struct Flow_
* STARTTLS. */
AppProto alproto_expect;

uint8_t min_ttl_toserver;
uint8_t max_ttl_toserver;

/** detection engine ctx version used to inspect this flow. Set at initial
* inspection. If it doesn't match the currently in use de_ctx, the
* stored sgh ptrs are reset. */
uint32_t de_ctx_version;

/** ttl tracking */
uint8_t min_ttl_toserver;
uint8_t max_ttl_toserver;
/** protocol specific data pointer, e.g. for TcpSession */
void *protoctx;

/** mapping to Flow's protocol specific protocols for timeouts
and state and free functions. */
uint8_t protomap;

uint8_t min_ttl_toclient;
uint8_t max_ttl_toclient;

Expand Down Expand Up @@ -498,7 +502,6 @@ typedef struct Flow_
uint32_t tosrcpktcnt;
uint64_t todstbytecnt;
uint64_t tosrcbytecnt;

Storage storage[];
} Flow;

Expand Down
7 changes: 7 additions & 0 deletions suricata.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,13 @@ flow:
#managers: 1 # default to one flow manager
#recyclers: 1 # default to one flow recycler thread

# Mark a flow as an elephant flow if the rate of influx of bytes is equal to
# or exceeds the given number. The rate is calculated by dividing the total
# number of bytes a flow has seen in either direction divided by the time a
# flow has been alive.
elephant-flow:
rate: 1000000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to me a rate needs a time element, how would that be expressed here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. will make it take two args instead: bytes and seconds just like the flow.rate keyword


# This option controls the use of VLAN ids in the flow (and defrag)
# hashing. Normally this should be enabled, but in some (broken)
# setups where both sides of a flow are not tagged with the same VLAN
Expand Down
Loading