close connections which violate policy after updates #772
Conversation
Mostly skimmed it so far but looks reasonable overall
src/proxy/connection_manager.rs
pub async fn drain(&self, c: &Connection) {
    match self.drains.clone().write().await.remove(c) {
        Some(cd) => {
            cd.tx.drain().await;
is the lock held here? If so, is drain() possibly blocking?
I don't think the write lock should be held here. remove totally removes a value from the HashMap, returning the owned value (not a reference), so there's no writing beyond the single remove and no reference to the lock required. Now you've got me doubting that understanding a bit though 😅
It's definitely possible to make that more explicit though if we want.
let mut w = self.drains.clone().write().await;
let cd = w.remove(c);
drop(w);
match cd {
    ...
}
The lock should be released when the value returned from write() is dropped. I'm just not sure if it drops when the match body (L67) is done or when the match check (L65) is done.
probably better to break it out like that to align with the standards we have for zt... prioritize understandability for our reviewers over the most idiomatic-looking rustlang
It turns out the lock is held through the duration of the match. This led to a deadlock in the track method, where the read lock was being held for the entire match and one branch was trying to get a write lock.
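A minimal sketch of that hazard and one way around it, assuming a tokio RwLock-wrapped HashMap roughly like the one used here (the key and value types are illustrative, not the actual ztunnel ones): the temporary read guard created in the match scrutinee lives for the whole match, so a write().await in one of the arms waits on a lock the same task already holds.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// BUG: the read guard produced by `drains.read().await` lives until the end
// of the match, so the write().await in the None arm waits on ourselves.
async fn track_deadlocks(drains: Arc<RwLock<HashMap<u32, u32>>>, key: u32) {
    match drains.read().await.get(&key) {
        Some(count) => println!("already tracked: {count}"),
        None => {
            drains.write().await.insert(key, 1); // never completes
        }
    }
}

// Fix: copy what we need out of the map so the guard is dropped at the end
// of the let statement, before we branch and possibly take the write lock.
async fn track_fixed(drains: Arc<RwLock<HashMap<u32, u32>>>, key: u32) {
    let existing = drains.read().await.get(&key).copied();
    match existing {
        Some(count) => println!("already tracked: {count}"),
        None => {
            drains.write().await.insert(key, 1);
        }
    }
}
```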
Use of a central RWLocked connection tracker pretty much means by definition that we will be blocking on add/remove ops, I think.
If that's not acceptable, inverting this via channels and ditching the centralized locked conntrack is probably the only other way.
we can also explore using linked lists for less locking
src/proxy/inbound.rs
if !state.assert_rbac(&conn).await {
    connection_manager.drain(&conn).await;
This definitely needs some nice logging since it will be a pretty major "WTF just happened" in some cases
firm agree. TY
src/proxy/connection_manager.rs
pub async fn connections(&self) -> Vec<Connection> {
    // potentially large copy under read lock, could require optimization
    self.drains.read().await.keys().cloned().collect()
}
Can we just do the rbac check here to avoid copying the entire conntrack, returning it, and checking in the caller?
We could consider that but I was a little worried about interlocking. We'd need to await locking on the PolicyStore (likely multiple times) inside this lock if we assert_rbac in here I think.
We shouldn't have to lock state multiple times, once we get a change event we can (at least) clone it and provide it to the conn_manager to apply in an iterative loop.
In general, if we go with this approach, I think eventually assert_rbac should live in connection_manager.rs and not state, anyway, and we can probably avoid cloning the entire state to do what we want.
But, again, this is getting a bit in the weeds.
that's fair. we could do some reworking of how policy is applied and retained to ease or eliminate interlocking problems if need be. My thinking was sort of limited to not making those kinds of changes, which maybe is unnecessarily narrow
This is fine for now. If the centralized rwlock'd conntrack becomes a problem/bottleneck, we can just remove it.
src/proxy/connection_manager.rs
    rx
}

pub async fn drain(&self, c: &Connection) {
is there a graceful draining time?
That's still TBD
I think there are really 2 behaviors we want here (a rough sketch follows the list):
- drain: used when ztunnel is shutting down. sends goaway and continues to handle traffic
- close: used when auth policy changes and existing connections become denied. stop handling traffic
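A minimal sketch of how those two signals could be kept distinct, assuming a hypothetical per-connection watch channel; ConnectionSignal and TrackedConnection are illustrative names, not ztunnel's actual types:

```rust
use tokio::sync::watch;

/// Hypothetical signal delivered to the tasks serving a tracked connection.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum ConnectionSignal {
    /// ztunnel is shutting down: send GOAWAY, keep serving in-flight traffic.
    Drain,
    /// policy now denies this connection: stop serving traffic immediately.
    Close,
}

struct TrackedConnection {
    tx: watch::Sender<Option<ConnectionSignal>>,
}

impl TrackedConnection {
    fn new() -> (Self, watch::Receiver<Option<ConnectionSignal>>) {
        let (tx, rx) = watch::channel(None);
        (Self { tx }, rx)
    }

    fn drain(&self) {
        // Receivers observe Drain and begin a graceful wind-down.
        let _ = self.tx.send(Some(ConnectionSignal::Drain));
    }

    fn close(&self) {
        // Receivers observe Close and abort their traffic loops.
        let _ = self.tx.send(Some(ConnectionSignal::Close));
    }
}
```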
@howardjohn and @kyessenov, does this approach seem reasonable?
I don't see any drain here. This is just local close?
As it sits now, even when policy changes from allow to deny we send a goaway and continue to handle traffic. We'd originally implemented drains this way because our only connection drains happened when ztunnel was being shut down, and in that case we decided that k8s killing the pod would be our drain timeout.
From an auth deny perspective this didn't seem correct though. I'm just about ready to push some new code which has both drain (for shutdown) and close (for auth) implemented for inbound.
Let's not conflate HBONE GOAWAY with application drain. HBONE GOAWAY is an infrastructure mechanism and should not be used for application intent - it's like reinstalling a cluster when redeploying an app.
For application policies, there's no drain or GOAWAY here AFAICT. You should just close the connection.
By close, I mean RST. I think we shouldn't do half close here.
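For a plain TCP socket, one common way to make close produce an RST instead of a graceful FIN/half-close is to set SO_LINGER to zero before dropping it. A minimal sketch assuming tokio's TcpStream; whether ztunnel would do this for HBONE inner streams is a separate question:

```rust
use std::time::Duration;
use tokio::net::TcpStream;

/// Abortively close a TCP connection: with a zero linger timeout, dropping
/// the socket emits an RST rather than performing the normal FIN handshake.
fn abort_connection(stream: TcpStream) -> std::io::Result<()> {
    stream.set_linger(Some(Duration::from_secs(0)))?;
    drop(stream);
    Ok(())
}
```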
In the latest commits proxy drain behavior is unchanged and all related tests are passing but during testing auth deny closes are (basically) immediate.
I agree that's the desired behavior: drain on upgrade/shutdown vs. immediate close on authz policy denies.
/retest
…changes Signed-off-by: Ian Rudie <[email protected]>
… which asserts policy against running connections Signed-off-by: ilrudie <[email protected]>
…ager tests Signed-off-by: ilrudie <[email protected]>
…ager, unit testing around the same Signed-off-by: ilrudie <[email protected]>
@howardjohn, @hzxuzhonghu and @kyessenov: if you get a chance to take another look and provide feedback, that would be greatly appreciated. TYIA
_ = policies_changed.changed() => {
    let connections = connection_manager.connections().await;
    for conn in connections {
        if !state.assert_rbac(&conn).await {
Sorry, I'm not a Rust expert - does this await individually or globally? Any time there's IO, you probably don't want to stall in a loop in a switch - you may overflow the pending events because IO is slow. It's best to fire-and-forget and then let someone reap the zombies.
This will await the individual assertion against the one connection, but await doesn't mean block (there is a specific block function if that's what you want though). If the Future being awaited is not ready, it relinquishes control and will be polled later by the executor of the task/thread.
Rust's Futures are somewhat unique in that they are lazy. If nothing drives them to completion they will not do any work, so we can't totally fire and forget. We can either await them and let the executor/callback model poll them, or we can collect all the futures and join them later. If we do neither, no work would happen.
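A minimal illustration of the two options, with a stand-in check() in place of the real state.assert_rbac call and join_all from the futures crate:

```rust
use futures::future::join_all;

// Stand-in for `state.assert_rbac(&conn).await`.
async fn check(conn_id: u32) -> bool {
    conn_id % 2 == 0
}

async fn sequential(conns: &[u32]) {
    // Each check is awaited before the next one starts; other tasks on the
    // runtime still make progress whenever a check yields.
    for &c in conns {
        if !check(c).await {
            println!("would close {c}");
        }
    }
}

async fn concurrent(conns: &[u32]) {
    // Futures are lazy: building them does nothing until they are polled,
    // so to run the checks together we collect them and join them.
    let results = join_all(conns.iter().map(|&c| check(c))).await;
    for (&c, ok) in conns.iter().zip(results) {
        if !ok {
            println!("would close {c}");
        }
    }
}
```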
I feel like you do want to block the loop, or at least debounce it, to avoid edge-triggered event spam in the runtime. But I have no idea how to express that in Rust. Maybe a separate actor that is "fast" in consuming policy events but "slow" in enacting them.
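One way that pattern is often written with tokio (a sketch, not what this PR implements): a dedicated task consumes a watch channel and, after each wake-up, sleeps briefly so any further updates landing in that window are coalesced into a single pass over the connections.

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

async fn policy_actor(mut policies_changed: watch::Receiver<()>) {
    loop {
        // "Fast" side: wake as soon as at least one policy change is seen.
        if policies_changed.changed().await.is_err() {
            return; // sender dropped, nothing left to watch
        }
        // Debounce: give further changes a short window to pile up.
        sleep(Duration::from_millis(250)).await;
        // Mark anything that arrived during the window as seen so this pass
        // covers it and we don't immediately wake again for it.
        let _ = policies_changed.borrow_and_update();

        // "Slow" side: re-assert policy against tracked connections here.
    }
}
```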
Yeah, I think having scoped events would be a solid optimization. IMO it would be better to keep this PR a little smaller and then iterate on some optimizations as we move forward but if folks strongly disagree we can start optimizing now.
The code you are commenting on is sort of that actor (in concept at least). It's being spun up in its own task so it should be (at least somewhat) independent of the tasks which are actually moving data.
If I understand the event spam, the concern is that we enqueue multiple times to trigger this code? Like while we are looping here, we get changed() multiple times stacked up?
If so, I think it will have at most one pending. That being said, you could still hit some spam I guess.
I've changed the implementation of sending to policy subscribers to be less frequent in cases where we've got batched updates. Should help
There can be a race between connection tracking and a policy update, like:
t0: a connection is established with the old rbac
t1: the rbac is updated
t2: the policy watcher runs drain here
t3: the connection established at t0 gets tracked
The connection actually should be closed. (One way to close that gap is sketched below.)
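A sketch of the usual fix under assumed names (not necessarily how the PR ends up resolving it): re-assert policy immediately after the connection is tracked, so an update that landed between the initial check and track() still takes effect.

```rust
use tokio::sync::watch;

// Illustrative stubs standing in for ztunnel's real types.
struct Conn;
struct State;
struct ConnManager;

impl State {
    async fn assert_rbac(&self, _c: &Conn) -> bool { true }
}
impl ConnManager {
    async fn track(&self, _c: &Conn) -> watch::Receiver<bool> {
        watch::channel(false).1
    }
    async fn close(&self, _c: &Conn) {}
}

async fn handle_inbound(state: &State, cm: &ConnManager, conn: Conn) {
    // Initial check (t0): the connection is admitted under the current policy.
    if !state.assert_rbac(&conn).await {
        return;
    }
    // Register with the manager so later policy updates can reach us (t3).
    let close_rx = cm.track(&conn).await;
    // Re-check after tracking: an update that raced in at t1/t2 is caught
    // either here or by the policy watcher, which now sees the tracked
    // connection.
    if !state.assert_rbac(&conn).await {
        cm.close(&conn).await;
        return;
    }
    // ... serve traffic, watching close_rx for a close signal ...
    let _ = close_rx;
}
```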
cc @stevenctl
src/identity/manager.rs
@@ -42,6 +42,21 @@ pub enum Identity {
    },
}

impl Ord for Identity {
    // Not sure this is a super legit compare but I think it should work for POC
given this is no longer a POC, should we do a more robust approach?
Good call. I do want to consider if this is adequate.
Using something like this would force us to consider how Identity::X should be compared to Identity::Spiffe if/when a new identity format is added.
impl Ord for Identity {
    fn cmp(&self, other: &Self) -> Ordering {
        let s = match self {
            Identity::Spiffe {
                trust_domain,
                namespace,
                service_account,
            } => trust_domain.to_owned() + namespace + service_account,
        };
        let o = match other {
            Identity::Spiffe {
                trust_domain,
                namespace,
                service_account,
            } => trust_domain.to_owned() + namespace + service_account,
        };
        s.cmp(&o)
    }
}
why do we actually need ord?
nm, they can be derived... that's the play
Ord is required for sorting, which is used in the testing.
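For reference, a sketch of the derived approach mentioned above; the derive requires every field type to implement Ord itself (fields shown as String here purely for illustration):

```rust
// Deriving the ordering traits yields a lexicographic comparison over the
// variants and their fields, and new variants are handled automatically.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
pub enum Identity {
    Spiffe {
        trust_domain: String,
        namespace: String,
        service_account: String,
    },
}
```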
src/proxy/connection_manager.rs
let cd = ConnectionDrain::new();
let rx = cd.rx.clone();
let mut drains = self.drains.write().await;
if let Some(w) = drains.remove(c) {
nit: del+mutate+insert can be done with code like:
map.entry("poneyland")
    .and_modify(|e| { *e += 1 })
    .or_insert(42);
maybe not with the return rx aspect though
modified rust playground example
Seems like it could work. TY
could very well be a step too far and difficult to grok but:
// register a connection with the manager and get a channel to receive on
pub async fn track(self, c: &Connection) -> drain::Watch {
    self.drains
        .write()
        .await
        .entry(c.to_owned())
        .and_modify(|cd| cd.count += 1)
        .or_insert(ConnectionDrain::new())
        .rx
        .clone()
}
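One small note on that sketch: or_insert builds its argument even when the entry already exists, so if ConnectionDrain::new() allocates a channel it may be worth the lazier std variant, which differs only in that one call:

```rust
self.drains
    .write()
    .await
    .entry(c.to_owned())
    .and_modify(|cd| cd.count += 1)
    .or_insert_with(ConnectionDrain::new)
    .rx
    .clone()
```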
}

#[derive(Clone)]
pub struct ConnectionManager {
We already have a "pool" tracking connections. It seems like, ideally, there would be a single data structure tracking connections. I guess Pool tracks outer HBONE connections while this tracks inner user connections, though. Also I don't see a feasible way to use 1 structure anyways, so this is mostly hypothetical.
It's a good thought. I couldn't work out how to do it either though.
src/proxy/connection_manager.rs
pub async fn release(self, c: &Connection) {
    let mut drains = self.drains.write().await;
    if let Some((k, v)) = drains.remove_entry(c) {
        if v.count.fetch_sub(1, std::sync::atomic::Ordering::SeqCst) > 1 {
nit: why do we need atomic operations if we have a write lock?
You're correct. I was gonna try something a little fancier and ended up going with this without removing the atomic. A lot of this logic may just be removed though if we want to switch off the drain crate onto a different type of channel, as Ben suggested elsewhere.
Should be better now. Atomic removed
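A sketch of what that simplification can look like, assuming the count is only ever touched while the write lock is held (the key type and field names here are illustrative, not ztunnel's actual ones):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

struct ConnectionDrain {
    // A plain integer is enough: every mutation happens under the write
    // lock, so there is no concurrent access for an atomic to guard against.
    count: usize,
}

struct ConnectionManager {
    drains: Arc<RwLock<HashMap<String, ConnectionDrain>>>,
}

impl ConnectionManager {
    async fn release(&self, c: &str) {
        let mut drains = self.drains.write().await;
        if let Some(mut cd) = drains.remove(c) {
            if cd.count > 1 {
                // Other tasks still reference this connection; put it back
                // with the decremented count.
                cd.count -= 1;
                drains.insert(c.to_string(), cd);
            }
            // else: last reference released, the entry stays removed.
        }
    }
}
```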
    }
}

// signal all connections listening to this channel to take action (typically terminate traffic)
comment seems confusing, or I don't understand it. Signal all connections? But the input is 1 connection.
Is Connection representing the outer hbone connection, and drain() waits until all inner connections close?
Potentially a single rbac::connection could have multiple connections handling traffic I think. In that case they all need to be closed and this code should handle the case if it arises.
src/state/policy.rs
@@ -57,12 +79,14 @@ impl PolicyStore {
            RbacScope::WorkloadSelector => {}
        }
        self.by_key.insert(key, rbac);
        self.notifier.send();
Probably we want to send at most 1 per xds push. That may modify many policies at once
I was going to look at a follow-up which adds batching and includes putting the scope onto the channel, so that receivers get less spam and can inspect the scope to determine whether they need to re-assert. If we think it's critical path I could include it, but if not, perhaps a TODO would be better so it's not just in my head.
added an impl for your comment. Not scoped but should reduce the number of notifications sent to policy subscribers.
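A sketch of the at-most-one-notification-per-push idea, using a tokio watch channel as the notifier and placeholder types for the push contents (illustrative only, not the exact change that landed):

```rust
use tokio::sync::watch;

struct PolicyStore {
    // Subscribers watch this channel; only the fact that something changed
    // matters, not how many individual policies did.
    notifier: watch::Sender<()>,
}

impl PolicyStore {
    fn insert_without_notify(&mut self, _key: String, _rbac: String) {
        // ... update by_key, scope indexes, etc. ...
    }

    // Apply a whole XDS push and then send a single change notification,
    // rather than notifying once per policy inserted.
    fn apply_push(&mut self, policies: Vec<(String, String)>) {
        for (key, rbac) in policies {
            self.insert_without_notify(key, rbac);
        }
        let _ = self.notifier.send(());
    }
}
```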
src/proxy/connection_manager.rs
// signal all connections listening to this channel to take action (typically terminate traffic)
async fn close(&self, c: &Connection) {
    if let Some(cd) = self.drains.clone().write().await.remove(c) {
remove clone
Good catch @hzxuzhonghu. Track used to be called with assert_rbac, but the design changed a little and now there is a significant gap for races to occur. I'll have to take a look at this in more detail.
added hold to consider reworking without drain::Watch and using a tokio channel instead
Some tests should be added; that could be in a follow-up.
/retest
Thanks @hzxuzhonghu, I've created an issue to track follow-up items (#798) but will stop making changes to this PR.
Not familiar with Rust, so I have a question: if the connection is closed properly after sending some packets, can the management program sense it? I don't seem to see the corresponding processing. @ilrudie
It's not checking the connections themselves, but we are presently using the drain crate, which waits for the handle(s) to be dropped when you signal. The handle drop should occur once we break out of looping on handling traffic for the connection.
related #311
Adds:
inbound and inbound_passthrough