Skip to content

Commit 2b2d3ff

Browse files
committed
basic sandwich
1 parent 8a0046f commit 2b2d3ff

File tree

7 files changed

+442
-295
lines changed

7 files changed

+442
-295
lines changed

Makefile.core.mk

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ endif
1111
test:
1212
RUST_BACKTRACE=1 cargo test --benches --tests --bins $(FEATURES)
1313

14+
test.root:
15+
CARGO_TARGET=`rustc -vV | sed -n 's|host: ||p' | tr [:lower:] [:upper:]| tr - _`_RUNNER='sudo -E' RUST_BACKTRACE=1 cargo test --benches --tests --bins $(FEATURES)
16+
1417
build:
1518
cargo build $(FEATURES)
1619

src/proxy.rs

Lines changed: 255 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,16 @@ use tracing::{error, trace, warn, Instrument};
2929
use inbound::Inbound;
3030
pub use metrics::*;
3131

32-
use crate::identity::SecretManager;
32+
use crate::identity::{Identity, SecretManager};
3333
use crate::metrics::Recorder;
3434
use crate::proxy::connection_manager::{ConnectionManager, PolicyWatcher};
3535
use crate::proxy::inbound_passthrough::InboundPassthrough;
3636
use crate::proxy::outbound::Outbound;
3737
use crate::proxy::socks5::Socks5;
3838
use crate::rbac::Connection;
3939
use crate::state::service::{endpoint_uid, Service, ServiceDescription};
40-
use crate::state::workload::{network_addr, Workload};
40+
use crate::state::workload::address::Address;
41+
use crate::state::workload::{network_addr, GatewayAddress, Workload};
4142
use crate::state::{DemandProxyState, WorkloadInfo};
4243
use crate::{config, identity, socket, tls};
4344

@@ -262,6 +263,9 @@ pub enum Error {
262263

263264
#[error("unsupported feature: {0}")]
264265
UnsupportedFeature(String),
266+
267+
#[error("ip mismatch: {0} != {1}")]
268+
IPMismatch(IpAddr, IpAddr),
265269
}
266270

267271
// TLS record size max is 16k. But we also have a H2 frame header, so leave a bit of room for that.
@@ -526,6 +530,60 @@ pub fn guess_inbound_service(
526530
.map(ServiceDescription::from)
527531
}
528532

