Skip to content

Commit

Permalink
fix: Agent config change TODOs vol.2
Browse files Browse the repository at this point in the history
  • Loading branch information
rvql committed Dec 3, 2024
1 parent 04c0b71 commit 0abb09b
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 198 deletions.
26 changes: 13 additions & 13 deletions agent/src/collector/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ impl Stash {
}
}

if !acc_flow.is_active_host0 && !acc_flow.is_active_host1 && !config.inactive_ip_enabled {
if !acc_flow.is_active_host0 && !acc_flow.is_active_host1 && config.inactive_ip_aggregation {
self.counter.drop_inactive.fetch_add(1, Ordering::Relaxed);
return;
}
Expand Down Expand Up @@ -518,7 +518,7 @@ impl Stash {
acc_flow.is_active_host1
};
// single_stats: Do not count the inactive end (Internet/private network IP with no response packet)
if config.inactive_ip_enabled || is_active_host {
if !config.inactive_ip_aggregation || is_active_host {
let flow_meter = if ep == FLOW_METRICS_PEER_DST {
acc_flow.flow_meter.to_reversed()
} else {
Expand Down Expand Up @@ -630,7 +630,7 @@ impl Stash {
if m.flow.close_type != CloseType::Unknown
&& m.flow.close_type != CloseType::ForcedReport
{
if !m.is_active_host0 && !m.is_active_host1 && !config.inactive_ip_enabled {
if !m.is_active_host0 && !m.is_active_host1 && config.inactive_ip_aggregation {
self.counter.drop_inactive.fetch_add(1, Ordering::Relaxed);
return;
}
Expand Down Expand Up @@ -685,7 +685,7 @@ impl Stash {
None => return,
};

if !meter.is_active_host0 && !meter.is_active_host1 && !config.inactive_ip_enabled {
if !meter.is_active_host0 && !meter.is_active_host1 && config.inactive_ip_aggregation {
self.counter.drop_inactive.fetch_add(1, Ordering::Relaxed);
return;
}
Expand Down Expand Up @@ -713,7 +713,7 @@ impl Stash {
meter.is_active_host1
};
// single_stats: Do not count the inactive end (Internet/private network IP with no response packet)
if config.inactive_ip_enabled || is_active_host {
if !config.inactive_ip_aggregation || is_active_host {
let mut tagger = get_single_tagger(
self.global_thread_id,
&flow,
Expand Down Expand Up @@ -921,11 +921,11 @@ impl Stash {
}
}

// server_port is ignored when is_active_service and inactive_server_port_enabled is turned off
// is_active_service and SFlow,NetFlow data, ignoring service port
// server_port is ignored when service is not active and inactive_server_port_aggregation is turned on
// is_active_service and SFlow, NetFlow data, ignoring service port
// ignore the server for non-TCP/UDP traffic
fn ignore_server_port(flow: &MiniFlow, inactive_server_port_enabled: bool) -> bool {
(!flow.is_active_service && !inactive_server_port_enabled)
fn ignore_server_port(flow: &MiniFlow, inactive_server_port_aggregation: bool) -> bool {
(!flow.is_active_service && inactive_server_port_aggregation)
|| (flow.flow_key.proto != IpProtocol::TCP && flow.flow_key.proto != IpProtocol::UDP)
}

Expand Down Expand Up @@ -957,7 +957,7 @@ fn get_single_tagger(
}
}
RunningMode::Managed => {
if !config.inactive_ip_enabled {
if config.inactive_ip_aggregation {
if !is_active_host {
unspecified_ip(is_ipv6)
} else {
Expand Down Expand Up @@ -997,7 +997,7 @@ fn get_single_tagger(
tap_type: flow_key.tap_type,
// If the resource is located on the client, the service port is ignored
server_port: if ep == FLOW_METRICS_PEER_SRC
|| ignore_server_port(flow, config.inactive_server_port_enabled)
|| ignore_server_port(flow, config.inactive_server_port_aggregation)
{
0
} else {
Expand Down Expand Up @@ -1055,7 +1055,7 @@ fn get_edge_tagger(
RunningMode::Standalone => (flow.peers[0].nat_real_ip, flow.peers[1].nat_real_ip),
RunningMode::Managed => {
let (mut src_ip, mut dst_ip) = (flow.peers[0].nat_real_ip, flow.peers[1].nat_real_ip);
if !config.inactive_ip_enabled {
if config.inactive_ip_aggregation {
if !is_active_host0 {
src_ip = unspecified_ip(is_ipv6);
}
Expand Down Expand Up @@ -1110,7 +1110,7 @@ fn get_edge_tagger(
tap_side: TapSide::from(direction),
tap_port: flow_key.tap_port,
tap_type: flow_key.tap_type,
server_port: if ignore_server_port(flow, config.inactive_server_port_enabled) {
server_port: if ignore_server_port(flow, config.inactive_server_port_aggregation) {
0
} else {
dst_ep.nat_real_port
Expand Down
18 changes: 4 additions & 14 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,6 @@ where
pub struct TcpHeader {
pub block_size: usize,
pub sender_queue_size: usize,
pub sender_queue_count: usize,
#[serde(deserialize_with = "parse_maybe_binary_u8")]
pub header_fields_flag: u8,
}
Expand All @@ -1431,7 +1430,6 @@ impl Default for TcpHeader {
Self {
block_size: 256,
sender_queue_size: 65536,
sender_queue_count: 1,
header_fields_flag: 0b0000_0000,
}
}
Expand Down Expand Up @@ -1636,17 +1634,17 @@ impl Default for Timeouts {
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TracingTag {
pub http_real_client: String,
pub x_request_id: String,
pub http_real_client: Vec<String>,
pub x_request_id: Vec<String>,
pub apm_trace_id: Vec<String>,
pub apm_span_id: Vec<String>,
}

impl Default for TracingTag {
fn default() -> Self {
Self {
http_real_client: "X_Forwarded_For".to_string(),
x_request_id: "X_Request_ID".to_string(),
http_real_client: vec!["X_Forwarded_For".to_string()],
x_request_id: vec!["X_Request_ID".to_string()],
apm_trace_id: vec!["traceparent".to_string(), "sw8".to_string()],
apm_span_id: vec!["traceparent".to_string(), "sw8".to_string()],
}
Expand Down Expand Up @@ -2279,14 +2277,12 @@ impl Default for Throttles {
#[serde(default)]
pub struct OutputsFlowLogTunning {
pub collector_queue_size: usize,
pub collector_queue_count: usize,
}

impl Default for OutputsFlowLogTunning {
fn default() -> Self {
Self {
collector_queue_size: 65536,
collector_queue_count: 1,
}
}
}
Expand Down Expand Up @@ -2325,14 +2321,12 @@ impl Default for FlowMetricsFilters {
#[serde(default)]
pub struct FlowMetricsTunning {
pub sender_queue_size: usize,
pub sender_queue_count: usize,
}

impl Default for FlowMetricsTunning {
fn default() -> Self {
Self {
sender_queue_size: 65536,
sender_queue_count: 1,
}
}
}
Expand Down Expand Up @@ -3247,21 +3241,18 @@ log_backhaul_enabled: false
let yaml = r#"
block_size: 512
sender_queue_size: 131072
sender_queue_count: 2
header_fields_flag: "0b1010_1010"
"#;
let tcp_header: TcpHeader = serde_yaml::from_str(yaml).unwrap();

assert_eq!(tcp_header.block_size, 512);
assert_eq!(tcp_header.sender_queue_size, 131072);
assert_eq!(tcp_header.sender_queue_count, 2);
assert_eq!(tcp_header.header_fields_flag, 0b1010_1010);

// Test with decimal input for header_fields_flag
let yaml = r#"
block_size: 256
sender_queue_size: 65536
sender_queue_count: 1
header_fields_flag: "170"
"#;
let tcp_header: TcpHeader = serde_yaml::from_str(yaml).unwrap();
Expand All @@ -3272,7 +3263,6 @@ header_fields_flag: "170"
let yaml_invalid = r#"
block_size: 256
sender_queue_size: 65536
sender_queue_count: 1
header_fields_flag: "invalid"
"#;
let result: Result<TcpHeader, _> = serde_yaml::from_str(yaml_invalid);
Expand Down
64 changes: 22 additions & 42 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ pub type PortAccess = Access<PortConfig>;
#[derive(Clone, PartialEq, Eq)]
pub struct CollectorConfig {
pub enabled: bool,
pub inactive_server_port_enabled: bool,
pub inactive_ip_enabled: bool,
pub inactive_server_port_aggregation: bool,
pub inactive_ip_aggregation: bool,
pub vtap_flow_1s_enabled: bool,
pub l4_log_collect_nps_threshold: u64,
pub l4_log_store_tap_types: [bool; 256],
Expand All @@ -147,10 +147,10 @@ impl fmt::Debug for CollectorConfig {
f.debug_struct("CollectorConfig")
.field("enabled", &self.enabled)
.field(
"inactive_server_port_enabled",
&self.inactive_server_port_enabled,
"inactive_server_port_aggregation",
&self.inactive_server_port_aggregation,
)
.field("inactive_ip_enabled", &self.inactive_ip_enabled)
.field("inactive_ip_aggregation", &self.inactive_ip_aggregation)
.field("vtap_flow_1s_enabled", &self.vtap_flow_1s_enabled)
.field(
"l4_log_store_tap_types",
Expand Down Expand Up @@ -207,7 +207,6 @@ pub struct EnvironmentConfig {

#[derive(Clone, PartialEq, Eq, Debug)]
pub struct SenderConfig {
pub mtu: u32,
pub dest_ip: String,
pub agent_id: u16,
pub team_id: u32,
Expand Down Expand Up @@ -1370,7 +1369,7 @@ impl Default for TraceType {
#[derive(Default, Clone)]
pub struct L7LogDynamicConfig {
// in lowercase
pub proxy_client: String,
pub proxy_client: HashSet<String>,
// in lowercase
pub x_request_id: HashSet<String>,

Expand Down Expand Up @@ -1419,16 +1418,20 @@ impl Eq for L7LogDynamicConfig {}

impl L7LogDynamicConfig {
pub fn new(
mut proxy_client: String,
proxy_client: Vec<String>,
x_request_id: Vec<String>,
trace_types: Vec<TraceType>,
span_types: Vec<TraceType>,
mut extra_log_fields: ExtraLogFields,
) -> Self {
proxy_client.make_ascii_lowercase();

let mut expected_headers_set = get_expected_headers();
expected_headers_set.insert(proxy_client.as_bytes().to_vec());

let mut proxy_client_set = HashSet::new();
for client in proxy_client.iter() {
let client = client.trim();
expected_headers_set.insert(client.as_bytes().to_vec());
proxy_client_set.insert(client.to_string());
}
let mut x_request_id_set = HashSet::new();
for t in x_request_id.iter() {
let t = t.trim();
Expand Down Expand Up @@ -1457,7 +1460,7 @@ impl L7LogDynamicConfig {
}

Self {
proxy_client,
proxy_client: proxy_client_set,
x_request_id: x_request_id_set,
trace_types,
span_types,
Expand Down Expand Up @@ -1650,7 +1653,6 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
cpu_set: CpuSet::new(),
},
sender: SenderConfig {
mtu: conf.outputs.npb.max_mtu,
dest_ip: dest_ip.clone(),
agent_id: conf.global.common.agent_id as u16,
team_id: conf.global.common.team_id,
Expand Down Expand Up @@ -1695,12 +1697,12 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
},
collector: CollectorConfig {
enabled: conf.outputs.flow_metrics.enabled,
inactive_server_port_enabled: conf
inactive_server_port_aggregation: conf
.outputs
.flow_metrics
.filters
.inactive_server_port_aggregation,
inactive_ip_enabled: conf.outputs.flow_metrics.filters.inactive_ip_aggregation,
inactive_ip_aggregation: conf.outputs.flow_metrics.filters.inactive_ip_aggregation,
vtap_flow_1s_enabled: conf.outputs.flow_metrics.filters.second_metrics,
l4_log_collect_nps_threshold: conf.outputs.flow_log.throttles.l4_throttle,
l7_metrics_enabled: conf.outputs.flow_metrics.filters.apm_metrics,
Expand Down Expand Up @@ -1833,14 +1835,16 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
.tag_extraction
.tracing_tag
.http_real_client
.to_ascii_lowercase(),
.iter()
.map(|x| x.to_ascii_lowercase())
.collect(),
conf.processors
.request_log
.tag_extraction
.tracing_tag
.x_request_id
.split(',')
.map(|x| x.to_lowercase())
.iter()
.map(|x| x.to_ascii_lowercase())
.collect(),
conf.processors
.request_log
Expand Down Expand Up @@ -3992,14 +3996,6 @@ impl ConfigHandler {

let tunning = &mut flow_log.tunning;
let new_tunning = &mut new_flow_log.tunning;
if tunning.collector_queue_count != new_tunning.collector_queue_count {
info!(
"Update outputs.flow_log.tunning.collector_queue_count from {:?} to {:?}.",
tunning.collector_queue_count, new_tunning.collector_queue_count
);
tunning.collector_queue_count = new_tunning.collector_queue_count;
restart_agent = !first_run;
}
if tunning.collector_queue_size != new_tunning.collector_queue_size {
info!(
"Update outputs.flow_log.tunning.collector_queue_size from {:?} to {:?}.",
Expand Down Expand Up @@ -4056,14 +4052,6 @@ impl ConfigHandler {
}
let tunning = &mut outputs.flow_metrics.tunning;
let new_tunning = &mut new_outputs.flow_metrics.tunning;
if tunning.sender_queue_count != new_tunning.sender_queue_count {
info!(
"Update outputs.flow_metrics.tunning.sender_queue_count from {:?} to {:?}.",
tunning.sender_queue_count, new_tunning.sender_queue_count
);
tunning.sender_queue_count = new_tunning.sender_queue_count;
restart_agent = !first_run;
}
if tunning.sender_queue_size != new_tunning.sender_queue_size {
info!(
"Update outputs.flow_metrics.tunning.sender_queue_size from {:?} to {:?}.",
Expand Down Expand Up @@ -4258,14 +4246,6 @@ impl ConfigHandler {
tcp_header.header_fields_flag = new_tcp_header.header_fields_flag;
restart_agent = !first_run;
}
if tcp_header.sender_queue_count != new_tcp_header.sender_queue_count {
info!(
"Update processors.packet.tcp_header.sender_queue_count from {:?} to {:?}.",
tcp_header.sender_queue_count, new_tcp_header.sender_queue_count
);
tcp_header.sender_queue_count = new_tcp_header.sender_queue_count;
restart_agent = !first_run;
}
if tcp_header.sender_queue_size != new_tcp_header.sender_queue_size {
info!(
"Update processors.packet.tcp_header.sender_queue_size from {:?} to {:?}.",
Expand Down
2 changes: 1 addition & 1 deletion agent/src/flow_generator/protocol_logs/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,7 @@ impl HttpLog {
info.x_request_id_1 = val.to_owned();
}
}
if direction == PacketDirection::ClientToServer && key == &config.proxy_client {
if direction == PacketDirection::ClientToServer && config.proxy_client.contains(key) {
info.client_ip = Some(val.to_owned());
}

Expand Down
Loading

0 comments on commit 0abb09b

Please sign in to comment.