From de9d981fcca5accad1ca727518e93a946cfc6347 Mon Sep 17 00:00:00 2001 From: adgaultier <105724249+adgaultier@users.noreply.github.com> Date: Mon, 14 Oct 2024 20:01:35 +0200 Subject: [PATCH] Use Map for traffic direction (#28) Co-authored-by: Badr --- oryx-ebpf/src/main.rs | 118 ++++--- oryx-tui/src/app.rs | 22 +- oryx-tui/src/ebpf.rs | 545 +---------------------------- oryx-tui/src/ebpf/egress.rs | 235 +++++++++++++ oryx-tui/src/ebpf/firewall.rs | 142 ++++++++ oryx-tui/src/ebpf/ingress.rs | 239 +++++++++++++ oryx-tui/src/filter.rs | 246 +++++-------- oryx-tui/src/filter/direction.rs | 8 - oryx-tui/src/handler.rs | 26 +- oryx-tui/src/section.rs | 6 +- oryx-tui/src/section/firewall.rs | 62 +--- oryx-tui/src/section/inspection.rs | 2 +- 12 files changed, 816 insertions(+), 835 deletions(-) create mode 100644 oryx-tui/src/ebpf/egress.rs create mode 100644 oryx-tui/src/ebpf/firewall.rs create mode 100644 oryx-tui/src/ebpf/ingress.rs diff --git a/oryx-ebpf/src/main.rs b/oryx-ebpf/src/main.rs index 40aa3c2..051681d 100644 --- a/oryx-ebpf/src/main.rs +++ b/oryx-ebpf/src/main.rs @@ -33,6 +33,9 @@ static TRANSPORT_FILTERS: Array = Array::with_max_entries(8, 0); #[map] static LINK_FILTERS: Array = Array::with_max_entries(8, 0); +#[map] +static TRAFFIC_DIRECTION_FILTER: Array = Array::with_max_entries(1, 0); + #[map] static BLOCKLIST_IPV6: HashMap = HashMap::::with_max_entries(MAX_FIREWALL_RULES, 0); @@ -41,6 +44,9 @@ static BLOCKLIST_IPV6: HashMap = static BLOCKLIST_IPV4: HashMap = HashMap::::with_max_entries(MAX_FIREWALL_RULES, 0); +#[no_mangle] +static TRAFFIC_DIRECTION: i32 = 0; + #[classifier] pub fn oryx(ctx: TcContext) -> i32 { match process(ctx) { @@ -56,6 +62,7 @@ fn submit(packet: RawPacket) { buf.submit(0); } } + #[inline] fn ptr_at(ctx: &TcContext, offset: usize) -> Result<*const T, ()> { let start = ctx.data(); @@ -70,12 +77,23 @@ fn ptr_at(ctx: &TcContext, offset: usize) -> Result<*const T, ()> { } #[inline] -fn filter_for_ipv4_address( - addr: u32, - port: u16, - blocked_ports_map: &HashMap, -) -> bool { - if let Some(blocked_ports) = unsafe { blocked_ports_map.get(&addr) } { +fn filter_direction() -> bool { + // 0(default) -> false(send to tui), 1 -> true(filter) + if let Some(v) = TRAFFIC_DIRECTION_FILTER.get(0) { + return *v != 0; + } + false +} + +#[inline] +fn is_ingress() -> bool { + let traffic_direction = unsafe { core::ptr::read_volatile(&TRAFFIC_DIRECTION) }; + traffic_direction == -1 +} + +#[inline] +fn block_ipv4(addr: u32, port: u16) -> bool { + if let Some(blocked_ports) = unsafe { BLOCKLIST_IPV4.get(&addr) } { for (idx, blocked_port) in blocked_ports.iter().enumerate() { if *blocked_port == 0 { if idx == 0 { @@ -92,12 +110,8 @@ fn filter_for_ipv4_address( } #[inline] -fn filter_for_ipv6_address( - addr: u128, - port: u16, - blocked_ports_map: &HashMap, -) -> bool { - if let Some(blocked_ports) = unsafe { blocked_ports_map.get(&addr) } { +fn block_ipv6(addr: u128, port: u16) -> bool { + if let Some(blocked_ports) = unsafe { BLOCKLIST_IPV6.get(&addr) } { for (idx, blocked_port) in blocked_ports.iter().enumerate() { if *blocked_port == 0 { if idx == 0 { @@ -142,26 +156,33 @@ fn process(ctx: TcContext) -> Result { match ethhdr.ether_type { EtherType::Ipv4 => { let header: Ipv4Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?; - let src_addr = u32::from_be(header.src_addr); - let dst_addr = u32::from_be(header.dst_addr); + + let addr = if is_ingress() { + u32::from_be(header.src_addr) + } else { + u32::from_be(header.dst_addr) + }; match header.proto { IpProto::Tcp => { let tcphdr: *const TcpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; - let src_port = u16::from_be(unsafe { (*tcphdr).source }); - let dst_port = u16::from_be(unsafe { (*tcphdr).dest }); + let port = if is_ingress() { + u16::from_be(unsafe { (*tcphdr).source }) + } else { + u16::from_be(unsafe { (*tcphdr).dest }) + }; - if filter_for_ipv4_address(src_addr, src_port, &BLOCKLIST_IPV4) - || filter_for_ipv4_address(dst_addr, dst_port, &BLOCKLIST_IPV4) - { - return Ok(TC_ACT_SHOT); + if block_ipv4(addr, port) { + return Ok(TC_ACT_SHOT); //block packet } if filter_packet(Protocol::Network(NetworkProtocol::Ipv4)) || filter_packet(Protocol::Transport(TransportProtocol::TCP)) + || filter_direction() { - return Ok(TC_ACT_PIPE); //DONT FWD PACKET TO TUI + return Ok(TC_ACT_PIPE); } + submit(RawPacket::Ip( IpHdr::V4(header), ProtoHdr::Tcp(unsafe { *tcphdr }), @@ -169,17 +190,19 @@ fn process(ctx: TcContext) -> Result { } IpProto::Udp => { let udphdr: *const UdpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; - let src_port = u16::from_be(unsafe { (*udphdr).source }); - let dst_port = u16::from_be(unsafe { (*udphdr).dest }); + let port = if is_ingress() { + u16::from_be(unsafe { (*udphdr).source }) + } else { + u16::from_be(unsafe { (*udphdr).dest }) + }; - if filter_for_ipv4_address(src_addr, src_port, &BLOCKLIST_IPV4) - || filter_for_ipv4_address(dst_addr, dst_port, &BLOCKLIST_IPV4) - { - return Ok(TC_ACT_SHOT); + if block_ipv4(addr, port) { + return Ok(TC_ACT_SHOT); //block packet } if filter_packet(Protocol::Network(NetworkProtocol::Ipv4)) || filter_packet(Protocol::Transport(TransportProtocol::UDP)) + || filter_direction() { return Ok(TC_ACT_PIPE); } @@ -204,24 +227,30 @@ fn process(ctx: TcContext) -> Result { } EtherType::Ipv6 => { let header: Ipv6Hdr = ctx.load(EthHdr::LEN).map_err(|_| ())?; - let src_addr = header.src_addr().to_bits(); - let dst_addr = header.dst_addr().to_bits(); + let addr = if is_ingress() { + header.src_addr().to_bits() + } else { + header.dst_addr().to_bits() + }; match header.next_hdr { IpProto::Tcp => { let tcphdr: *const TcpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv6Hdr::LEN)?; - let src_port = u16::from_be(unsafe { (*tcphdr).source }); - let dst_port = u16::from_be(unsafe { (*tcphdr).dest }); + let port = if is_ingress() { + u16::from_be(unsafe { (*tcphdr).source }) + } else { + u16::from_be(unsafe { (*tcphdr).dest }) + }; - if filter_for_ipv6_address(src_addr, src_port, &BLOCKLIST_IPV6) - || filter_for_ipv6_address(dst_addr, dst_port, &BLOCKLIST_IPV6) - { - return Ok(TC_ACT_SHOT); + if block_ipv6(addr, port) { + return Ok(TC_ACT_SHOT); //block packet } + if filter_packet(Protocol::Network(NetworkProtocol::Ipv6)) || filter_packet(Protocol::Transport(TransportProtocol::TCP)) + || filter_direction() { - return Ok(TC_ACT_PIPE); //DONT FWD PACKET TO TUI + return Ok(TC_ACT_PIPE); } submit(RawPacket::Ip( IpHdr::V6(header), @@ -230,18 +259,21 @@ fn process(ctx: TcContext) -> Result { } IpProto::Udp => { let udphdr: *const UdpHdr = ptr_at(&ctx, EthHdr::LEN + Ipv6Hdr::LEN)?; - let src_port = u16::from_be(unsafe { (*udphdr).source }); - let dst_port = u16::from_be(unsafe { (*udphdr).dest }); + let port = if is_ingress() { + u16::from_be(unsafe { (*udphdr).source }) + } else { + u16::from_be(unsafe { (*udphdr).dest }) + }; - if filter_for_ipv6_address(src_addr, src_port, &BLOCKLIST_IPV6) - || filter_for_ipv6_address(dst_addr, dst_port, &BLOCKLIST_IPV6) - { - return Ok(TC_ACT_SHOT); + if block_ipv6(addr, port) { + return Ok(TC_ACT_SHOT); //block packet } + if filter_packet(Protocol::Network(NetworkProtocol::Ipv6)) || filter_packet(Protocol::Transport(TransportProtocol::UDP)) + || filter_direction() { - return Ok(TC_ACT_PIPE); //DONT FWD PACKET TO TUI + return Ok(TC_ACT_PIPE); } submit(RawPacket::Ip( IpHdr::V6(header), diff --git a/oryx-tui/src/app.rs b/oryx-tui/src/app.rs index d160af4..c1c528b 100644 --- a/oryx-tui/src/app.rs +++ b/oryx-tui/src/app.rs @@ -7,10 +7,11 @@ use std::{ error, sync::{Arc, Mutex}, thread, + time::Duration, }; -use crate::notification::Notification; use crate::{filter::Filter, help::Help}; +use crate::{filter::IoChannels, notification::Notification}; use crate::{packet::AppPacket, section::Section}; pub type AppResult = std::result::Result>; @@ -57,9 +58,7 @@ impl App { let (sender, receiver) = kanal::unbounded(); - let (firewall_ingress_sender, firewall_ingress_receiver) = kanal::unbounded(); - let (firewall_egress_sender, firewall_egress_receiver) = kanal::unbounded(); - + let firewall_channels = IoChannels::new(); thread::spawn({ let packets = packets.clone(); move || loop { @@ -77,20 +76,11 @@ impl App { Self { running: true, help: Help::new(), - filter: Filter::new( - firewall_ingress_sender.clone(), - firewall_ingress_receiver, - firewall_egress_sender.clone(), - firewall_egress_receiver, - ), + filter: Filter::new(firewall_channels.clone()), start_sniffing: false, packets: packets.clone(), notifications: Vec::new(), - section: Section::new( - packets.clone(), - firewall_ingress_sender, - firewall_egress_sender, - ), + section: Section::new(packets.clone(), firewall_channels.clone()), data_channel_sender: sender, is_editing: false, active_popup: None, @@ -133,6 +123,8 @@ impl App { } pub fn quit(&mut self) { + self.filter.terminate(); + thread::sleep(Duration::from_millis(110)); self.running = false; } } diff --git a/oryx-tui/src/ebpf.rs b/oryx-tui/src/ebpf.rs index 54d53ad..2d3164d 100644 --- a/oryx-tui/src/ebpf.rs +++ b/oryx-tui/src/ebpf.rs @@ -1,30 +1,18 @@ -use std::{ - io, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, - os::fd::AsRawFd, - sync::{atomic::AtomicBool, Arc}, - thread, - time::Duration, -}; +pub mod egress; +mod firewall; +pub mod ingress; + +use std::{io, os::fd::AsRawFd}; use aya::{ - include_bytes_aligned, - maps::{ring_buf::RingBufItem, Array, HashMap, MapData, RingBuf}, - programs::{tc, SchedClassifier, TcAttachType}, + maps::{ring_buf::RingBufItem, MapData, RingBuf}, Ebpf, }; -use oryx_common::{protocols::Protocol, RawPacket, MAX_RULES_PORT}; -use crate::{ - event::Event, - filter::FilterChannelSignal, - notification::{Notification, NotificationLevel}, - section::firewall::{BlockedPort, FirewallSignal}, -}; -use mio::{event::Source, unix::SourceFd, Events, Interest, Poll, Registry, Token}; +use mio::{event::Source, unix::SourceFd, Interest, Registry, Token}; pub struct RingBuffer<'a> { - buffer: RingBuf<&'a mut MapData>, + pub buffer: RingBuf<&'a mut MapData>, } impl<'a> RingBuffer<'a> { @@ -61,518 +49,7 @@ impl Source for RingBuffer<'_> { SourceFd(&self.buffer.as_raw_fd()).deregister(registry) } } - -fn update_ipv4_blocklist( - ipv4_firewall: &mut HashMap, - addr: Ipv4Addr, - port: BlockedPort, - to_insert: bool, -) { - if let Ok(mut blocked_ports) = ipv4_firewall.get(&addr.to_bits(), 0) { - match port { - BlockedPort::Single(port) => { - if to_insert { - if let Some((first_zero_index, _)) = blocked_ports - .iter() - .enumerate() - .find(|(_, &value)| value == 0) - { - blocked_ports[first_zero_index] = port; - ipv4_firewall - .insert(addr.to_bits(), blocked_ports, 0) - .unwrap(); - } else { - unreachable!(); - } - } else { - let not_null_ports = blocked_ports - .into_iter() - .filter(|p| (*p != 0 && *p != port)) - .collect::>(); - - let mut blocked_ports = [0; MAX_RULES_PORT]; - - for (idx, p) in not_null_ports.iter().enumerate() { - blocked_ports[idx] = *p; - } - - if blocked_ports.iter().all(|&port| port == 0) { - ipv4_firewall.remove(&addr.to_bits()).unwrap(); - } else { - ipv4_firewall - .insert(addr.to_bits(), blocked_ports, 0) - .unwrap(); - } - } - } - BlockedPort::All => { - if to_insert { - ipv4_firewall - .insert(addr.to_bits(), [0; MAX_RULES_PORT], 0) - .unwrap(); - } else { - ipv4_firewall.remove(&addr.to_bits()).unwrap(); - } - } - } - } else if to_insert { - let mut blocked_ports: [u16; MAX_RULES_PORT] = [0; MAX_RULES_PORT]; - match port { - BlockedPort::Single(port) => { - blocked_ports[0] = port; - } - BlockedPort::All => {} - } - - ipv4_firewall - .insert(addr.to_bits(), blocked_ports, 0) - .unwrap(); - } -} - -fn update_ipv6_blocklist( - ipv6_firewall: &mut HashMap, - addr: Ipv6Addr, - port: BlockedPort, - to_insert: bool, -) { - if let Ok(mut blocked_ports) = ipv6_firewall.get(&addr.to_bits(), 0) { - match port { - BlockedPort::Single(port) => { - if to_insert { - if let Some((first_zero_index, _)) = blocked_ports - .iter() - .enumerate() - .find(|(_, &value)| value == 0) - { - blocked_ports[first_zero_index] = port; - ipv6_firewall - .insert(addr.to_bits(), blocked_ports, 0) - .unwrap(); - } else { - //TODO: - unreachable!(); // list is full - } - } else { - let not_null_ports = blocked_ports - .into_iter() - .filter(|p| (*p != 0 && *p != port)) - .collect::>(); - - let mut blocked_ports = [0; MAX_RULES_PORT]; - - for (idx, p) in not_null_ports.iter().enumerate() { - blocked_ports[idx] = *p; - } - - if blocked_ports.iter().all(|&port| port == 0) { - ipv6_firewall.remove(&addr.to_bits()).unwrap(); - } else { - ipv6_firewall - .insert(addr.to_bits(), blocked_ports, 0) - .unwrap(); - } - } - } - BlockedPort::All => { - if to_insert { - ipv6_firewall - .insert(addr.to_bits(), [0; MAX_RULES_PORT], 0) - .unwrap(); - } else { - ipv6_firewall.remove(&addr.to_bits()).unwrap(); - } - } - } - } else if to_insert { - let mut blocked_ports: [u16; MAX_RULES_PORT] = [0; MAX_RULES_PORT]; - match port { - BlockedPort::Single(port) => { - blocked_ports[0] = port; - } - BlockedPort::All => {} - } - - ipv6_firewall - .insert(addr.to_bits(), blocked_ports, 0) - .unwrap(); - } -} - -pub fn load_ingress( - iface: String, - notification_sender: kanal::Sender, - data_sender: kanal::Sender<[u8; RawPacket::LEN]>, - filter_channel_receiver: kanal::Receiver, - firewall_ingress_receiver: kanal::Receiver, - terminate: Arc, -) { - thread::spawn({ - let iface = iface.to_owned(); - let notification_sender = notification_sender.clone(); - - move || { - let rlim = libc::rlimit { - rlim_cur: libc::RLIM_INFINITY, - rlim_max: libc::RLIM_INFINITY, - }; - - unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &rlim) }; - - #[cfg(debug_assertions)] - let mut bpf = match Ebpf::load(include_bytes_aligned!( - "../../target/bpfel-unknown-none/debug/oryx" - )) { - Ok(v) => v, - Err(e) => { - Notification::send( - format!("Failed to load the ingress eBPF bytecode\n {}", e), - NotificationLevel::Error, - notification_sender, - ) - .unwrap(); - return; - } - }; - - #[cfg(not(debug_assertions))] - let mut bpf = match Ebpf::load(include_bytes_aligned!( - "../../target/bpfel-unknown-none/release/oryx" - )) { - Ok(v) => v, - Err(e) => { - Notification::send( - format!("Failed to load the ingress eBPF bytecode\n {}", e), - NotificationLevel::Error, - notification_sender, - ) - .unwrap(); - return; - } - }; - - let _ = tc::qdisc_add_clsact(&iface); - - let program: &mut SchedClassifier = - bpf.program_mut("oryx").unwrap().try_into().unwrap(); - - if let Err(e) = program.load() { - Notification::send( - format!( - "Failed to load the ingress eBPF program to the kernel\n{}", - e - ), - NotificationLevel::Error, - notification_sender, - ) - .unwrap(); - return; - }; - - if let Err(e) = program.attach(&iface, TcAttachType::Ingress) { - Notification::send( - format!( - "Failed to attach the ingress eBPF program to the interface\n{}", - e - ), - NotificationLevel::Error, - notification_sender, - ) - .unwrap(); - return; - }; - - let mut poll = Poll::new().unwrap(); - let mut events = Events::with_capacity(128); - - //filter-ebpf interface - let mut transport_filters: Array<_, u32> = - Array::try_from(bpf.take_map("TRANSPORT_FILTERS").unwrap()).unwrap(); - - let mut network_filters: Array<_, u32> = - Array::try_from(bpf.take_map("NETWORK_FILTERS").unwrap()).unwrap(); - - let mut link_filters: Array<_, u32> = - Array::try_from(bpf.take_map("LINK_FILTERS").unwrap()).unwrap(); - - // firewall-ebpf interface - let mut ipv4_firewall: HashMap<_, u32, [u16; MAX_RULES_PORT]> = - HashMap::try_from(bpf.take_map("BLOCKLIST_IPV4").unwrap()).unwrap(); - - let mut ipv6_firewall: HashMap<_, u128, [u16; MAX_RULES_PORT]> = - HashMap::try_from(bpf.take_map("BLOCKLIST_IPV6").unwrap()).unwrap(); - - thread::spawn(move || loop { - if let Ok(signal) = firewall_ingress_receiver.recv() { - match signal { - FirewallSignal::Rule(rule) => match rule.ip { - IpAddr::V4(addr) => update_ipv4_blocklist( - &mut ipv4_firewall, - addr, - rule.port, - rule.enabled, - ), - - IpAddr::V6(addr) => update_ipv6_blocklist( - &mut ipv6_firewall, - addr, - rule.port, - rule.enabled, - ), - }, - FirewallSignal::Kill => { - break; - } - } - } - }); - - thread::spawn(move || loop { - if let Ok(signal) = filter_channel_receiver.recv() { - match signal { - FilterChannelSignal::Update((filter, flag)) => match filter { - Protocol::Transport(p) => { - let _ = transport_filters.set(p as u32, flag as u32, 0); - } - Protocol::Network(p) => { - let _ = network_filters.set(p as u32, flag as u32, 0); - } - Protocol::Link(p) => { - let _ = link_filters.set(p as u32, flag as u32, 0); - } - }, - FilterChannelSignal::Kill => { - break; - } - } - } - }); - - let mut ring_buf = RingBuffer::new(&mut bpf); - - poll.registry() - .register( - &mut SourceFd(&ring_buf.buffer.as_raw_fd()), - Token(0), - Interest::READABLE, - ) - .unwrap(); - - loop { - poll.poll(&mut events, Some(Duration::from_millis(100))) - .unwrap(); - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - for event in &events { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - if event.token() == Token(0) && event.is_readable() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - while let Some(item) = ring_buf.next() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - let packet: [u8; RawPacket::LEN] = item.to_owned().try_into().unwrap(); - data_sender.send(packet).ok(); - } - } - } - } - - let _ = poll - .registry() - .deregister(&mut SourceFd(&ring_buf.buffer.as_raw_fd())); - } - }); -} - -pub fn load_egress( - iface: String, - notification_sender: kanal::Sender, - data_sender: kanal::Sender<[u8; RawPacket::LEN]>, - filter_channel_receiver: kanal::Receiver, - firewall_egress_receiver: kanal::Receiver, - terminate: Arc, -) { - thread::spawn({ - let iface = iface.to_owned(); - let notification_sender = notification_sender.clone(); - - move || { - let rlim = libc::rlimit { - rlim_cur: libc::RLIM_INFINITY, - rlim_max: libc::RLIM_INFINITY, - }; - - unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &rlim) }; - - #[cfg(debug_assertions)] - let mut bpf = match Ebpf::load(include_bytes_aligned!( - "../../target/bpfel-unknown-none/debug/oryx" - )) { - Ok(v) => v, - Err(e) => { - Notification::send( - format!("Fail to load the egress eBPF bytecode\n {}", e), - NotificationLevel::Error, - notification_sender, - ) - .unwrap(); - return; - } - }; - - #[cfg(not(debug_assertions))] - let mut bpf = match Ebpf::load(include_bytes_aligned!( - "../../target/bpfel-unknown-none/release/oryx" - )) { - Ok(v) => v, - Err(e) => { - Notification::send( - format!("Failed to load the egress eBPF bytecode\n {}", e), - NotificationLevel::Error, - notification_sender, - ) - .unwrap(); - return; - } - }; - - let _ = tc::qdisc_add_clsact(&iface); - let program: &mut SchedClassifier = - bpf.program_mut("oryx").unwrap().try_into().unwrap(); - - if let Err(e) = program.load() { - Notification::send( - format!("Fail to load the egress eBPF program to the kernel\n{}", e), - NotificationLevel::Error, - notification_sender, - ) - .unwrap(); - return; - }; - - if let Err(e) = program.attach(&iface, TcAttachType::Egress) { - Notification::send( - format!( - "Failed to attach the egress eBPF program to the interface\n{}", - e - ), - NotificationLevel::Error, - notification_sender, - ) - .unwrap(); - return; - }; - - let mut poll = Poll::new().unwrap(); - let mut events = Events::with_capacity(128); - - //filter-ebpf interface - let mut transport_filters: Array<_, u32> = - Array::try_from(bpf.take_map("TRANSPORT_FILTERS").unwrap()).unwrap(); - - let mut network_filters: Array<_, u32> = - Array::try_from(bpf.take_map("NETWORK_FILTERS").unwrap()).unwrap(); - - let mut link_filters: Array<_, u32> = - Array::try_from(bpf.take_map("LINK_FILTERS").unwrap()).unwrap(); - - // firewall-ebpf interface - let mut ipv4_firewall: HashMap<_, u32, [u16; MAX_RULES_PORT]> = - HashMap::try_from(bpf.take_map("BLOCKLIST_IPV4").unwrap()).unwrap(); - - let mut ipv6_firewall: HashMap<_, u128, [u16; MAX_RULES_PORT]> = - HashMap::try_from(bpf.take_map("BLOCKLIST_IPV6").unwrap()).unwrap(); - - thread::spawn(move || loop { - if let Ok(signal) = firewall_egress_receiver.recv() { - match signal { - FirewallSignal::Rule(rule) => match rule.ip { - IpAddr::V4(addr) => update_ipv4_blocklist( - &mut ipv4_firewall, - addr, - rule.port, - rule.enabled, - ), - - IpAddr::V6(addr) => update_ipv6_blocklist( - &mut ipv6_firewall, - addr, - rule.port, - rule.enabled, - ), - }, - FirewallSignal::Kill => { - break; - } - } - } - }); - - thread::spawn(move || loop { - if let Ok(signal) = filter_channel_receiver.recv() { - match signal { - FilterChannelSignal::Update((filter, flag)) => match filter { - Protocol::Transport(p) => { - let _ = transport_filters.set(p as u32, flag as u32, 0); - } - Protocol::Network(p) => { - let _ = network_filters.set(p as u32, flag as u32, 0); - } - Protocol::Link(p) => { - let _ = link_filters.set(p as u32, flag as u32, 0); - } - }, - FilterChannelSignal::Kill => { - break; - } - } - } - }); - - let mut ring_buf = RingBuffer::new(&mut bpf); - - poll.registry() - .register( - &mut SourceFd(&ring_buf.buffer.as_raw_fd()), - Token(0), - Interest::READABLE, - ) - .unwrap(); - - loop { - poll.poll(&mut events, Some(Duration::from_millis(100))) - .unwrap(); - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - for event in &events { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - if event.token() == Token(0) && event.is_readable() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - while let Some(item) = ring_buf.next() { - if terminate.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - let packet: [u8; RawPacket::LEN] = item.to_owned().try_into().unwrap(); - data_sender.send(packet).ok(); - } - } - } - } - - let _ = poll - .registry() - .deregister(&mut SourceFd(&ring_buf.buffer.as_raw_fd())); - } - }); +enum EbpfTrafficDirection { + Ingress = -1, + Egress = 1, } diff --git a/oryx-tui/src/ebpf/egress.rs b/oryx-tui/src/ebpf/egress.rs new file mode 100644 index 0000000..68e3008 --- /dev/null +++ b/oryx-tui/src/ebpf/egress.rs @@ -0,0 +1,235 @@ +use std::{ + net::IpAddr, + os::fd::AsRawFd, + sync::{atomic::AtomicBool, Arc}, + thread, + time::Duration, +}; + +use aya::{ + include_bytes_aligned, + maps::{Array, HashMap}, + programs::{tc, SchedClassifier, TcAttachType}, + EbpfLoader, +}; +use log::error; +use oryx_common::{protocols::Protocol, RawPacket, MAX_RULES_PORT}; + +use crate::{ + event::Event, + filter::FilterChannelSignal, + notification::{Notification, NotificationLevel}, + section::firewall::FirewallSignal, +}; +use mio::{unix::SourceFd, Events, Interest, Poll, Token}; + +use super::{ + firewall::{update_ipv4_blocklist, update_ipv6_blocklist}, + EbpfTrafficDirection, RingBuffer, +}; + +pub fn load_egress( + iface: String, + notification_sender: kanal::Sender, + data_sender: kanal::Sender<[u8; RawPacket::LEN]>, + filter_channel_receiver: kanal::Receiver, + firewall_egress_receiver: kanal::Receiver, + terminate: Arc, +) { + thread::spawn({ + let iface = iface.to_owned(); + let notification_sender = notification_sender.clone(); + + move || { + let rlim = libc::rlimit { + rlim_cur: libc::RLIM_INFINITY, + rlim_max: libc::RLIM_INFINITY, + }; + + unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &rlim) }; + + let traffic_direction = EbpfTrafficDirection::Egress as i32; + + #[cfg(debug_assertions)] + let mut bpf = match EbpfLoader::new() + .set_global("TRAFFIC_DIRECTION", &traffic_direction, true) + .load(include_bytes_aligned!( + "../../../target/bpfel-unknown-none/debug/oryx" + )) { + Ok(v) => v, + Err(e) => { + error!("Fail to load the egress eBPF bytecode. {}", e); + Notification::send( + "Fail to load the egress eBPF bytecode", + NotificationLevel::Error, + notification_sender, + ) + .unwrap(); + return; + } + }; + + #[cfg(not(debug_assertions))] + let mut bpf = match EbpfLoader::new() + .set_global("TRAFFIC_DIRECTION", &traffic_direction, true) + .load(include_bytes_aligned!( + "../../../target/bpfel-unknown-none/debug/oryx" + )) { + Ok(v) => v, + Err(e) => { + error!("Fail to load the egress eBPF bytecode. {}", e); + Notification::send( + "Fail to load the egress eBPF bytecode", + NotificationLevel::Error, + notification_sender, + ) + .unwrap(); + return; + } + }; + + let _ = tc::qdisc_add_clsact(&iface); + let program: &mut SchedClassifier = + bpf.program_mut("oryx").unwrap().try_into().unwrap(); + + if let Err(e) = program.load() { + error!("Fail to load the egress eBPF program to the kernel. {}", e); + Notification::send( + "Fail to load the egress eBPF program to the kernel", + NotificationLevel::Error, + notification_sender, + ) + .unwrap(); + return; + }; + + if let Err(e) = program.attach(&iface, TcAttachType::Egress) { + error!( + "Failed to attach the egress eBPF program to the interface.{}", + e + ); + Notification::send( + "Failed to attach the egress eBPF program to the interface", + NotificationLevel::Error, + notification_sender, + ) + .unwrap(); + return; + }; + + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(128); + + //filter-ebpf interface + let mut transport_filters: Array<_, u32> = + Array::try_from(bpf.take_map("TRANSPORT_FILTERS").unwrap()).unwrap(); + + let mut network_filters: Array<_, u32> = + Array::try_from(bpf.take_map("NETWORK_FILTERS").unwrap()).unwrap(); + + let mut link_filters: Array<_, u32> = + Array::try_from(bpf.take_map("LINK_FILTERS").unwrap()).unwrap(); + + let mut traffic_direction_filter: Array<_, u8> = + Array::try_from(bpf.take_map("TRAFFIC_DIRECTION_FILTER").unwrap()).unwrap(); + + // firewall-ebpf interface + let mut ipv4_firewall: HashMap<_, u32, [u16; MAX_RULES_PORT]> = + HashMap::try_from(bpf.take_map("BLOCKLIST_IPV4").unwrap()).unwrap(); + + let mut ipv6_firewall: HashMap<_, u128, [u16; MAX_RULES_PORT]> = + HashMap::try_from(bpf.take_map("BLOCKLIST_IPV6").unwrap()).unwrap(); + + // firewall thread + thread::spawn(move || loop { + if let Ok(signal) = firewall_egress_receiver.recv() { + match signal { + FirewallSignal::Rule(rule) => match rule.ip { + IpAddr::V4(addr) => update_ipv4_blocklist( + &mut ipv4_firewall, + addr, + rule.port, + rule.enabled, + ), + + IpAddr::V6(addr) => update_ipv6_blocklist( + &mut ipv6_firewall, + addr, + rule.port, + rule.enabled, + ), + }, + FirewallSignal::Kill => { + break; + } + } + } + }); + + // packets filters thread + thread::spawn(move || loop { + if let Ok(signal) = filter_channel_receiver.recv() { + match signal { + FilterChannelSignal::ProtoUpdate((filter, flag)) => match filter { + Protocol::Transport(p) => { + let _ = transport_filters.set(p as u32, flag as u32, 0); + } + Protocol::Network(p) => { + let _ = network_filters.set(p as u32, flag as u32, 0); + } + Protocol::Link(p) => { + let _ = link_filters.set(p as u32, flag as u32, 0); + } + }, + FilterChannelSignal::DirectionUpdate(flag) => { + let _ = traffic_direction_filter.set(0, flag as u8, 0); + } + FilterChannelSignal::Kill => { + break; + } + } + } + }); + + // packets reading + let mut ring_buf = RingBuffer::new(&mut bpf); + + poll.registry() + .register( + &mut SourceFd(&ring_buf.buffer.as_raw_fd()), + Token(0), + Interest::READABLE, + ) + .unwrap(); + + loop { + poll.poll(&mut events, Some(Duration::from_millis(100))) + .unwrap(); + if terminate.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + for event in &events { + if terminate.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + if event.token() == Token(0) && event.is_readable() { + if terminate.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + while let Some(item) = ring_buf.next() { + if terminate.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + let packet: [u8; RawPacket::LEN] = item.to_owned().try_into().unwrap(); + data_sender.send(packet).ok(); + } + } + } + } + + let _ = poll + .registry() + .deregister(&mut SourceFd(&ring_buf.buffer.as_raw_fd())); + } + }); +} diff --git a/oryx-tui/src/ebpf/firewall.rs b/oryx-tui/src/ebpf/firewall.rs new file mode 100644 index 0000000..0156f09 --- /dev/null +++ b/oryx-tui/src/ebpf/firewall.rs @@ -0,0 +1,142 @@ +use std::net::{Ipv4Addr, Ipv6Addr}; + +use aya::maps::{HashMap, MapData}; +use oryx_common::MAX_RULES_PORT; + +use crate::section::firewall::BlockedPort; + +pub fn update_ipv4_blocklist( + ipv4_firewall: &mut HashMap, + addr: Ipv4Addr, + port: BlockedPort, + to_insert: bool, +) { + if let Ok(mut blocked_ports) = ipv4_firewall.get(&addr.to_bits(), 0) { + match port { + BlockedPort::Single(port) => { + if to_insert { + if let Some((first_zero_index, _)) = blocked_ports + .iter() + .enumerate() + .find(|(_, &value)| value == 0) + { + blocked_ports[first_zero_index] = port; + ipv4_firewall + .insert(addr.to_bits(), blocked_ports, 0) + .unwrap(); + } else { + unreachable!(); + } + } else { + let not_null_ports = blocked_ports + .into_iter() + .filter(|p| (*p != 0 && *p != port)) + .collect::>(); + + let mut blocked_ports = [0; MAX_RULES_PORT]; + + for (idx, p) in not_null_ports.iter().enumerate() { + blocked_ports[idx] = *p; + } + + if blocked_ports.iter().all(|&port| port == 0) { + ipv4_firewall.remove(&addr.to_bits()).unwrap(); + } else { + ipv4_firewall + .insert(addr.to_bits(), blocked_ports, 0) + .unwrap(); + } + } + } + BlockedPort::All => { + if to_insert { + ipv4_firewall + .insert(addr.to_bits(), [0; MAX_RULES_PORT], 0) + .unwrap(); + } else { + ipv4_firewall.remove(&addr.to_bits()).unwrap(); + } + } + } + } else if to_insert { + let mut blocked_ports: [u16; MAX_RULES_PORT] = [0; MAX_RULES_PORT]; + match port { + BlockedPort::Single(port) => { + blocked_ports[0] = port; + } + BlockedPort::All => {} + } + + ipv4_firewall + .insert(addr.to_bits(), blocked_ports, 0) + .unwrap(); + } +} + +pub fn update_ipv6_blocklist( + ipv6_firewall: &mut HashMap, + addr: Ipv6Addr, + port: BlockedPort, + to_insert: bool, +) { + if let Ok(mut blocked_ports) = ipv6_firewall.get(&addr.to_bits(), 0) { + match port { + BlockedPort::Single(port) => { + if to_insert { + if let Some((first_zero_index, _)) = blocked_ports + .iter() + .enumerate() + .find(|(_, &value)| value == 0) + { + blocked_ports[first_zero_index] = port; + ipv6_firewall + .insert(addr.to_bits(), blocked_ports, 0) + .unwrap(); + } else { + unreachable!(); // list is full + } + } else { + let not_null_ports = blocked_ports + .into_iter() + .filter(|p| (*p != 0 && *p != port)) + .collect::>(); + + let mut blocked_ports = [0; MAX_RULES_PORT]; + + for (idx, p) in not_null_ports.iter().enumerate() { + blocked_ports[idx] = *p; + } + + if blocked_ports.iter().all(|&port| port == 0) { + ipv6_firewall.remove(&addr.to_bits()).unwrap(); + } else { + ipv6_firewall + .insert(addr.to_bits(), blocked_ports, 0) + .unwrap(); + } + } + } + BlockedPort::All => { + if to_insert { + ipv6_firewall + .insert(addr.to_bits(), [0; MAX_RULES_PORT], 0) + .unwrap(); + } else { + ipv6_firewall.remove(&addr.to_bits()).unwrap(); + } + } + } + } else if to_insert { + let mut blocked_ports: [u16; MAX_RULES_PORT] = [0; MAX_RULES_PORT]; + match port { + BlockedPort::Single(port) => { + blocked_ports[0] = port; + } + BlockedPort::All => {} + } + + ipv6_firewall + .insert(addr.to_bits(), blocked_ports, 0) + .unwrap(); + } +} diff --git a/oryx-tui/src/ebpf/ingress.rs b/oryx-tui/src/ebpf/ingress.rs new file mode 100644 index 0000000..826e258 --- /dev/null +++ b/oryx-tui/src/ebpf/ingress.rs @@ -0,0 +1,239 @@ +use std::{ + net::IpAddr, + os::fd::AsRawFd, + sync::{atomic::AtomicBool, Arc}, + thread, + time::Duration, +}; + +use aya::{ + include_bytes_aligned, + maps::{Array, HashMap}, + programs::{tc, SchedClassifier, TcAttachType}, + EbpfLoader, +}; +use log::error; +use oryx_common::{protocols::Protocol, RawPacket, MAX_RULES_PORT}; + +use crate::{ + event::Event, + filter::FilterChannelSignal, + notification::{Notification, NotificationLevel}, + section::firewall::FirewallSignal, +}; +use mio::{unix::SourceFd, Events, Interest, Poll, Token}; + +use super::{ + firewall::{update_ipv4_blocklist, update_ipv6_blocklist}, + EbpfTrafficDirection, RingBuffer, +}; + +pub fn load_ingress( + iface: String, + notification_sender: kanal::Sender, + data_sender: kanal::Sender<[u8; RawPacket::LEN]>, + filter_channel_receiver: kanal::Receiver, + firewall_ingress_receiver: kanal::Receiver, + terminate: Arc, +) { + thread::spawn({ + let iface = iface.to_owned(); + let notification_sender = notification_sender.clone(); + + move || { + let rlim = libc::rlimit { + rlim_cur: libc::RLIM_INFINITY, + rlim_max: libc::RLIM_INFINITY, + }; + + unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &rlim) }; + + let traffic_direction = EbpfTrafficDirection::Ingress as i32; + + #[cfg(debug_assertions)] + let mut bpf = match EbpfLoader::new() + .set_global("TRAFFIC_DIRECTION", &traffic_direction, true) + .load(include_bytes_aligned!( + "../../../target/bpfel-unknown-none/debug/oryx" + )) { + Ok(v) => v, + Err(e) => { + error!("Failed to load the ingress eBPF bytecode. {}", e); + Notification::send( + "Failed to load the ingress eBPF bytecode", + NotificationLevel::Error, + notification_sender, + ) + .unwrap(); + return; + } + }; + + #[cfg(not(debug_assertions))] + let mut bpf = match EbpfLoader::new() + .set_global("TRAFFIC_DIRECTION", &traffic_direction, true) + .load(include_bytes_aligned!( + "../../../target/bpfel-unknown-none/debug/oryx" + )) { + Ok(v) => v, + Err(e) => { + error!("Failed to load the ingress eBPF bytecode. {}", e); + Notification::send( + "Failed to load the ingress eBPF bytecode", + NotificationLevel::Error, + notification_sender, + ) + .unwrap(); + return; + } + }; + + let _ = tc::qdisc_add_clsact(&iface); + + let program: &mut SchedClassifier = + bpf.program_mut("oryx").unwrap().try_into().unwrap(); + + if let Err(e) = program.load() { + error!( + "Failed to load the ingress eBPF program to the kernel. {}", + e + ); + Notification::send( + "Failed to load the ingress eBPF program to the kernel", + NotificationLevel::Error, + notification_sender, + ) + .unwrap(); + return; + }; + + if let Err(e) = program.attach(&iface, TcAttachType::Ingress) { + error!( + "Failed to attach the ingress eBPF program to the interface. {}", + e + ); + Notification::send( + "Failed to attach the ingress eBPF program to the interface", + NotificationLevel::Error, + notification_sender, + ) + .unwrap(); + return; + }; + + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(128); + + //filter-ebpf interface + let mut transport_filters: Array<_, u32> = + Array::try_from(bpf.take_map("TRANSPORT_FILTERS").unwrap()).unwrap(); + + let mut network_filters: Array<_, u32> = + Array::try_from(bpf.take_map("NETWORK_FILTERS").unwrap()).unwrap(); + + let mut link_filters: Array<_, u32> = + Array::try_from(bpf.take_map("LINK_FILTERS").unwrap()).unwrap(); + + let mut traffic_direction_filter: Array<_, u8> = + Array::try_from(bpf.take_map("TRAFFIC_DIRECTION_FILTER").unwrap()).unwrap(); + + // firewall-ebpf interface + let mut ipv4_firewall: HashMap<_, u32, [u16; MAX_RULES_PORT]> = + HashMap::try_from(bpf.take_map("BLOCKLIST_IPV4").unwrap()).unwrap(); + + let mut ipv6_firewall: HashMap<_, u128, [u16; MAX_RULES_PORT]> = + HashMap::try_from(bpf.take_map("BLOCKLIST_IPV6").unwrap()).unwrap(); + + // firewall thread + thread::spawn(move || loop { + if let Ok(signal) = firewall_ingress_receiver.recv() { + match signal { + FirewallSignal::Rule(rule) => match rule.ip { + IpAddr::V4(addr) => update_ipv4_blocklist( + &mut ipv4_firewall, + addr, + rule.port, + rule.enabled, + ), + + IpAddr::V6(addr) => update_ipv6_blocklist( + &mut ipv6_firewall, + addr, + rule.port, + rule.enabled, + ), + }, + FirewallSignal::Kill => { + break; + } + } + } + }); + + // packets filters thread + thread::spawn(move || loop { + if let Ok(signal) = filter_channel_receiver.recv() { + match signal { + FilterChannelSignal::ProtoUpdate((filter, flag)) => match filter { + Protocol::Transport(p) => { + let _ = transport_filters.set(p as u32, flag as u32, 0); + } + Protocol::Network(p) => { + let _ = network_filters.set(p as u32, flag as u32, 0); + } + Protocol::Link(p) => { + let _ = link_filters.set(p as u32, flag as u32, 0); + } + }, + FilterChannelSignal::DirectionUpdate(flag) => { + let _ = traffic_direction_filter.set(0, flag as u8, 0); + } + FilterChannelSignal::Kill => { + break; + } + } + } + }); + + // packets reader + let mut ring_buf = RingBuffer::new(&mut bpf); + + poll.registry() + .register( + &mut SourceFd(&ring_buf.buffer.as_raw_fd()), + Token(0), + Interest::READABLE, + ) + .unwrap(); + + loop { + poll.poll(&mut events, Some(Duration::from_millis(100))) + .unwrap(); + if terminate.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + for event in &events { + if terminate.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + if event.token() == Token(0) && event.is_readable() { + if terminate.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + while let Some(item) = ring_buf.next() { + if terminate.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + let packet: [u8; RawPacket::LEN] = item.to_owned().try_into().unwrap(); + data_sender.send(packet).ok(); + } + } + } + } + + let _ = poll + .registry() + .deregister(&mut SourceFd(&ring_buf.buffer.as_raw_fd())); + } + }); +} diff --git a/oryx-tui/src/filter.rs b/oryx-tui/src/filter.rs index 7eb057e..a62a717 100644 --- a/oryx-tui/src/filter.rs +++ b/oryx-tui/src/filter.rs @@ -4,8 +4,6 @@ mod link; mod network; mod transport; -use std::{thread, time::Duration}; - use crossterm::event::{KeyCode, KeyEvent}; use direction::{TrafficDirection, TrafficDirectionFilter}; use link::LinkFilter; @@ -29,7 +27,7 @@ use tui_big_text::{BigText, PixelSize}; use crate::{ app::AppResult, - ebpf::{load_egress, load_ingress}, + ebpf::{egress::load_egress, ingress::load_ingress}, event::Event, interface::Interface, section::firewall::FirewallSignal, @@ -37,30 +35,31 @@ use crate::{ #[derive(Debug, Clone)] pub enum FilterChannelSignal { - Update((Protocol, bool)), + ProtoUpdate((Protocol, bool)), + DirectionUpdate(bool), Kill, } #[derive(Debug, Clone)] -pub struct Channels { - pub sender: kanal::Sender, - pub receiver: kanal::Receiver, +pub struct Channels { + pub sender: kanal::Sender, + pub receiver: kanal::Receiver, } #[derive(Debug, Clone)] -pub struct IoChans { - pub ingress: Channels, - pub egress: Channels, +pub struct IoChannels { + pub ingress: Channels, + pub egress: Channels, } -impl Channels { +impl Channels { pub fn new() -> Self { let (sender, receiver) = kanal::unbounded(); Self { sender, receiver } } } -impl IoChans { +impl IoChannels { pub fn new() -> Self { Self { ingress: Channels::new(), @@ -69,13 +68,13 @@ impl IoChans { } } -impl Default for Channels { +impl Default for Channels { fn default() -> Self { Self::new() } } -impl Default for IoChans { +impl Default for IoChannels { fn default() -> Self { Self::new() } @@ -98,41 +97,49 @@ pub struct Filter { pub transport: TransportFilter, pub link: LinkFilter, pub traffic_direction: TrafficDirectionFilter, - pub filter_chans: IoChans, - pub firewall_chans: IoChans, + pub filter_chans: IoChannels, + pub firewall_chans: IoChannels, pub focused_block: FocusedBlock, - pub firewall_ingress_sender: kanal::Sender, - pub firewall_ingress_receiver: kanal::Receiver, - pub firewall_egress_sender: kanal::Sender, - pub firewall_egress_receiver: kanal::Receiver, } impl Filter { - pub fn new( - firewall_ingress_sender: kanal::Sender, - firewall_ingress_receiver: kanal::Receiver, - firewall_egress_sender: kanal::Sender, - firewall_egress_receiver: kanal::Receiver, - ) -> Self { + pub fn new(firewall_chans: IoChannels) -> Self { Self { interface: Interface::new(), network: NetworkFilter::new(), transport: TransportFilter::new(), link: LinkFilter::new(), traffic_direction: TrafficDirectionFilter::new(), - filter_chans: IoChans::new(), - firewall_chans: IoChans::new(), + filter_chans: IoChannels::new(), + firewall_chans, focused_block: FocusedBlock::Interface, - firewall_ingress_sender, - firewall_ingress_receiver, - firewall_egress_sender, - firewall_egress_receiver, } } pub fn terminate(&mut self) { + // terminate packets reader threads self.traffic_direction.terminate(TrafficDirection::Egress); self.traffic_direction.terminate(TrafficDirection::Ingress); + + // terminate filter /packets sender threads + let _ = self + .filter_chans + .ingress + .sender + .send(FilterChannelSignal::Kill); + let _ = self + .filter_chans + .egress + .sender + .send(FilterChannelSignal::Kill); + + // terminate firewall threads + let _ = self + .firewall_chans + .ingress + .sender + .send(FirewallSignal::Kill); + let _ = self.firewall_chans.egress.sender.send(FirewallSignal::Kill); } pub fn start( @@ -144,35 +151,23 @@ impl Filter { self.apply(); - if self - .traffic_direction - .applied_direction - .contains(&TrafficDirection::Ingress) - { - load_ingress( - iface.clone(), - notification_sender.clone(), - data_sender.clone(), - self.filter_chans.ingress.receiver.clone(), - self.firewall_ingress_receiver.clone(), - self.traffic_direction.terminate_ingress.clone(), - ); - } + load_ingress( + iface.clone(), + notification_sender.clone(), + data_sender.clone(), + self.filter_chans.ingress.receiver.clone(), + self.firewall_chans.ingress.receiver.clone(), + self.traffic_direction.terminate_ingress.clone(), + ); - if self - .traffic_direction - .applied_direction - .contains(&TrafficDirection::Egress) - { - load_egress( - iface, - notification_sender, - data_sender, - self.filter_chans.egress.receiver.clone(), - self.firewall_egress_receiver.clone(), - self.traffic_direction.terminate_egress.clone(), - ); - } + load_egress( + iface, + notification_sender, + data_sender, + self.filter_chans.egress.receiver.clone(), + self.firewall_chans.egress.receiver.clone(), + self.traffic_direction.terminate_egress.clone(), + ); self.sync()?; @@ -200,14 +195,14 @@ impl Filter { self.filter_chans .ingress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Transport(*protocol), false, )))?; self.filter_chans .egress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Transport(*protocol), false, )))?; @@ -215,14 +210,14 @@ impl Filter { self.filter_chans .ingress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Transport(*protocol), true, )))?; self.filter_chans .egress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Transport(*protocol), true, )))?; @@ -234,14 +229,14 @@ impl Filter { self.filter_chans .ingress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Network(*protocol), false, )))?; self.filter_chans .egress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Network(*protocol), false, )))?; @@ -249,14 +244,14 @@ impl Filter { self.filter_chans .ingress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Network(*protocol), true, )))?; self.filter_chans .egress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Network(*protocol), true, )))?; @@ -268,14 +263,14 @@ impl Filter { self.filter_chans .ingress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Link(*protocol), false, )))?; self.filter_chans .egress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Link(*protocol), false, )))?; @@ -283,127 +278,52 @@ impl Filter { self.filter_chans .ingress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Link(*protocol), true, )))?; self.filter_chans .egress .sender - .send(FilterChannelSignal::Update(( + .send(FilterChannelSignal::ProtoUpdate(( Protocol::Link(*protocol), true, )))?; } } - Ok(()) - } - - pub fn update( - &mut self, - notification_sender: kanal::Sender, - data_sender: kanal::Sender<[u8; RawPacket::LEN]>, - ) -> AppResult<()> { - // Remove egress if self .traffic_direction .applied_direction - .contains(&TrafficDirection::Egress) - && !self - .traffic_direction - .selected_direction - .contains(&TrafficDirection::Egress) + .contains(&TrafficDirection::Ingress) { - self.firewall_egress_sender.send(FirewallSignal::Kill)?; self.filter_chans - .egress + .ingress .sender - .send(FilterChannelSignal::Kill)?; - self.traffic_direction.terminate(TrafficDirection::Egress); - } - - // Add egress - if !self - .traffic_direction - .applied_direction - .contains(&TrafficDirection::Egress) - && self - .traffic_direction - .selected_direction - .contains(&TrafficDirection::Egress) - { - self.traffic_direction - .terminate_egress - .store(false, std::sync::atomic::Ordering::Relaxed); - - let iface = self.interface.selected_interface.name.clone(); - - load_egress( - iface, - notification_sender.clone(), - data_sender.clone(), - self.filter_chans.egress.receiver.clone(), - self.firewall_egress_receiver.clone(), - self.traffic_direction.terminate_egress.clone(), - ); - } - - // Remove ingress - if self - .traffic_direction - .applied_direction - .contains(&TrafficDirection::Ingress) - && !self - .traffic_direction - .selected_direction - .contains(&TrafficDirection::Ingress) - { - self.firewall_ingress_sender.send(FirewallSignal::Kill)?; + .send(FilterChannelSignal::DirectionUpdate(false))?; + } else { self.filter_chans .ingress .sender - .send(FilterChannelSignal::Kill)?; - self.traffic_direction.terminate(TrafficDirection::Ingress); + .send(FilterChannelSignal::DirectionUpdate(true))?; } - // Add ingress - if !self + if self .traffic_direction .applied_direction - .contains(&TrafficDirection::Ingress) - && self - .traffic_direction - .selected_direction - .contains(&TrafficDirection::Ingress) + .contains(&TrafficDirection::Egress) { - let iface = self.interface.selected_interface.name.clone(); - self.traffic_direction - .terminate_ingress - .store(false, std::sync::atomic::Ordering::Relaxed); - load_ingress( - iface, - notification_sender.clone(), - data_sender.clone(), - self.filter_chans.ingress.receiver.clone(), - self.firewall_ingress_receiver.clone(), - self.traffic_direction.terminate_ingress.clone(), - ); + self.filter_chans + .egress + .sender + .send(FilterChannelSignal::DirectionUpdate(false))?; + } else { + self.filter_chans + .egress + .sender + .send(FilterChannelSignal::DirectionUpdate(true))?; } - self.apply(); - - thread::sleep(Duration::from_millis(150)); - - self.traffic_direction - .terminate_ingress - .store(false, std::sync::atomic::Ordering::Relaxed); - self.traffic_direction - .terminate_ingress - .store(false, std::sync::atomic::Ordering::Relaxed); - - self.sync()?; - Ok(()) } diff --git a/oryx-tui/src/filter/direction.rs b/oryx-tui/src/filter/direction.rs index 2e5cb2d..bda78e7 100644 --- a/oryx-tui/src/filter/direction.rs +++ b/oryx-tui/src/filter/direction.rs @@ -83,14 +83,6 @@ impl TrafficDirectionFilter { self.selected_direction.clear(); } - pub fn is_ingress_loaded(&self) -> bool { - self.applied_direction.contains(&TrafficDirection::Ingress) - } - - pub fn is_egress_loaded(&self) -> bool { - self.applied_direction.contains(&TrafficDirection::Egress) - } - pub fn render(&mut self, frame: &mut Frame, block: Rect, is_focused: bool) { let layout = Layout::default() .direction(Direction::Horizontal) diff --git a/oryx-tui/src/handler.rs b/oryx-tui/src/handler.rs index bc92abe..29cd0bb 100644 --- a/oryx-tui/src/handler.rs +++ b/oryx-tui/src/handler.rs @@ -71,16 +71,8 @@ pub fn handle_key_events( KeyCode::Enter => match popup { ActivePopup::UpdateFilters => { if app.filter.focused_block == FocusedBlock::Apply { - app.filter - .update(event_sender.clone(), app.data_channel_sender.clone())?; - if !app.filter.traffic_direction.is_ingress_loaded() { - app.section.firewall.disable_ingress_rules(); - } - - if !app.filter.traffic_direction.is_egress_loaded() { - app.section.firewall.disable_egress_rules(); - } - + app.filter.apply(); + app.filter.sync()?; app.active_popup = None; } } @@ -143,15 +135,11 @@ pub fn handle_key_events( } KeyCode::Char('q') => { - app.filter.terminate(); - thread::sleep(Duration::from_millis(110)); app.quit(); } KeyCode::Char('c') | KeyCode::Char('C') => { if key_event.modifiers == KeyModifiers::CONTROL { - app.filter.terminate(); - thread::sleep(Duration::from_millis(110)); app.quit(); } } @@ -178,16 +166,6 @@ pub fn handle_key_events( } } - KeyCode::Char(' ') => { - if app.section.focused_section == FocusedSection::Firewall { - app.section.firewall.load_rule( - event_sender.clone(), - app.filter.traffic_direction.is_ingress_loaded(), - app.filter.traffic_direction.is_egress_loaded(), - )?; - } - } - _ => { app.section.handle_keys(key_event, event_sender.clone())?; } diff --git a/oryx-tui/src/section.rs b/oryx-tui/src/section.rs index 1c080df..6c64432 100644 --- a/oryx-tui/src/section.rs +++ b/oryx-tui/src/section.rs @@ -22,6 +22,7 @@ use stats::Stats; use crate::{ app::{ActivePopup, AppResult}, event::Event, + filter::IoChannels, packet::AppPacket, }; @@ -45,15 +46,14 @@ pub struct Section { impl Section { pub fn new( packets: Arc>>, - firewall_ingress_sender: kanal::Sender, - firewall_egress_sender: kanal::Sender, + firewall_chans: IoChannels, ) -> Self { Self { focused_section: FocusedSection::Inspection, inspection: Inspection::new(packets.clone()), stats: Stats::new(packets.clone()), alert: Alert::new(packets.clone()), - firewall: Firewall::new(firewall_ingress_sender, firewall_egress_sender), + firewall: Firewall::new(firewall_chans.ingress.sender, firewall_chans.egress.sender), } } fn title_span(&self, header_section: FocusedSection) -> Span { diff --git a/oryx-tui/src/section/firewall.rs b/oryx-tui/src/section/firewall.rs index 0b5744b..df44284 100644 --- a/oryx-tui/src/section/firewall.rs +++ b/oryx-tui/src/section/firewall.rs @@ -59,7 +59,6 @@ impl FromStr for BlockedPort { } } -// TODO: Add direction impl Display for FirewallRule { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} {}", self.ip, self.port) @@ -326,7 +325,7 @@ impl Firewall { self.user_input = Some(UserInput::new()); } - pub fn save_rules(&self) -> AppResult<()> { + pub fn save_rules(&mut self) -> AppResult<()> { info!("Saving Firewall Rules"); let json = serde_json::to_string(&self.rules)?; @@ -356,7 +355,6 @@ impl Firewall { let mut parsed_rules: Vec = serde_json::from_str(&json_string)?; - // as we don't know if ingress/egress programs are loaded we have to disable all rules parsed_rules .iter_mut() .for_each(|rule| rule.enabled = false); @@ -419,47 +417,6 @@ impl Firewall { }); } - pub fn load_rule( - &mut self, - sender: kanal::Sender, - is_ingress_loaded: bool, - is_egress_loaded: bool, - ) -> AppResult<()> { - if let Some(index) = self.state.selected() { - let rule = &mut self.rules[index]; - - match rule.direction { - TrafficDirection::Ingress => { - if is_ingress_loaded { - rule.enabled = !rule.enabled; - self.ingress_sender - .send(FirewallSignal::Rule(rule.clone()))?; - } else { - Notification::send( - "Ingress is not loaded.", - crate::notification::NotificationLevel::Warning, - sender.clone(), - )?; - } - } - TrafficDirection::Egress => { - if is_egress_loaded { - rule.enabled = !rule.enabled; - self.egress_sender - .send(FirewallSignal::Rule(rule.clone()))?; - } else { - Notification::send( - "Egress is not loaded.", - crate::notification::NotificationLevel::Warning, - sender.clone(), - )?; - } - } - } - } - Ok(()) - } - pub fn handle_keys( &mut self, key_event: KeyEvent, @@ -554,6 +511,23 @@ impl Firewall { self.add_rule(); } + KeyCode::Char(' ') => { + if let Some(index) = self.state.selected() { + let rule = &mut self.rules[index]; + rule.enabled = !rule.enabled; + match rule.direction { + TrafficDirection::Ingress => { + self.ingress_sender + .send(FirewallSignal::Rule(rule.clone()))?; + } + TrafficDirection::Egress => { + self.egress_sender + .send(FirewallSignal::Rule(rule.clone()))?; + } + } + } + } + KeyCode::Char('s') => match self.save_rules() { Ok(_) => { Notification::send( diff --git a/oryx-tui/src/section/inspection.rs b/oryx-tui/src/section/inspection.rs index 8e02533..9bbca58 100644 --- a/oryx-tui/src/section/inspection.rs +++ b/oryx-tui/src/section/inspection.rs @@ -206,7 +206,7 @@ impl Inspection { } else if i == self.packet_window_size - 1 && app_packets.len() > self.packet_end_index { - // shit the window by one + // shift the window by one self.packet_end_index += 1; i + 1 } else {