533+
// Checks if the connection's source identity is the identity for the upstream's waypoint
534+
async fn check_from_waypoint(
535+
state: DemandProxyState,
536+
upstream: &Workload,
537+
src_identity: Option<&Identity>,
538+
) -> bool {
539+
check_gateway_address(state, src_identity, upstream.waypoint.as_ref()).await
540+
}
541+
542+
// Checks if the connection's source identity is the identity for the upstream's network
543+
// gateway
544+
async fn check_from_network_gateway(
545+
state: DemandProxyState,
546+
upstream: &Workload,
547+
src_identity: Option<&Identity>,
548+
) -> bool {
549+
check_gateway_address(state, src_identity, upstream.network_gateway.as_ref()).await
550+
}
551+
552+
// Check if the source's identity matches any workloads that make up the given gateway
553+
// TODO: This can be made more accurate by also checking addresses.
554+
async fn check_gateway_address(
555+
state: DemandProxyState,
556+
src_identity: Option<&Identity>,
557+
gateway_address: Option<&GatewayAddress>,
558+
) -> bool {
559+
let Some(src_identity) = src_identity else {
560+
return false;
561+
};
562+
if let Some(gateway_address) = gateway_address {
563+
let from_gateway = match state.fetch_destination(&gateway_address.destination).await {
564+
Some(Address::Workload(wl)) => &wl.identity() == src_identity,
565+
Some(Address::Service(svc)) => {
566+
for (_ep_uid, ep) in svc.endpoints.iter() {
567+
// fetch workloads by workload UID since we may not have an IP for an endpoint (e.g., endpoint is just a hostname)
568+
if state
569+
.fetch_workload_by_uid(&ep.workload_uid)
570+
.await
571+
.map(|w| w.identity())
572+
.as_ref()
573+
== Some(src_identity)
574+
{
575+
return true;
576+
}
577+
}
578+
false
579+
}
580+
None => false,
581+
};
582+
return from_gateway;
583+
}
584+
false // this occurs if gateway_address was None
585+
}
586+
529587
#[cfg(test)]
530588
mod tests {
531589
use std::assert_eq;
@@ -558,4 +616,199 @@ mod tests {
558616
let expect = expect.map(|i| i.parse::<IpAddr>().unwrap());
559617
assert_eq!(get_original_src_from_fwded(&headers), expect)
560618
}
619+
620+
use hickory_resolver::config::{ResolverConfig, ResolverOpts};
621+
622+
use crate::state::service::endpoint_uid;
623+
use crate::state::workload::{NamespacedHostname, NetworkAddress};
624+
use crate::{
625+
identity::Identity,
626+
state::{
627+
self,
628+
service::{Endpoint, Service},
629+
workload::gatewayaddress::Destination,
630+
},
631+
};
632+
use std::{
633+
collections::HashMap,
634+
net::{Ipv4Addr, SocketAddrV4},
635+
sync::RwLock,
636+
};
637+
638+
#[tokio::test]
639+
async fn check_gateway() {
640+
let w = mock_default_gateway_workload();
641+
let s = mock_default_gateway_service();
642+
let mut state = state::ProxyState::default();
643+
if let Err(err) = state.workloads.insert(w) {
644+
panic!("received error inserting workload: {}", err);
645+
}
646+
state.services.insert(s);
647+
let state = state::DemandProxyState::new(
648+
Arc::new(RwLock::new(state)),
649+
None,
650+
ResolverConfig::default(),
651+
ResolverOpts::default(),
652+
);
653+
654+
let gateawy_id = Identity::Spiffe {
655+
trust_domain: "cluster.local".to_string(),
656+
namespace: "gatewayns".to_string(),
657+
service_account: "default".to_string(),
658+
};
659+
let from_gw_conn = Some(gateawy_id);
660+
let not_from_gw_conn = Some(Identity::default());
661+
662+
let upstream_with_address = mock_wokload_with_gateway(Some(mock_default_gateway_address()));
663+
assert!(
664+
check_from_network_gateway(
665+
state.clone(),
666+
&upstream_with_address,
667+
from_gw_conn.as_ref(),
668+
)
669+
.await
670+
);
671+
assert!(
672+
!check_from_network_gateway(
673+
state.clone(),
674+
&upstream_with_address,
675+
not_from_gw_conn.as_ref(),
676+
)
677+
.await
678+
);
679+
680+
// using hostname (will check the service variant of address::Address)
681+
let upstream_with_hostname =
682+
mock_wokload_with_gateway(Some(mock_default_gateway_hostname()));
683+
assert!(
684+
check_from_network_gateway(
685+
state.clone(),
686+
&upstream_with_hostname,
687+
from_gw_conn.as_ref(),
688+
)
689+
.await
690+
);
691+
assert!(
692+
!check_from_network_gateway(state, &upstream_with_hostname, not_from_gw_conn.as_ref())
693+
.await
694+
);
695+
}
696+
697+
// private helpers
698+
fn mock_wokload_with_gateway(gw: Option<GatewayAddress>) -> Workload {
699+
Workload {
700+
workload_ips: vec![IpAddr::V4(Ipv4Addr::LOCALHOST)],
701+
waypoint: None,
702+
network_gateway: gw,
703+
gateway_address: None,
704+
protocol: Default::default(),
705+
uid: "".to_string(),
706+
name: "app".to_string(),
707+
namespace: "appns".to_string(),
708+
trust_domain: "cluster.local".to_string(),
709+
service_account: "default".to_string(),
710+
network: "".to_string(),
711+
workload_name: "app".to_string(),
712+
workload_type: "deployment".to_string(),
713+
canonical_name: "app".to_string(),
714+
canonical_revision: "".to_string(),
715+
hostname: "".to_string(),
716+
node: "".to_string(),
717+
status: Default::default(),
718+
cluster_id: "Kubernetes".to_string(),
719+
720+
authorization_policies: Vec::new(),
721+
native_tunnel: false,
722+
}
723+
}
724+
725+
fn mock_default_gateway_workload() -> Workload {
726+
Workload {
727+
workload_ips: vec![IpAddr::V4(mock_default_gateway_ipaddr())],
728+
waypoint: None,
729+
network_gateway: None,
730+
gateway_address: None,
731+
protocol: Default::default(),
732+
uid: "".to_string(),
733+
name: "gateway".to_string(),
734+
namespace: "gatewayns".to_string(),
735+
trust_domain: "cluster.local".to_string(),
736+
service_account: "default".to_string(),
737+
network: "".to_string(),
738+
workload_name: "gateway".to_string(),
739+
workload_type: "deployment".to_string(),
740+
canonical_name: "".to_string(),
741+
canonical_revision: "".to_string(),
742+
hostname: "".to_string(),
743+
node: "".to_string(),
744+
status: Default::default(),
745+
cluster_id: "Kubernetes".to_string(),
746+
747+
authorization_policies: Vec::new(),
748+
native_tunnel: false,
749+
}
750+
}
751+
752+
fn mock_default_gateway_service() -> Service {
753+
let vip1 = NetworkAddress {
754+
address: IpAddr::V4(Ipv4Addr::new(127, 0, 10, 1)),
755+
network: "".to_string(),
756+
};
757+
let vips = vec![vip1];
758+
let mut ports = HashMap::new();
759+
ports.insert(8080, 80);
760+
let mut endpoints = HashMap::new();
761+
let addr = Some(NetworkAddress {
762+
network: "".to_string(),
763+
address: IpAddr::V4(mock_default_gateway_ipaddr()),
764+
});
765+
endpoints.insert(
766+
endpoint_uid(&mock_default_gateway_workload().uid, addr.as_ref()),
767+
Endpoint {
768+
workload_uid: mock_default_gateway_workload().uid,
769+
service: NamespacedHostname {
770+
namespace: "gatewayns".to_string(),
771+
hostname: "gateway".to_string(),
772+
},
773+
address: addr,
774+
port: ports.clone(),
775+
},
776+
);
777+
Service {
778+
name: "gateway".to_string(),
779+
namespace: "gatewayns".to_string(),
780+
hostname: "gateway".to_string(),
781+
vips,
782+
ports,
783+
endpoints,
784+
subject_alt_names: vec![],
785+
waypoint: None,
786+
}
787+
}
788+
789+
fn mock_default_gateway_address() -> GatewayAddress {
790+
GatewayAddress {
791+
destination: Destination::Address(NetworkAddress {
792+
network: "".to_string(),
793+
address: IpAddr::V4(mock_default_gateway_ipaddr()),
794+
}),
795+
hbone_mtls_port: 15008,
796+
hbone_single_tls_port: Some(15003),
797+
}
798+
}
799+
800+
fn mock_default_gateway_hostname() -> GatewayAddress {
801+
GatewayAddress {
802+
destination: Destination::Hostname(state::workload::NamespacedHostname {
803+
namespace: "gatewayns".to_string(),
804+
hostname: "gateway".to_string(),
805+
}),
806+
hbone_mtls_port: 15008,
807+
hbone_single_tls_port: Some(15003),
808+
}
809+
}
810+
811+
fn mock_default_gateway_ipaddr() -> Ipv4Addr {
812+
Ipv4Addr::new(127, 0, 0, 100)
813+
}
561814
}

0 commit comments

Comments
 (0)