Skip to content

Commit

Permalink
Reduce ASN related allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle committed Aug 1, 2024
1 parent 50d91e4 commit 1a474eb
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 183 deletions.
9 changes: 6 additions & 3 deletions src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod packet_router;
mod sessions;

use super::RunArgs;
use crate::{net::maxmind_db::IpNetEntry, pool::PoolBuffer};
use crate::pool::PoolBuffer;
pub use sessions::SessionPool;
use std::{
net::SocketAddr,
Expand Down Expand Up @@ -129,8 +129,11 @@ impl Proxy {
let id = config.id.load();
let num_workers = self.num_workers.get();

let (upstream_sender, upstream_receiver) =
async_channel::bounded::<(PoolBuffer, Option<IpNetEntry>, SocketAddr)>(250);
let (upstream_sender, upstream_receiver) = async_channel::bounded::<(
PoolBuffer,
Option<crate::net::maxmind_db::MetricsIpNetEntry>,
SocketAddr,
)>(250);
let buffer_pool = Arc::new(crate::pool::BufferPool::new(num_workers, 64 * 1024));
let sessions = SessionPool::new(
config.clone(),
Expand Down
34 changes: 17 additions & 17 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::{
};
use crate::{
filters::{Filter as _, ReadContext},
metrics,
pool::PoolBuffer,
time::UtcTimestamp,
Config,
Expand Down Expand Up @@ -69,35 +70,35 @@ impl DownstreamReceiveWorkerConfig {
match result {
Err(error) => {
tracing::trace!(%error, "error receiving packet");
crate::metrics::errors_total(
crate::metrics::WRITE,
metrics::errors_total(
metrics::WRITE,
&error.to_string(),
None,
&metrics::EMPTY,
)
.inc();
}
Ok((data, asn_info, send_addr)) => {
let (result, _) = send_socket.send_to(data, send_addr).await;
let asn_info = asn_info.as_ref();
let asn_info = asn_info.as_ref().into();
match result {
Ok(size) => {
crate::metrics::packets_total(crate::metrics::WRITE, asn_info)
metrics::packets_total(metrics::WRITE, &asn_info)
.inc();
crate::metrics::bytes_total(crate::metrics::WRITE, asn_info)
metrics::bytes_total(metrics::WRITE, &asn_info)
.inc_by(size as u64);
}
Err(error) => {
let source = error.to_string();
crate::metrics::errors_total(
crate::metrics::WRITE,
metrics::errors_total(
metrics::WRITE,
&source,
asn_info,
&asn_info,
)
.inc();
crate::metrics::packets_dropped_total(
crate::metrics::WRITE,
metrics::packets_dropped_total(
metrics::WRITE,
&source,
asn_info,
&asn_info,
)
.inc();
}
Expand Down Expand Up @@ -134,7 +135,7 @@ impl DownstreamReceiveWorkerConfig {
};

if let Some(last_received_at) = last_received_at {
crate::metrics::packet_jitter(crate::metrics::READ, None)
metrics::packet_jitter(metrics::READ, &metrics::EMPTY)
.set((packet.received_at - last_received_at).nanos());
}
last_received_at = Some(packet.received_at);
Expand Down Expand Up @@ -178,14 +179,13 @@ impl DownstreamReceiveWorkerConfig {
"received packet from downstream"
);

let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer();
let timer = metrics::processing_time(metrics::READ).start_timer();
match Self::process_downstream_received_packet(packet, config, sessions).await {
Ok(()) => {}
Err(error) => {
let discriminant = PipelineErrorDiscriminants::from(&error).to_string();
crate::metrics::errors_total(crate::metrics::READ, &discriminant, None).inc();
crate::metrics::packets_dropped_total(crate::metrics::READ, &discriminant, None)
.inc();
metrics::errors_total(metrics::READ, &discriminant, &metrics::EMPTY).inc();
metrics::packets_dropped_total(metrics::READ, &discriminant, &metrics::EMPTY).inc();
let _ = error_sender.send(error);
}
}
Expand Down
Loading

0 comments on commit 1a474eb

Please sign in to comment.