Skip to content

Commit 5f19f3a

Browse files
committed
[AdminAPI] Use IngressClient for invocation and state mgmt
- Use IngressClient instead of bifrost to write to partitions logs - Remove deprecated `delete_invocation`
1 parent 83eae93 commit 5f19f3a

File tree

14 files changed

+123
-187
lines changed

14 files changed

+123
-187
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/admin/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ restate-admin-rest-model = { workspace = true, features = ["schema"] }
2222
restate-bifrost = { workspace = true, features = ["local-loglet", "replicated-loglet"] }
2323
restate-core = { workspace = true }
2424
restate-errors = { workspace = true }
25+
restate-ingress-client = { workspace = true }
2526
restate-metadata-store = { workspace = true }
2627
restate-service-client = { workspace = true }
2728
restate-service-protocol = { workspace = true, features = ["discovery"] }

crates/admin/src/rest_api/deployments.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ use serde::Deserialize;
5858
from_type = "MetaApiError",
5959
)
6060
)]
61-
pub async fn create_deployment<Metadata, Discovery, Telemetry, Invocations>(
62-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
61+
pub async fn create_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
62+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
6363
Extension(version): Extension<AdminApiVersion>,
6464
#[request_body(required = true)] Json(payload): Json<RegisterDeploymentRequest>,
6565
) -> Result<impl IntoResponse, MetaApiError>
@@ -188,8 +188,8 @@ where
188188
schema = "std::string::String"
189189
))
190190
)]
191-
pub async fn get_deployment<Metadata, Discovery, Telemetry, Invocations>(
192-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
191+
pub async fn get_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
192+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
193193
Path(deployment_id): Path<DeploymentId>,
194194
) -> Result<Json<DetailedDeploymentResponse>, MetaApiError>
195195
where
@@ -210,8 +210,8 @@ where
210210
operation_id = "list_deployments",
211211
tags = "deployment"
212212
)]
213-
pub async fn list_deployments<Metadata, Discovery, Telemetry, Invocations>(
214-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
213+
pub async fn list_deployments<Metadata, Discovery, Telemetry, Invocations, Transport>(
214+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
215215
) -> Json<ListDeploymentsResponse>
216216
where
217217
Metadata: MetadataService,
@@ -267,8 +267,8 @@ pub struct DeleteDeploymentParams {
267267
from_type = "MetaApiError",
268268
)
269269
)]
270-
pub async fn delete_deployment<Metadata, Discovery, Telemetry, Invocations>(
271-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
270+
pub async fn delete_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
271+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
272272
Path(deployment_id): Path<DeploymentId>,
273273
Query(DeleteDeploymentParams { force }): Query<DeleteDeploymentParams>,
274274
) -> Result<StatusCode, MetaApiError>
@@ -302,8 +302,8 @@ where
302302
schema = "std::string::String"
303303
))
304304
)]
305-
pub async fn update_deployment<Metadata, Discovery, Telemetry, Invocations>(
306-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
305+
pub async fn update_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
306+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
307307
Extension(version): Extension<AdminApiVersion>,
308308
method: Method,
309309
Path(deployment_id): Path<DeploymentId>,

crates/admin/src/rest_api/handlers.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use restate_types::schema::service::HandlerMetadata;
3030
schema = "std::string::String"
3131
))
3232
)]
33-
pub async fn list_service_handlers<Metadata, Discovery, Telemetry, Invocations>(
34-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
33+
pub async fn list_service_handlers<Metadata, Discovery, Telemetry, Invocations, Transport>(
34+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
3535
Path(service_name): Path<String>,
3636
) -> Result<Json<ListServiceHandlersResponse>, MetaApiError>
3737
where
@@ -62,8 +62,8 @@ where
6262
)
6363
)
6464
)]
65-
pub async fn get_service_handler<Metadata, Discovery, Telemetry, Invocations>(
66-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
65+
pub async fn get_service_handler<Metadata, Discovery, Telemetry, Invocations, Transport>(
66+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
6767
Path((service_name, handler_name)): Path<(String, String)>,
6868
) -> Result<Json<HandlerMetadata>, MetaApiError>
6969
where

crates/admin/src/rest_api/invocations.rs

Lines changed: 20 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -8,121 +8,23 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use super::error::*;
12-
use crate::generate_meta_api_error;
13-
use crate::rest_api::create_envelope_header;
14-
use crate::state::AdminServiceState;
1511
use axum::Json;
1612
use axum::extract::{Path, Query, State};
1713
use axum::http::StatusCode;
1814
use okapi_operation::*;
15+
use serde::Deserialize;
16+
1917
use restate_admin_rest_model::invocations::RestartAsNewInvocationResponse;
20-
use restate_types::identifiers::{
21-
DeploymentId, InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey,
22-
};
18+
use restate_types::identifiers::{DeploymentId, InvocationId, PartitionProcessorRpcRequestId};
2319
use restate_types::invocation::client::{
2420
self, CancelInvocationResponse, InvocationClient, KillInvocationResponse,
2521
PauseInvocationResponse, PurgeInvocationResponse, ResumeInvocationResponse,
2622
};
27-
use restate_types::invocation::{InvocationTermination, PurgeInvocationRequest, TerminationFlavor};
2823
use restate_types::journal_v2::EntryIndex;
29-
use restate_wal_protocol::{Command, Envelope};
30-
use serde::Deserialize;
31-
use std::sync::Arc;
32-
use tracing::warn;
3324

34-
#[derive(Debug, Default, Deserialize, JsonSchema)]
35-
pub enum DeletionMode {
36-
#[default]
37-
#[serde(alias = "cancel")]
38-
Cancel,
39-
#[serde(alias = "kill")]
40-
Kill,
41-
#[serde(alias = "purge")]
42-
Purge,
43-
}
44-
#[derive(Debug, Default, Deserialize, JsonSchema)]
45-
pub struct DeleteInvocationParams {
46-
pub mode: Option<DeletionMode>,
47-
}
48-
49-
/// Terminate an invocation
50-
#[openapi(
51-
summary = "Delete an invocation",
52-
deprecated = true,
53-
description = "Use kill_invocation/cancel_invocation/purge_invocation instead.",
54-
operation_id = "delete_invocation",
55-
tags = "invocation",
56-
parameters(
57-
path(
58-
name = "invocation_id",
59-
description = "Invocation identifier.",
60-
schema = "std::string::String"
61-
),
62-
query(
63-
name = "mode",
64-
description = "If cancel, it will gracefully terminate the invocation. \
65-
If kill, it will terminate the invocation with a hard stop. \
66-
If purge, it will only cleanup the response for completed invocations, and leave unaffected an in-flight invocation.",
67-
required = false,
68-
style = "simple",
69-
allow_empty_value = false,
70-
schema = "DeletionMode",
71-
)
72-
),
73-
responses(
74-
ignore_return_type = true,
75-
response(
76-
status = "202",
77-
description = "Accepted",
78-
content = "okapi_operation::Empty",
79-
),
80-
from_type = "MetaApiError",
81-
)
82-
)]
83-
pub async fn delete_invocation<Metadata, Discovery, Telemetry, Invocations>(
84-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
85-
Path(invocation_id): Path<String>,
86-
Query(DeleteInvocationParams { mode }): Query<DeleteInvocationParams>,
87-
) -> Result<StatusCode, MetaApiError> {
88-
let invocation_id = invocation_id
89-
.parse::<InvocationId>()
90-
.map_err(|e| MetaApiError::InvalidField("invocation_id", e.to_string()))?;
91-
92-
let cmd = match mode.unwrap_or_default() {
93-
DeletionMode::Cancel => Command::TerminateInvocation(InvocationTermination {
94-
invocation_id,
95-
flavor: TerminationFlavor::Cancel,
96-
response_sink: None,
97-
}),
98-
DeletionMode::Kill => Command::TerminateInvocation(InvocationTermination {
99-
invocation_id,
100-
flavor: TerminationFlavor::Kill,
101-
response_sink: None,
102-
}),
103-
DeletionMode::Purge => Command::PurgeInvocation(PurgeInvocationRequest {
104-
invocation_id,
105-
response_sink: None,
106-
}),
107-
};
108-
109-
let partition_key = invocation_id.partition_key();
110-
111-
let result = restate_bifrost::append_to_bifrost(
112-
&state.bifrost,
113-
Arc::new(Envelope::new(create_envelope_header(partition_key), cmd)),
114-
)
115-
.await;
116-
117-
if let Err(err) = result {
118-
warn!("Could not append invocation termination command to Bifrost: {err}");
119-
Err(MetaApiError::Internal(
120-
"Failed sending invocation termination to the cluster.".to_owned(),
121-
))
122-
} else {
123-
Ok(StatusCode::ACCEPTED)
124-
}
125-
}
25+
use super::error::*;
26+
use crate::generate_meta_api_error;
27+
use crate::state::AdminServiceState;
12628

