diff --git a/CHANGELOG.md b/CHANGELOG.md index 9011d876..df9415cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Changed + +- Listener.status.addresses for NodePort listeners now includes replicas that are currently unavailable ([#231]). + +### Fixed + +- Listener.status.addresses is now de-duplicated ([#231]). +- Listener controller now listens for ListenerClass updates ([#231]). + +[#231]: https://github.com/stackabletech/listener-operator/pull/231 + ## [24.7.0] - 2024-07-24 ### Added diff --git a/Cargo.lock b/Cargo.lock index bdc9aa29..f5fd108d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,15 +698,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.1.31" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" - -[[package]] -name = "futures" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -719,9 +713,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -729,15 +723,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -746,15 +740,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -763,23 +757,22 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ - "futures 0.1.31", "futures-channel", "futures-core", "futures-io", @@ -1256,7 +1249,7 @@ dependencies = [ "bytes", "chrono", "either", - "futures 0.3.30", + "futures", "home", "http", "http-body", @@ -1327,7 +1320,7 @@ dependencies = [ "async-trait", "backoff", "derivative", - "futures 0.3.30", + "futures", "hashbrown 0.14.5", "json-patch", "jsonptr", @@ -2312,7 +2305,7 @@ dependencies = [ "built", "clap", "csi-grpc", - "futures 0.3.30", + "futures", "h2", "libc", "pin-project", @@ -2326,6 +2319,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-reflection", + "tracing", ] [[package]] @@ -2340,7 +2334,7 @@ dependencies = [ "derivative", "dockerfile-parser", "either", - "futures 0.3.30", + "futures", "json-patch", "k8s-openapi", "kube", diff --git a/Cargo.nix b/Cargo.nix index 48ca33dd..17344144 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -2022,24 +2022,11 @@ rec { }; resolvedDefaultFeatures = [ "alloc" "default" "std" ]; }; - "futures 0.1.31" = rec { + "futures" = rec { crateName = "futures"; - version = "0.1.31"; - edition = "2015"; - sha256 = "0y46qbmhi37dqkch8dlfq5aninqpzqgrr98awkb3rn4fxww1lirs"; - authors = [ - "Alex Crichton " - ]; - features = { - "default" = [ "use_std" "with-deprecated" ]; - }; - resolvedDefaultFeatures = [ "default" "use_std" "with-deprecated" ]; - }; - "futures 0.3.30" = rec { - crateName = "futures"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "1c04g14bccmprwsvx2j9m2blhwrynq7vhl151lsvcv4gi0b6jp34"; + sha256 = "0xh8ddbkm9jy8kc5gbvjp9a4b6rqqxvc8471yb2qaz5wm2qhgg35"; dependencies = [ { name = "futures-channel"; @@ -2094,13 +2081,13 @@ rec { "unstable" = [ "futures-core/unstable" "futures-task/unstable" "futures-channel/unstable" "futures-io/unstable" "futures-util/unstable" ]; "write-all-vectored" = [ "futures-util/write-all-vectored" ]; }; - resolvedDefaultFeatures = [ "alloc" "async-await" "compat" "default" "executor" "futures-executor" "std" ]; + resolvedDefaultFeatures = [ "alloc" "async-await" "default" "executor" "futures-executor" "std" ]; }; "futures-channel" = rec { crateName = "futures-channel"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "0y6b7xxqdjm9hlcjpakcg41qfl7lihf6gavk8fyqijsxhvbzgj7a"; + sha256 = "040vpqpqlbk099razq8lyn74m0f161zd0rp36hciqrwcg2zibzrd"; libName = "futures_channel"; dependencies = [ { @@ -2126,9 +2113,9 @@ rec { }; "futures-core" = rec { crateName = "futures-core"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "07aslayrn3lbggj54kci0ishmd1pr367fp7iks7adia1p05miinz"; + sha256 = "0gk6yrxgi5ihfanm2y431jadrll00n5ifhnpx090c2f2q1cr1wh5"; libName = "futures_core"; features = { "default" = [ "std" ]; @@ -2139,9 +2126,9 @@ rec { }; "futures-executor" = rec { crateName = "futures-executor"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "07dh08gs9vfll2h36kq32q9xd86xm6lyl9xikmmwlkqnmrrgqxm5"; + sha256 = "17vcci6mdfzx4gbk0wx64chr2f13wwwpvyf3xd5fb1gmjzcx2a0y"; libName = "futures_executor"; dependencies = [ { @@ -2170,9 +2157,9 @@ rec { }; "futures-io" = rec { crateName = "futures-io"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "1hgh25isvsr4ybibywhr4dpys8mjnscw4wfxxwca70cn1gi26im4"; + sha256 = "1ikmw1yfbgvsychmsihdkwa8a1knank2d9a8dk01mbjar9w1np4y"; libName = "futures_io"; features = { "default" = [ "std" ]; @@ -2181,9 +2168,9 @@ rec { }; "futures-macro" = rec { crateName = "futures-macro"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "1b49qh9d402y8nka4q6wvvj0c88qq91wbr192mdn5h54nzs0qxc7"; + sha256 = "0l1n7kqzwwmgiznn0ywdc5i24z72zvh9q1dwps54mimppi7f6bhn"; procMacro = true; libName = "futures_macro"; dependencies = [ @@ -2205,9 +2192,9 @@ rec { }; "futures-sink" = rec { crateName = "futures-sink"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "1dag8xyyaya8n8mh8smx7x6w2dpmafg2din145v973a3hw7f1f4z"; + sha256 = "1xyly6naq6aqm52d5rh236snm08kw8zadydwqz8bip70s6vzlxg5"; libName = "futures_sink"; features = { "default" = [ "std" ]; @@ -2217,9 +2204,9 @@ rec { }; "futures-task" = rec { crateName = "futures-task"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "013h1724454hj8qczp8vvs10qfiqrxr937qsrv6rhii68ahlzn1q"; + sha256 = "124rv4n90f5xwfsm9qw6y99755y021cmi5dhzh253s920z77s3zr"; libName = "futures_task"; features = { "default" = [ "std" ]; @@ -2229,17 +2216,11 @@ rec { }; "futures-util" = rec { crateName = "futures-util"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "0j0xqhcir1zf2dcbpd421kgw6wvsk0rpxflylcysn1rlp3g02r1x"; + sha256 = "10aa1ar8bgkgbr4wzxlidkqkcxf77gffyj8j7768h831pcaq784z"; libName = "futures_util"; dependencies = [ - { - name = "futures"; - packageId = "futures 0.1.31"; - rename = "futures_01"; - optional = true; - } { name = "futures-channel"; packageId = "futures-channel"; @@ -2317,7 +2298,7 @@ rec { "unstable" = [ "futures-core/unstable" "futures-task/unstable" ]; "write-all-vectored" = [ "io" ]; }; - resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "channel" "compat" "futures-channel" "futures-io" "futures-macro" "futures-sink" "futures_01" "io" "memchr" "sink" "slab" "std" ]; + resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "channel" "futures-channel" "futures-io" "futures-macro" "futures-sink" "io" "memchr" "sink" "slab" "std" ]; }; "generic-array" = rec { crateName = "generic-array"; @@ -3820,7 +3801,7 @@ rec { } { name = "futures"; - packageId = "futures 0.3.30"; + packageId = "futures"; optional = true; usesDefaultFeatures = false; features = [ "std" ]; @@ -3961,7 +3942,7 @@ rec { devDependencies = [ { name = "futures"; - packageId = "futures 0.3.30"; + packageId = "futures"; usesDefaultFeatures = false; features = [ "async-await" ]; } @@ -4179,7 +4160,7 @@ rec { } { name = "futures"; - packageId = "futures 0.3.30"; + packageId = "futures"; usesDefaultFeatures = false; features = [ "async-await" ]; } @@ -7305,8 +7286,7 @@ rec { } { name = "futures"; - packageId = "futures 0.3.30"; - features = [ "compat" ]; + packageId = "futures"; } { name = "h2"; @@ -7364,6 +7344,10 @@ rec { name = "tonic-reflection"; packageId = "tonic-reflection"; } + { + name = "tracing"; + packageId = "tracing"; + } ]; buildDependencies = [ { @@ -7421,7 +7405,7 @@ rec { } { name = "futures"; - packageId = "futures 0.3.30"; + packageId = "futures"; } { name = "json-patch"; @@ -8333,9 +8317,9 @@ rec { }; "tonic" = rec { crateName = "tonic"; - version = "0.12.2"; + version = "0.12.3"; edition = "2021"; - sha256 = "1bc8m8r7ysgkb7mhs3b3mvivd43nwaix6qnqhfp5hb2bkscbmxn6"; + sha256 = "0ljd1lfjpw0vrm5wbv15x6nq2i38llsanls5rkzmdn2n0wrmnz47"; authors = [ "Lucio Franco " ]; @@ -8462,7 +8446,7 @@ rec { } ]; features = { - "channel" = [ "dep:hyper" "hyper?/client" "dep:hyper-util" "hyper-util?/client-legacy" "dep:tower" "tower?/balance" "tower?/buffer" "tower?/discover" "tower?/limit" "dep:tokio" "tokio?/time" "dep:hyper-timeout" ]; + "channel" = [ "dep:hyper" "hyper?/client" "dep:hyper-util" "hyper-util?/client-legacy" "dep:tower" "tower?/balance" "tower?/buffer" "tower?/discover" "tower?/limit" "tower?/util" "dep:tokio" "tokio?/time" "dep:hyper-timeout" ]; "codegen" = [ "dep:async-trait" ]; "default" = [ "transport" "codegen" "prost" ]; "gzip" = [ "dep:flate2" ]; diff --git a/Cargo.toml b/Cargo.toml index b3561eb0..29bc2bf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ repository = "https://github.com/stackabletech/listener-operator" anyhow = "1.0" built = { version = "0.7", features = ["chrono", "git2"] } clap = "4.5" -futures = { version = "0.3", features = ["compat"] } +futures = { version = "0.3" } h2 = "0.4" libc = "0.2" pin-project = "1.1" @@ -29,6 +29,7 @@ tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.12" tonic-build = "0.12" tonic-reflection = "0.12" +tracing = "0.1.40" # [patch."https://github.com/stackabletech/operator-rs.git"] # stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" } diff --git a/rust/csi-grpc/src/lib.rs b/rust/csi-grpc/src/lib.rs index d6adb506..b46312cf 100644 --- a/rust/csi-grpc/src/lib.rs +++ b/rust/csi-grpc/src/lib.rs @@ -1,5 +1,8 @@ //! Include gRPC definition files that have been generated by `build.rs` +// CSI docs don't quite align with Rustdoc conventions +#![allow(clippy::doc_lazy_continuation)] + pub static FILE_DESCRIPTOR_SET_BYTES: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/file_descriptor_set.bin")); diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 701a54b4..cf3e273a 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -27,6 +27,7 @@ serde.workspace = true snafu.workspace = true strum.workspace = true h2.workspace = true +tracing.workspace = true [build-dependencies] built.workspace = true diff --git a/rust/operator-binary/src/csi_server/node.rs b/rust/operator-binary/src/csi_server/node.rs index 4986bca2..5167ab27 100644 --- a/rust/operator-binary/src/csi_server/node.rs +++ b/rust/operator-binary/src/csi_server/node.rs @@ -17,7 +17,10 @@ use std::{fmt::Debug, path::PathBuf}; use tonic::{Request, Response, Status}; use crate::{ - listener_controller::{listener_mounted_pod_label, ListenerMountedPodLabelError}, + listener_controller::{ + listener_mounted_pod_label, listener_persistent_volume_label, ListenerMountedPodLabelError, + ListenerPersistentVolumeLabelError, + }, utils::{error_full_message, node_primary_address}, }; @@ -25,6 +28,8 @@ use super::{tonic_unimplemented, ListenerSelector, ListenerVolumeContext}; const FIELD_MANAGER_SCOPE: &str = "volume"; +pub const NODE_TOPOLOGY_LABEL_HOSTNAME: &str = "listeners.stackable.tech/hostname"; + pub struct ListenerOperatorNode { pub client: stackable_operator::client::Client, pub node_name: String, @@ -55,6 +60,12 @@ enum PublishVolumeError { #[snafu(display("PersistentVolume has no corresponding PersistentVolumeClaim"))] UnclaimedPv, + #[snafu(display("failed to generate {listener}'s PersistentVolume selector"))] + ListenerPvReference { + source: ListenerPersistentVolumeLabelError, + listener: ObjectRef, + }, + #[snafu(display("failed to generate {listener}'s pod selector"))] ListenerPodSelector { source: ListenerMountedPodLabelError, @@ -75,6 +86,12 @@ enum PublishVolumeError { listener: ObjectRef, }, + #[snafu(display("failed to add listener label to {pv}"))] + AddListenerLabelToPv { + source: stackable_operator::client::Error, + pv: ObjectRef, + }, + #[snafu(display("failed to add listener label to {pod}"))] AddListenerLabelToPod { source: stackable_operator::client::Error, @@ -112,9 +129,11 @@ impl From for Status { PublishVolumeError::GetObject { .. } => Status::unavailable(full_msg), PublishVolumeError::UnclaimedPv => Status::unavailable(full_msg), PublishVolumeError::PodHasNoNode { .. } => Status::unavailable(full_msg), + PublishVolumeError::ListenerPvReference { .. } => Status::failed_precondition(full_msg), PublishVolumeError::ListenerPodSelector { .. } => Status::failed_precondition(full_msg), PublishVolumeError::BuildListenerOwnerRef { .. } => Status::unavailable(full_msg), PublishVolumeError::ApplyListener { .. } => Status::unavailable(full_msg), + PublishVolumeError::AddListenerLabelToPv { .. } => Status::unavailable(full_msg), PublishVolumeError::AddListenerLabelToPod { .. } => Status::unavailable(full_msg), PublishVolumeError::NoAddresses { .. } => Status::unavailable(full_msg), PublishVolumeError::PreparePodDir { .. } => Status::internal(full_msg), @@ -155,7 +174,7 @@ impl csi::v1::node_server::Node for ListenerOperatorNode { max_volumes_per_node: i64::MAX, accessible_topology: Some(Topology { segments: [( - "listeners.stackable.tech/hostname".to_string(), + NODE_TOPOLOGY_LABEL_HOSTNAME.to_string(), self.node_name.clone(), )] .into(), @@ -276,6 +295,29 @@ impl csi::v1::node_server::Node for ListenerOperatorNode { } }; + // Add listener label to PV, allowing traffic to be directed based on reservations, rather than which replicas are *currently* active. + // See https://github.com/stackabletech/listener-operator/issues/220 + self.client + .apply_patch( + FIELD_MANAGER_SCOPE, + &pv, + &PersistentVolume { + metadata: ObjectMeta { + labels: Some(listener_persistent_volume_label(&listener).context( + ListenerPvReferenceSnafu { + listener: ObjectRef::from_obj(&listener), + }, + )?), + ..Default::default() + }, + ..Default::default() + }, + ) + .await + .with_context(|_| AddListenerLabelToPvSnafu { + pv: ObjectRef::from_obj(&pv), + })?; + // Add listener label to pod so that traffic can be directed to it self.client // IMPORTANT diff --git a/rust/operator-binary/src/listener_controller.rs b/rust/operator-binary/src/listener_controller.rs index 27084dfb..43b36398 100644 --- a/rust/operator-binary/src/listener_controller.rs +++ b/rust/operator-binary/src/listener_controller.rs @@ -1,6 +1,12 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; -use futures::{future::try_join_all, StreamExt}; +use futures::{ + future::{try_join, try_join_all}, + StreamExt, +}; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ builder::meta::OwnerReferenceBuilder, @@ -8,17 +14,21 @@ use stackable_operator::{ AddressType, Listener, ListenerClass, ListenerIngress, ListenerPort, ListenerSpec, ListenerStatus, ServiceType, }, - k8s_openapi::api::core::v1::{Endpoints, Node, Service, ServicePort, ServiceSpec}, + k8s_openapi::{ + api::core::v1::{Endpoints, Node, PersistentVolume, Service, ServicePort, ServiceSpec}, + apimachinery::pkg::apis::meta::v1::LabelSelector, + }, kube::{ api::{DynamicObject, ObjectMeta}, runtime::{controller, reflector::ObjectRef, watcher}, + ResourceExt, }, logging::controller::{report_controller_reconciled, ReconcilerError}, time::Duration, }; use strum::IntoStaticStr; -use crate::utils::node_primary_address; +use crate::{csi_server::node::NODE_TOPOLOGY_LABEL_HOSTNAME, utils::node_primary_address}; #[cfg(doc)] use stackable_operator::k8s_openapi::api::core::v1::Pod; @@ -31,6 +41,22 @@ pub async fn run(client: stackable_operator::client::Client) { let listener_store = controller.store(); controller .owns(client.get_all_api::(), watcher::Config::default()) + .watches( + client.get_all_api::(), + watcher::Config::default(), + { + let listener_store = listener_store.clone(); + move |listenerclass| { + listener_store + .state() + .into_iter() + .filter(move |listener| { + listener.spec.class_name == listenerclass.metadata.name + }) + .map(|l| ObjectRef::from_obj(&*l)) + } + }, + ) .watches( client.get_all_api::(), watcher::Config::default(), @@ -48,6 +74,17 @@ pub async fn run(client: stackable_operator::client::Client) { .map(|l| ObjectRef::from_obj(&*l)) }, ) + .watches( + client.get_all_api::(), + watcher::Config::default(), + |pv| { + let labels = pv.labels(); + labels + .get(PV_LABEL_LISTENER_NAMESPACE) + .zip(labels.get(PV_LABEL_LISTENER_NAME)) + .map(|(ns, name)| ObjectRef::::new(name).within(ns)) + }, + ) .shutdown_on_signal() .run( reconcile, @@ -71,28 +108,45 @@ pub struct Ctx { pub enum Error { #[snafu(display("object has no namespace"))] NoNs, + #[snafu(display("object has no name"))] NoName, + #[snafu(display("object has no ListenerClass (.spec.class_name)"))] NoListenerClass, - #[snafu(display("failed to generate listener's pod selector"))] + + #[snafu(display("failed to generate Listener's PersistentVolume selector"))] + ListenerPvSelector { + source: ListenerPersistentVolumeLabelError, + }, + + #[snafu(display("failed to generate Listener's Pod selector"))] ListenerPodSelector { source: ListenerMountedPodLabelError, }, + + #[snafu(display("failed to get PersistentVolumes for Listener"))] + GetListenerPvs { + source: stackable_operator::client::Error, + }, + #[snafu(display("failed to get {obj}"))] GetObject { source: stackable_operator::client::Error, obj: ObjectRef, }, + #[snafu(display("failed to build owner reference to Listener"))] BuildListenerOwnerRef { source: stackable_operator::builder::meta::Error, }, + #[snafu(display("failed to apply {svc}"))] ApplyService { source: stackable_operator::client::Error, svc: ObjectRef, }, + #[snafu(display("failed to apply status for Listener"))] ApplyStatus { source: stackable_operator::client::Error, @@ -109,7 +163,9 @@ impl ReconcilerError for Error { Self::NoNs => None, Self::NoName => None, Self::NoListenerClass => None, + Self::ListenerPvSelector { source: _ } => None, Self::ListenerPodSelector { source: _ } => None, + Self::GetListenerPvs { source: _ } => None, Self::GetObject { source: _, obj } => Some(obj.clone()), Self::BuildListenerOwnerRef { .. } => None, Self::ApplyService { source: _, svc } => Some(svc.clone().erase()), @@ -217,23 +273,8 @@ pub async fn reconcile(listener: Arc, ctx: Arc) -> Result; match listener_class.spec.service_type { ServiceType::NodePort => { - let endpoints = ctx - .client - .get_opt::(&svc_name, ns) - .await - .with_context(|_| GetObjectSnafu { - obj: ObjectRef::::new(&svc_name).within(ns).erase(), - })? - // Endpoints object may not yet be created by its respective controller - .unwrap_or_default(); - let node_names = endpoints - .subsets - .into_iter() - .flatten() - .flat_map(|subset| subset.addresses) - .flatten() - .flat_map(|addr| addr.node_name) - .collect::>(); + let node_names = + node_names_for_nodeport_listener(&ctx.client, &listener, ns, &svc_name).await?; nodes = try_join_all(node_names.iter().map(|node_name| async { ctx.client .get::(node_name, &()) @@ -334,6 +375,81 @@ pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> controll controller::Action::requeue(*Duration::from_secs(5)) } +/// Lists the names of the [`Node`]s backing this [`Listener`]. +/// +/// Should only be used for [`NodePort`](`ServiceType::NodePort`) [`Listener`]s. +async fn node_names_for_nodeport_listener( + client: &stackable_operator::client::Client, + listener: &Listener, + namespace: &str, + service_name: &str, +) -> Result> { + let (pvs, endpoints) = try_join( + async { + client + .list_with_label_selector::( + &(), + &LabelSelector { + match_labels: Some(listener_persistent_volume_label(listener).unwrap()), + ..Default::default() + }, + ) + .await + .context(GetListenerPvsSnafu) + }, + async { + client + // Endpoints object may not yet be created by its respective controller + .get_opt::(service_name, namespace) + .await + .with_context(|_| GetObjectSnafu { + obj: ObjectRef::::new(service_name) + .within(namespace) + .erase(), + }) + }, + ) + .await?; + + let pv_node_names = pvs + .into_iter() + .filter_map(|pv| pv.spec?.node_affinity?.required) + .flat_map(|affinity| affinity.node_selector_terms) + .filter_map(|terms| terms.match_expressions) + .flatten() + .filter(|expr| expr.key == NODE_TOPOLOGY_LABEL_HOSTNAME && expr.operator == "In") + .filter_map(|expr| expr.values) + .flatten() + .collect::>(); + + // Old objects that haven't been mounted before the PV lookup mechanism was added will + // not have the correct labels, so we also look up using Endpoints. + let endpoints_node_names = endpoints + .into_iter() + .filter_map(|endpoints| endpoints.subsets) + .flatten() + .flat_map(|subset| subset.addresses) + .flatten() + .flat_map(|addr| addr.node_name) + .collect::>(); + + let node_names_missing_from_pv = endpoints_node_names + .difference(&pv_node_names) + .collect::>(); + if !node_names_missing_from_pv.is_empty() { + tracing::warn!( + ?node_names_missing_from_pv, + "some backing Nodes could only be found via legacy Endpoints discovery method, \ + this may cause discovery config to be unstable \ + (hint: try restarting the Pods backing this Listener)", + ); + } + + let mut node_names = pv_node_names; + node_names.extend(endpoints_node_names); + Ok(node_names) +} + #[derive(Snafu, Debug)] #[snafu(module)] pub enum ListenerMountedPodLabelError { @@ -356,8 +472,44 @@ pub fn listener_mounted_pod_label( // 60. // We prefer uid over name because uids have a consistent length. Ok(( + // This should probably have been listeners.stackable.tech/ instead, but too late to change now format!("listener.stackable.tech/mnt.{}", uid.replace('-', "")), // Arbitrary, but (hopefully) helps indicate to users which listener it applies to listener.metadata.name.clone().context(NoNameSnafu)?, )) } + +#[derive(Snafu, Debug)] +#[snafu(module)] +pub enum ListenerPersistentVolumeLabelError { + #[snafu(display("object has no name"))] + NoName, + + #[snafu(display("object has no namespace"))] + NoNamespace, +} + +const PV_LABEL_LISTENER_NAMESPACE: &str = "listeners.stackable.tech/listener-namespace"; +const PV_LABEL_LISTENER_NAME: &str = "listeners.stackable.tech/listener-name"; + +/// A label that identifies which [`Listener`] corresponds to a given [`PersistentVolume`]. +pub fn listener_persistent_volume_label( + listener: &Listener, +) -> Result, ListenerPersistentVolumeLabelError> { + use listener_persistent_volume_label_error::*; + Ok([ + ( + PV_LABEL_LISTENER_NAMESPACE.to_string(), + listener + .metadata + .namespace + .clone() + .context(NoNamespaceSnafu)?, + ), + ( + PV_LABEL_LISTENER_NAME.to_string(), + listener.metadata.name.clone().context(NoNameSnafu)?, + ), + ] + .into()) +}