
close connections which violate policy after updates #772

Merged
21 commits merged into istio:master on Feb 5, 2024

Conversation

ilrudie
Contributor

@ilrudie ilrudie commented Jan 16, 2024

related #311

Adds:

  • tracking open connections for inbound and inbound_passthrough
  • notifications to trackers when auth policy changes occur
  • separate task handles change notifications, asserts policy against the existing connections and closes open connections which are now denied by policy

@ilrudie ilrudie requested review from a team as code owners January 16, 2024 22:15
@istio-testing istio-testing added do-not-merge/work-in-progress Block merging of a PR because it isn't ready yet. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Jan 16, 2024
Member

@howardjohn howardjohn left a comment

Mostly skimmed it so far but looks reasonable overall

pub async fn drain(&self, c: &Connection) {
    match self.drains.clone().write().await.remove(c) {
        Some(cd) => {
            cd.tx.drain().await;
Member

is the lock held here? If so, is drain() possibly blocking?

Contributor Author

I don't think the write lock should be held here. Remove totally removes a value from the HashMap, returning the owned value (not a reference), so there's no writing beyond the single remove and no reference to the lock is required. Now you've got me doubting that understanding a bit though 😅

Contributor Author

It's definitely possible to make that more explicit though if we want.

Contributor Author

        let mut w = self.drains.clone().write().await;
        let cd = w.remove(c);
        drop(w);

        match cd {
            ...
        }

Member

The lock should be released when the value returned from write() is dropped. I am just not sure if it drops when the match body (L67) is done or when the match check (L65) is done.

Contributor Author

probably better to break it out like that to align with the standards we have for zt... prioritize understandability for our reviewers over the most idiomatic-looking Rust

Contributor Author

It turns out the lock is held through the duration of the match. This led to a deadlock in the track method, where the read lock was being held for the entire match and one branch was trying to get a write lock.
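
For readers following along, here is a minimal sketch of that deadlock pattern and its fix. This is not the PR's actual code; it uses a hypothetical HashMap<String, u32> tracker behind a tokio RwLock purely to illustrate the point.

    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // The read guard created in the match scrutinee lives until the end of the
    // whole match, so the write().await in the None arm waits on a lock this
    // same task still holds -- the task can never make progress again.
    async fn track_deadlocks(drains: Arc<RwLock<HashMap<String, u32>>>, key: &str) {
        match drains.read().await.get(key) {
            Some(count) => println!("already tracked: {count}"),
            None => {
                drains.write().await.insert(key.to_string(), 1); // deadlock
            }
        }
    }

    // Fix: copy what is needed out so the guard drops, then match on the owned value.
    async fn track_ok(drains: Arc<RwLock<HashMap<String, u32>>>, key: &str) {
        let existing = drains.read().await.get(key).copied(); // guard dropped at end of statement
        match existing {
            Some(count) => println!("already tracked: {count}"),
            None => {
                drains.write().await.insert(key.to_string(), 1);
            }
        }
    }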

Contributor

@bleggett bleggett Jan 17, 2024

Use of a central RWLocked connection tracker pretty much means by definition that we will be blocking on add/remove ops, I think.

If that's not acceptable, inverting this via channels and ditching the centralized locked conntrack is probably the only other way.

Contributor

we can also explore using linked lists for less locking


Comment on lines 116 to 117
if !state.assert_rbac(&conn).await {
    connection_manager.drain(&conn).await;
Member

This definitely needs some nice logging since it will be a pretty major "WTF just happened" in some cases

Contributor Author

firm agree. TY

Comment on lines 76 to 86
pub async fn connections(&self) -> Vec<Connection> {
    // potentially large copy under read lock, could require optimization
    self.drains.read().await.keys().cloned().collect()
}
Contributor

@bleggett bleggett Jan 17, 2024

Can we just do the rbac check here to avoid copying the entire conntrack, returning it, and checking in the caller?

Contributor Author

We could consider that but I was a little worried about interlocking. We'd need to await locking on the PolicyStore (likely multiple times) inside this lock if we assert_rbac in here I think.

Contributor

@bleggett bleggett Jan 17, 2024

We shouldn't have to lock state multiple times, once we get a change event we can (at least) clone it and provide it to the conn_manager to apply in an iterative loop.

In general if we go with this approach I think eventually assert_rbac should live in connection_manager.rs and not state, anyway, and we can probably avoid cloning the entire state to do what we want.

But, again, this is getting a bit in the weeds.

Contributor Author

that's fair. we could do some reworking of how policy is applied and retained to ease or eliminate interlocking problems if need be. My thinking was sort of limited to not making those kinds of changes, which maybe is unnecessarily narrow.

Contributor

This is fine for now. If the centralized rwlock'd conntrack becomes a problem/bottleneck, we can just remove it.

@istio-testing istio-testing added the needs-rebase Indicates a PR needs to be rebased before being merged label Jan 18, 2024
@istio-testing istio-testing added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed needs-rebase Indicates a PR needs to be rebased before being merged size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Jan 18, 2024
    rx
}

pub async fn drain(&self, c: &Connection) {
Member

is there a graceful draining time?

Contributor Author

That's still TBD

Contributor Author

I think there are really 2 behaviors we want here:

  1. drain: used when ztunnel is shutting down; sends a GOAWAY and continues to handle traffic
  2. close: used when auth policy changes and existing connections become denied; stops handling traffic

Contributor Author

@howardjohn and @kyessenov, does this approach seem reasonable?


I don't see any drain here. This is just local close?

Contributor Author

As it sits now, even when policy changes from allow to deny we send a GOAWAY and continue to handle traffic. We'd originally implemented drains this way because our only connection drains happened when ztunnel was being shut down, and in that case we decided that k8s killing the pod would be our drain timeout.

From an auth deny perspective this didn't seem correct though. I'm just about ready to push some new code which has both drain (for shutdown) and close (for auth) implemented for inbound.


Let's not conflate HBONE GOAWAY with application drain. HBONE GOAWAY is an infrastructure mechanism and should not be used for application intent - it's like reinstalling a cluster when redeploying an app.

For application policies, there's no drain or GOAWAY here AFAICT. You should just close the connection.


By close, I mean RST. I think we shouldn't do half close here.

Contributor Author

In the latest commits proxy drain behavior is unchanged and all related tests are passing but during testing auth deny closes are (basically) immediate.

Member

I agree that is the desired behavior on drain on upgrade/shutdown vs immediate close on authz policy denies.

@ilrudie
Contributor Author

ilrudie commented Jan 24, 2024

/retest

…ager, unit testing around the same

Signed-off-by: ilrudie <[email protected]>
@istio-testing istio-testing removed the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Jan 24, 2024
@ilrudie
Contributor Author

ilrudie commented Jan 29, 2024

@howardjohn, @hzxuzhonghu and @kyessenov; if you get a chance to take another look and provide feedback that would be greatly appreciated. TYIA

_ = policies_changed.changed() => {
    let connections = connection_manager.connections().await;
    for conn in connections {
        if !state.assert_rbac(&conn).await {


Sorry, I'm not a Rust expert - does this await individually or globally? Any time there's IO, you probably don't want to stall in a loop in a switch - you may overflow the pending events because IO is slow. It's best to fire-and-forget and then let someone reap the zombies.

Contributor Author

This will await the individual assertion against the one connection, but await doesn't mean block (there is a specific block function if that's what you want though). If the Future being awaited is not ready, it relinquishes control and will be polled later by the executor of the task/thread.

Rust's Futures are somewhat unique in that they are lazy. If nothing drives them to completion they will not do any work, so we can't totally fire and forget. We can either await them and let the executor/callback model poll them, or we could collect all the futures and join them later. If we do neither, no work would happen.
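
As a rough, self-contained illustration of those two options (awaiting each future in turn vs. collecting them and joining), with a made-up check_connection function standing in for the policy assertion and assuming the futures crate is available:

    use futures::future::join_all;

    // Hypothetical stand-in for asserting policy against one connection.
    async fn check_connection(id: u32) -> bool {
        id % 2 == 0
    }

    async fn demo() {
        let ids = vec![1, 2, 3];

        // Option 1: await each check in turn; the loop makes progress one future at a time.
        for id in &ids {
            let allowed = check_connection(*id).await;
            println!("conn {id}: allowed={allowed}");
        }

        // Option 2: build all the futures first, then drive them together.
        // Nothing runs until join_all(..).await polls them, because futures are lazy.
        let results = join_all(ids.iter().map(|id| check_connection(*id))).await;
        println!("{results:?}");
    }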


I feel like you do want to block the loop or at least debounce it to avoid edge-triggered event spam in the runtime. But I have no idea how to express that in Rust. Maybe a separate actor that is "fast" in consuming policy events but "slow" in enacting them.

Contributor Author

Yeah, I think having scoped events would be a solid optimization. IMO it would be better to keep this PR a little smaller and then iterate on some optimizations as we move forward but if folks strongly disagree we can start optimizing now.

Contributor Author

The code you are commenting on is sort of that actor (in concept at least). It's being spun up in its own task so it should be (at least somewhat) independent of the tasks which are actually moving data.

Member

If I understand the event spam concern, is it that we enqueue multiple times to trigger this code? Like while we are looping here, we get changed() multiple times stacked up?

If so, I think it will have at most one pending. That being said, you could still hit some spam I guess.

Contributor Author

I've changed the implementation of sending to policy subscribers to be less frequent in cases where we've got batched updates. Should help

Member

There can be a race between connection tracking and a policy update, like:

t0: a connection is established with the old RBAC
t1: the RBAC is updated
t2: the policy watcher runs drain here
t3: the connection established at t0 gets tracked

The connection actually should be closed.

@ilrudie
Contributor Author

ilrudie commented Jan 30, 2024

cc @stevenctl

@ilrudie ilrudie changed the title from "drain connections which violate policy after updates" to "close connections which violate policy after updates" on Jan 30, 2024
@@ -42,6 +42,21 @@ pub enum Identity {
    },
}

impl Ord for Identity {
    // Not sure this is a super legit compare but I think it should work for POC
Member

given this is no longer a POC, should we do a more robust approach?

Contributor Author

Good call. I do want to consider whether this is adequate.

Contributor Author

Using something like this would force us to consider how Identity::X should be compared to Identity::Spiffe if/when a new identity format is added.

impl Ord for Identity {
    fn cmp(&self, other: &Self) -> Ordering {
        let s = match self {
            Identity::Spiffe {
                trust_domain,
                namespace,
                service_account,
            } => trust_domain.to_owned() + namespace + service_account,
        };
        let o = match other {
            Identity::Spiffe {
                trust_domain,
                namespace,
                service_account,
            } => trust_domain.to_owned() + namespace + service_account,
        };
        s.cmp(&o)
    }
}

Member

why do we actually need Ord?

Contributor Author

nm, they can be derived... that's the play

Contributor Author

Ord is required for sorting, which is used in the tests.
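
For reference, a minimal sketch of the derive-based approach mentioned above (field types assumed to be plain Strings here, not necessarily what ztunnel actually uses):

    // The derived Ord compares variants in declaration order and then fields
    // lexicographically, which is enough for sorting in tests.
    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
    pub enum Identity {
        Spiffe {
            trust_domain: String,
            namespace: String,
            service_account: String,
        },
    }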

let cd = ConnectionDrain::new();
let rx = cd.rx.clone();
let mut drains = self.drains.write().await;
if let Some(w) = drains.remove(c) {
Member

nit: del+mutate+insert can be done with code like:

map.entry("poneyland")
   .and_modify(|e| { *e += 1 })
   .or_insert(42);

Member

maybe not with the return rx aspect though

Contributor Author

modified rust playground example

Seems like it could work. TY

Contributor Author

@ilrudie ilrudie Jan 30, 2024

could very well be a step too far and difficult to grok but:

    // register a connection with the manager and get a channel to receive on
    pub async fn track(self, c: &Connection) -> drain::Watch {
        self.drains
            .write()
            .await
            .entry(c.to_owned())
            .and_modify(|cd| cd.count += 1)
            .or_insert(ConnectionDrain::new())
            .rx
            .clone()
    }

}

#[derive(Clone)]
pub struct ConnectionManager {
Member

We already have a "pool" tracking connections. It seems like there, ideally, would be a single data structure tracking connections. I guess Pool tracks outer HBONE connections while this tracks inner user connections, though. Also I don't see a feasible way to use 1 structure anyways, so mostly hypothetical.

Contributor Author

It's a good thought. I couldn't work out how to do it either though.

pub async fn release(self, c: &Connection) {
    let mut drains = self.drains.write().await;
    if let Some((k, v)) = drains.remove_entry(c) {
        if v.count.fetch_sub(1, std::sync::atomic::Ordering::SeqCst) > 1 {
Member

nit: why do we need atomic operations if we have a write lock?

Contributor Author

You're correct. I was going to try something a little fancier and ended up going with this without removing the atomic. A lot of this logic may just be removed, though, if we want to switch off the drain crate onto a different type of channel as Ben suggested elsewhere.

Contributor Author

Should be better now. Atomic removed
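
A toy version of that point, using a plain HashMap<String, usize> rather than the PR's actual ConnectionDrain type: while the write guard is held, no other task can touch the map, so a non-atomic count is already race-free.

    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    async fn release(drains: Arc<RwLock<HashMap<String, usize>>>, key: &str) {
        // Exclusive access for the whole block; an AtomicUsize here would be redundant.
        let mut drains = drains.write().await;
        match drains.get(key).copied() {
            Some(count) if count > 1 => {
                drains.insert(key.to_string(), count - 1); // other handlers remain
            }
            Some(_) => {
                drains.remove(key); // last reference: drop the entry entirely
            }
            None => {} // untracked connection; nothing to do
        }
    }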

}
}

// signal all connections listening to this channel to take action (typically terminate traffic)
Member

comment seems confusing, or I don't understand it. Signal all connections? But the input is 1 connection.

Is Connection representing the outer HBONE connection, and drain() waits until all inner connections close?

Contributor Author

Potentially a single rbac::connection could have multiple connections handling traffic I think. In that case they all need to be closed and this code should handle the case if it arises.


@@ -57,12 +79,14 @@ impl PolicyStore {
RbacScope::WorkloadSelector => {}
}
self.by_key.insert(key, rbac);
self.notifier.send();
Member

Probably we want to send at most 1 per xds push. That may modify many policies at once

Contributor Author

I was going to leave this to a follow-up which adds batching and puts the scope onto the channel, so that receivers have less spam and can inspect the scope to determine whether they need to re-assert. If we think it's critical path I could include it, but if not, perhaps a TODO would be better so it's not just in my head.

Contributor Author

added an impl for your comment. Not scoped but should reduce the number of notifications sent to policy subscribers.
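
Roughly the shape of that change, as a hedged sketch with made-up names and types (apply_policy_batch and the String stand-ins are illustrative, not the PR's exact code): apply every policy from one push first, then notify subscribers once via a tokio watch channel.

    use std::collections::HashMap;
    use tokio::sync::watch;

    struct PolicyStore {
        by_key: HashMap<String, String>, // stand-in for the real policy type
        notifier: watch::Sender<()>,
    }

    impl PolicyStore {
        // Insert everything from one xDS push, then send a single change
        // notification instead of notifying subscribers once per policy.
        fn apply_policy_batch(&mut self, policies: Vec<(String, String)>) {
            for (key, rbac) in policies {
                self.by_key.insert(key, rbac);
            }
            let _ = self.notifier.send(()); // one wake-up for the whole batch
        }
    }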

@istio-testing istio-testing added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Jan 30, 2024

// signal all connections listening to this channel to take action (typically terminate traffic)
async fn close(&self, c: &Connection) {
    if let Some(cd) = self.drains.clone().write().await.remove(c) {
Member

remove clone


Signed-off-by: ilrudie <[email protected]>
@ilrudie
Contributor Author

ilrudie commented Jan 31, 2024

There can be a race between connection tracking and a policy update, like:

t0: a connection is established with the old RBAC
t1: the RBAC is updated
t2: the policy watcher runs drain here
t3: the connection established at t0 gets tracked

The connection actually should be closed.

Good catch @hzxuzhonghu. Track used to be called with assert_rbac, but the design changed a little and now there is a significant gap for races to occur. I'll have to take a look at this in more detail.

@istio-testing istio-testing added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Feb 2, 2024
@ilrudie ilrudie added the do-not-merge/hold Block automatic merging of a PR. label Feb 2, 2024
@ilrudie
Contributor Author

ilrudie commented Feb 2, 2024

added hold to consider reworking without drain::Watch and using a tokio channel instead

Member

@hzxuzhonghu hzxuzhonghu left a comment

Some tests should be added; this could be in a follow-up.

@ilrudie
Contributor Author

ilrudie commented Feb 2, 2024

/retest

@ilrudie
Contributor Author

ilrudie commented Feb 5, 2024

Thanks @hzxuzhonghu, I've created an issue to track on follow up items (#798) but will stop making changes to this PR.

@ilrudie ilrudie removed the do-not-merge/hold Block automatic merging of a PR. label Feb 5, 2024
@istio-testing istio-testing merged commit b95e5dd into istio:master Feb 5, 2024
3 checks passed
@supercharge-xsy

Not familiar with Rust, so I have a question: if the connection is closed properly after sending some packets, can the management program sense it? I don't seem to see the corresponding processing. @ilrudie

@ilrudie
Contributor Author

ilrudie commented Mar 14, 2024

Not familiar with Rust, so I have a question: if the connection is closed properly after sending some packets, can the management program sense it? I don't seem to see the corresponding processing. @ilrudie

It's not checking the connections themselves, but we are presently using the drain crate, which waits for the handle(s) to be dropped when you signal. The handle drop should occur once we break out of the loop handling traffic for the connection.
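
For readers unfamiliar with the crate, a small sketch of that pattern as described above. This assumes the drain crate's channel()/Signal::drain() API; treat the exact names as an assumption rather than a reference, and none of this is ztunnel's actual code.

    // drain::channel() gives a Signal (sender side) and a Watch (held by workers).
    // Signal::drain() resolves only after every Watch clone has been dropped,
    // which here happens when the worker's "traffic loop" exits.
    async fn demo() {
        let (signal, watch) = drain::channel();

        let worker = tokio::spawn(async move {
            let _watch = watch; // held while "handling traffic"
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            // _watch dropped here as the task finishes
        });

        signal.drain().await; // completes once all watches are gone
        let _ = worker.await;
    }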