12729
generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, InvocationWasAlreadyCompletedError]);
12830

@@ -139,8 +41,8 @@ generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, Invocati
13941
schema = "std::string::String"
14042
))
14143
)]
142-
pub async fn kill_invocation<Metadata, Discovery, Telemetry, Invocations>(
143-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
44+
pub async fn kill_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
45+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
14446
Path(invocation_id): Path<String>,
14547
) -> Result<(), KillInvocationError>
14648
where
@@ -199,8 +101,8 @@ generate_meta_api_error!(CancelInvocationError: [InvocationNotFoundError, Invoca
199101
from_type = "CancelInvocationError",
200102
)
201103
)]
202-
pub async fn cancel_invocation<Metadata, Discovery, Telemetry, Invocations>(
203-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
104+
pub async fn cancel_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
105+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
204106
Path(invocation_id): Path<String>,
205107
) -> Result<StatusCode, CancelInvocationError>
206108
where
@@ -241,8 +143,8 @@ generate_meta_api_error!(PurgeInvocationError: [InvocationNotFoundError, Invocat
241143
schema = "std::string::String"
242144
))
243145
)]
244-
pub async fn purge_invocation<Metadata, Discovery, Telemetry, Invocations>(
245-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
146+
pub async fn purge_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
147+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
246148
Path(invocation_id): Path<String>,
247149
) -> Result<(), PurgeInvocationError>
248150
where
@@ -284,8 +186,8 @@ generate_meta_api_error!(PurgeJournalError: [InvocationNotFoundError, Invocation
284186
schema = "std::string::String"
285187
))
286188
)]
287-
pub async fn purge_journal<Metadata, Discovery, Telemetry, Invocations>(
288-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
189+
pub async fn purge_journal<Metadata, Discovery, Telemetry, Invocations, Transport>(
190+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
289191
Path(invocation_id): Path<String>,
290192
) -> Result<(), PurgeJournalError>
291193
where
@@ -398,8 +300,8 @@ generate_meta_api_error!(RestartInvocationError: [
398300
),
399301
)
400302
)]
401-
pub async fn restart_as_new_invocation<Metadata, Discovery, Telemetry, Invocations>(
402-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
303+
pub async fn restart_as_new_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
304+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
403305
Path(invocation_id): Path<String>,
404306
Query(RestartAsNewInvocationQueryParams { from, deployment }): Query<
405307
RestartAsNewInvocationQueryParams,
@@ -510,8 +412,8 @@ generate_meta_api_error!(ResumeInvocationError: [
510412
)
511413
)
512414
)]
513-
pub async fn resume_invocation<Metadata, Discovery, Telemetry, Invocations>(
514-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
415+
pub async fn resume_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
416+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
515417
Path(invocation_id): Path<String>,
516418
Query(ResumeInvocationQueryParams { deployment }): Query<ResumeInvocationQueryParams>,
517419
) -> Result<(), ResumeInvocationError>
@@ -596,8 +498,8 @@ generate_meta_api_error!(PauseInvocationError: [
596498
from_type = "PauseInvocationError",
597499
)
598500
)]
599-
pub async fn pause_invocation<Metadata, Discovery, Telemetry, Invocations>(
600-
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
501+
pub async fn pause_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
502+
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
601503
Path(invocation_id): Path<String>,
602504
) -> Result<StatusCode, PauseInvocationError>
603505
where

