compute-client: cancel served peeks
This commit makes the compute client explicitly send `CancelPeek`
commands for peeks it doesn't need a response for anymore (because it
already saw one). This allows late replicas to clean up any state they
hold for this peek. It will also allow us to simplify peek handling in
the future.
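
The cleanup that this enables on a late replica can be sketched roughly as follows. This is a minimal illustration, not the actual replica code: `PeekUuid`, `ReplicaCommand`, `Replica`, and the `pending` map are hypothetical stand-ins, and the per-peek state is reduced to a byte buffer.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the peek identifier (the real code uses a UUID).
type PeekUuid = u64;

/// Hypothetical, heavily reduced command set.
enum ReplicaCommand {
    Peek { uuid: PeekUuid },
    CancelPeek { uuid: PeekUuid },
}

/// Hypothetical replica that holds some state for every peek it is working on.
#[derive(Default)]
struct Replica {
    pending: HashMap<PeekUuid, Vec<u8>>,
}

impl Replica {
    fn handle(&mut self, cmd: ReplicaCommand) {
        match cmd {
            ReplicaCommand::Peek { uuid } => {
                // Start tracking state for this peek.
                self.pending.insert(uuid, Vec::new());
            }
            ReplicaCommand::CancelPeek { uuid } => {
                // Drop whatever state is held for this peek. Cancelling an
                // unknown or already finished peek is a no-op.
                self.pending.remove(&uuid);
            }
        }
    }
}
```

In this sketch a replica that has not yet answered a peek simply forgets it once the cancellation arrives, which is the effect the commit message describes.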
teskje committed Sep 5, 2023
1 parent 52226c7 commit 4c441a4
Showing 2 changed files with 14 additions and 14 deletions.
26 changes: 13 additions & 13 deletions src/compute-client/src/controller/instance.rs
@@ -1177,20 +1177,14 @@ where
}
};

-// Forward the peek response, if we didn't already forward a response
-// to this peek previously. If the peek is targeting a replica, only
-// forward the response from that replica.
-// TODO: we could collect the other responses to assert equivalence?
-// Trades resources (memory) for reassurances; idk which is best.
+// Forward the peek response, if we didn't already forward a response to this peek
+// previously. If the peek is targeting a replica, only forward the response from that
+// replica.
 //
-// NOTE: we use the `otel_ctx` from the response, not the
-// pending peek, because we currently want the parent
-// to be whatever the compute worker did with this peek. We
-// still `take` the pending peek's `otel_ctx` to mark it as
-// served.
-//
-// Additionally, we just use the `otel_ctx` from the first worker to
-// respond.
+// NOTE: We use the `otel_ctx` from the response, not the pending peek, because we
+// currently want the parent to be whatever the compute worker did with this peek. We still
+// `take` the pending peek's `otel_ctx` to mark it as served.

let replica_targeted = peek.target_replica.unwrap_or(replica_id) == replica_id;
let controller_response = if replica_targeted && peek.otel_ctx.take().is_some() {
let duration = peek.requested_at.elapsed();
@@ -1211,6 +1205,12 @@ where
self.remove_peeks(&[uuid].into());
}

+// If we are serving a response to the peek, enqueue a `CancelPeek` command to allow other
+// replicas to stop spending resources on computing this peek.
+if controller_response.is_some() {
+self.compute.send(ComputeCommand::CancelPeek { uuid });
+}

controller_response
}
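
The control flow of this hunk can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the controller's real types: `Controller`, `PendingPeek`, `ControllerCommand`, `add_peek`, and the `outbox` vector are hypothetical, an `Option<()>` stands in for `otel_ctx`, and the removal of fully answered peeks via `remove_peeks` is omitted.

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins for the real identifier types.
type PeekId = u64;
type ReplicaId = u32;

/// Hypothetical, reduced command type; only the cancellation is modeled.
#[derive(Debug, PartialEq)]
enum ControllerCommand {
    CancelPeek { uuid: PeekId },
}

/// Hypothetical pending-peek record.
struct PendingPeek {
    target_replica: Option<ReplicaId>,
    /// Plays the role of `otel_ctx`: `Some` until the peek has been served.
    unserved: Option<()>,
}

#[derive(Default)]
struct Controller {
    peeks: HashMap<PeekId, PendingPeek>,
    /// Commands queued for broadcast to all replicas.
    outbox: Vec<ControllerCommand>,
}

impl Controller {
    fn add_peek(&mut self, uuid: PeekId, target_replica: Option<ReplicaId>) {
        let peek = PendingPeek { target_replica, unserved: Some(()) };
        self.peeks.insert(uuid, peek);
    }

    /// Returns the rows to forward, or `None` if this response is dropped.
    fn handle_peek_response(
        &mut self,
        uuid: PeekId,
        replica_id: ReplicaId,
        rows: Vec<i64>,
    ) -> Option<Vec<i64>> {
        let peek = self.peeks.get_mut(&uuid)?;
        // An untargeted peek accepts a response from any replica.
        let replica_targeted = peek.target_replica.unwrap_or(replica_id) == replica_id;
        // `take()` succeeds only once, so only the first accepted response is forwarded.
        let response = if replica_targeted && peek.unserved.take().is_some() {
            Some(rows)
        } else {
            None
        };
        // Once a response is served, tell the replicas to stop working on the peek.
        if response.is_some() {
            self.outbox.push(ControllerCommand::CancelPeek { uuid });
        }
        response
    }
}
```

Because the cancellation is tied to `response.is_some()`, the sketch enqueues at most one `CancelPeek` per served peek, which is consistent with the relaxed `cancel_peek` bound in the test change in this commit.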

2 changes: 1 addition & 1 deletion test/cluster/mzcompose.py
@@ -2115,7 +2115,7 @@ def fetch_metrics() -> Metrics:
count = metrics.get_command_count("peek")
assert count <= 2, f"unexpected peek count: {count}"
count = metrics.get_command_count("cancel_peek")
-assert count == 0, f"unexpected cancel_peek count: {count}"
+assert count <= 2, f"unexpected cancel_peek count: {count}"
count = metrics.get_command_count("initialization_complete")
assert count == 0, f"unexpected initialization_complete count: {count}"
count = metrics.get_command_count("update_configuration")