crates/admin/src/rest_api/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod version;
2323
use okapi_operation::axum_integration::{delete, get, patch, post};
2424
use okapi_operation::okapi::openapi3::{ExternalDocs, Tag};
2525
use okapi_operation::*;
26+
use restate_core::network::TransportConnect;
2627
use restate_types::identifiers::PartitionKey;
2728
use restate_types::invocation::client::InvocationClient;
2829
use restate_types::schema::registry::{DiscoveryClient, MetadataService, TelemetryClient};
@@ -32,14 +33,15 @@ use crate::state::AdminServiceState;
3233

3334
pub use version::{MAX_ADMIN_API_VERSION, MIN_ADMIN_API_VERSION};
3435

35-
pub fn create_router<Metadata, Discovery, Telemetry, Invocations>(
36-
state: AdminServiceState<Metadata, Discovery, Telemetry, Invocations>,
36+
pub fn create_router<Metadata, Discovery, Telemetry, Invocations, Transport>(
37+
state: AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>,
3738
) -> axum::Router<()>
3839
where
3940
Metadata: MetadataService + Send + Sync + Clone + 'static,
4041
Discovery: DiscoveryClient + Send + Sync + Clone + 'static,
4142
Telemetry: TelemetryClient + Send + Sync + Clone + 'static,
4243
Invocations: InvocationClient + Send + Sync + Clone + 'static,
44+
Transport: TransportConnect,
4345
{
4446
let mut router = axum_integration::Router::new()
4547
.route(
@@ -91,10 +93,6 @@ where
9193
"/services/{service}/handlers/{handler}",
9294
get(openapi_handler!(handlers::get_service_handler)),
9395
)
94-
.route(
95-
"/invocations/{invocation_id}",
96-
delete(openapi_handler!(invocations::delete_invocation)),
97-
)
9896
.route(
9997
"/invocations/{invocation_id}/kill",
10098
patch(openapi_handler!(invocations::kill_invocation)),

0 commit comments

Comments
 (0)