Skip to content

Commit 05e8afc

Browse files
Clean up the SubscriptionValidator abstraction (#3866)
This is not a useful abstraction these days, it was introduced long ago when we didn't have Configuration::pinned. (#3866)
1 parent fafc03f commit 05e8afc

File tree

15 files changed

+134
-192
lines changed

15 files changed

+134
-192
lines changed

crates/admin/src/rest_api/deployments.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ use serde::Deserialize;
5656
from_type = "MetaApiError",
5757
)
5858
)]
59-
pub async fn create_deployment<V, IC>(
60-
State(state): State<AdminServiceState<V, IC>>,
59+
pub async fn create_deployment<IC>(
60+
State(state): State<AdminServiceState<IC>>,
6161
Extension(version): Extension<AdminApiVersion>,
6262
#[request_body(required = true)] Json(payload): Json<RegisterDeploymentRequest>,
6363
) -> Result<impl IntoResponse, MetaApiError> {
@@ -181,8 +181,8 @@ pub async fn create_deployment<V, IC>(
181181
schema = "std::string::String"
182182
))
183183
)]
184-
pub async fn get_deployment<V, IC>(
185-
State(state): State<AdminServiceState<V, IC>>,
184+
pub async fn get_deployment<IC>(
185+
State(state): State<AdminServiceState<IC>>,
186186
Path(deployment_id): Path<DeploymentId>,
187187
) -> Result<Json<DetailedDeploymentResponse>, MetaApiError> {
188188
let (deployment, services) = state
@@ -200,8 +200,8 @@ pub async fn get_deployment<V, IC>(
200200
operation_id = "list_deployments",
201201
tags = "deployment"
202202
)]
203-
pub async fn list_deployments<V, IC>(
204-
State(state): State<AdminServiceState<V, IC>>,
203+
pub async fn list_deployments<IC>(
204+
State(state): State<AdminServiceState<IC>>,
205205
) -> Json<ListDeploymentsResponse> {
206206
let deployments = state
207207
.schema_registry
@@ -254,8 +254,8 @@ pub struct DeleteDeploymentParams {
254254
from_type = "MetaApiError",
255255
)
256256
)]
257-
pub async fn delete_deployment<V, IC>(
258-
State(state): State<AdminServiceState<V, IC>>,
257+
pub async fn delete_deployment<IC>(
258+
State(state): State<AdminServiceState<IC>>,
259259
Path(deployment_id): Path<DeploymentId>,
260260
Query(DeleteDeploymentParams { force }): Query<DeleteDeploymentParams>,
261261
) -> Result<StatusCode, MetaApiError> {
@@ -286,8 +286,8 @@ pub async fn delete_deployment<V, IC>(
286286
schema = "std::string::String"
287287
))
288288
)]
289-
pub async fn update_deployment<V, IC>(
290-
State(state): State<AdminServiceState<V, IC>>,
289+
pub async fn update_deployment<IC>(
290+
State(state): State<AdminServiceState<IC>>,
291291
Extension(version): Extension<AdminApiVersion>,
292292
method: Method,
293293
Path(deployment_id): Path<DeploymentId>,

crates/admin/src/rest_api/handlers.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ use restate_types::schema::service::HandlerMetadata;
2929
schema = "std::string::String"
3030
))
3131
)]
32-
pub async fn list_service_handlers<V, IC>(
33-
State(state): State<AdminServiceState<V, IC>>,
32+
pub async fn list_service_handlers<IC>(
33+
State(state): State<AdminServiceState<IC>>,
3434
Path(service_name): Path<String>,
3535
) -> Result<Json<ListServiceHandlersResponse>, MetaApiError> {
3636
match state.schema_registry.list_service_handlers(&service_name) {
@@ -58,8 +58,8 @@ pub async fn list_service_handlers<V, IC>(
5858
)
5959
)
6060
)]
61-
pub async fn get_service_handler<V, IC>(
62-
State(state): State<AdminServiceState<V, IC>>,
61+
pub async fn get_service_handler<IC>(
62+
State(state): State<AdminServiceState<IC>>,
6363
Path((service_name, handler_name)): Path<(String, String)>,
6464
) -> Result<Json<HandlerMetadata>, MetaApiError> {
6565
match state

crates/admin/src/rest_api/invocations.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ pub struct DeleteInvocationParams {
8080
from_type = "MetaApiError",
8181
)
8282
)]
83-
pub async fn delete_invocation<V, IC>(
84-
State(state): State<AdminServiceState<V, IC>>,
83+
pub async fn delete_invocation<IC>(
84+
State(state): State<AdminServiceState<IC>>,
8585
Path(invocation_id): Path<String>,
8686
Query(DeleteInvocationParams { mode }): Query<DeleteInvocationParams>,
8787
) -> Result<StatusCode, MetaApiError> {
@@ -139,8 +139,8 @@ generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, Invocati
139139
schema = "std::string::String"
140140
))
141141
)]
142-
pub async fn kill_invocation<V, IC>(
143-
State(state): State<AdminServiceState<V, IC>>,
142+
pub async fn kill_invocation<IC>(
143+
State(state): State<AdminServiceState<IC>>,
144144
Path(invocation_id): Path<String>,
145145
) -> Result<(), KillInvocationError>
146146
where
@@ -199,8 +199,8 @@ generate_meta_api_error!(CancelInvocationError: [InvocationNotFoundError, Invoca
199199
from_type = "CancelInvocationError",
200200
)
201201
)]
202-
pub async fn cancel_invocation<V, IC>(
203-
State(state): State<AdminServiceState<V, IC>>,
202+
pub async fn cancel_invocation<IC>(
203+
State(state): State<AdminServiceState<IC>>,
204204
Path(invocation_id): Path<String>,
205205
) -> Result<StatusCode, CancelInvocationError>
206206
where
@@ -241,8 +241,8 @@ generate_meta_api_error!(PurgeInvocationError: [InvocationNotFoundError, Invocat
241241
schema = "std::string::String"
242242
))
243243
)]
244-
pub async fn purge_invocation<V, IC>(
245-
State(state): State<AdminServiceState<V, IC>>,
244+
pub async fn purge_invocation<IC>(
245+
State(state): State<AdminServiceState<IC>>,
246246
Path(invocation_id): Path<String>,
247247
) -> Result<(), PurgeInvocationError>
248248
where
@@ -284,8 +284,8 @@ generate_meta_api_error!(PurgeJournalError: [InvocationNotFoundError, Invocation
284284
schema = "std::string::String"
285285
))
286286
)]
287-
pub async fn purge_journal<V, IC>(
288-
State(state): State<AdminServiceState<V, IC>>,
287+
pub async fn purge_journal<IC>(
288+
State(state): State<AdminServiceState<IC>>,
289289
Path(invocation_id): Path<String>,
290290
) -> Result<(), PurgeJournalError>
291291
where
@@ -398,8 +398,8 @@ generate_meta_api_error!(RestartInvocationError: [
398398
),
399399
)
400400
)]
401-
pub async fn restart_as_new_invocation<V, IC>(
402-
State(state): State<AdminServiceState<V, IC>>,
401+
pub async fn restart_as_new_invocation<IC>(
402+
State(state): State<AdminServiceState<IC>>,
403403
Path(invocation_id): Path<String>,
404404
Query(RestartAsNewInvocationQueryParams { from, deployment }): Query<
405405
RestartAsNewInvocationQueryParams,
@@ -510,8 +510,8 @@ generate_meta_api_error!(ResumeInvocationError: [
510510
)
511511
)
512512
)]
513-
pub async fn resume_invocation<V, IC>(
514-
State(state): State<AdminServiceState<V, IC>>,
513+
pub async fn resume_invocation<IC>(
514+
State(state): State<AdminServiceState<IC>>,
515515
Path(invocation_id): Path<String>,
516516
Query(ResumeInvocationQueryParams { deployment }): Query<ResumeInvocationQueryParams>,
517517
) -> Result<(), ResumeInvocationError>

crates/admin/src/rest_api/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,14 @@ use okapi_operation::okapi::openapi3::{ExternalDocs, Tag};
2525
use okapi_operation::*;
2626
use restate_types::identifiers::PartitionKey;
2727
use restate_types::invocation::client::InvocationClient;
28-
use restate_types::schema::subscriptions::SubscriptionValidator;
2928
use restate_wal_protocol::{Destination, Header, Source};
3029

3130
use crate::state::AdminServiceState;
3231

3332
pub use version::{MAX_ADMIN_API_VERSION, MIN_ADMIN_API_VERSION};
3433

35-
pub fn create_router<V, IC>(state: AdminServiceState<V, IC>) -> axum::Router<()>
34+
pub fn create_router<IC>(state: AdminServiceState<IC>) -> axum::Router<()>
3635
where
37-
V: SubscriptionValidator + Send + Sync + Clone + 'static,
3836
IC: InvocationClient + Send + Sync + Clone + 'static,
3937
{
4038
let mut router = axum_integration::Router::new()

crates/admin/src/rest_api/services.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use tracing::{debug, warn};
3535
operation_id = "list_services",
3636
tags = "service"
3737
)]
38-
pub async fn list_services<V, IC>(
39-
State(state): State<AdminServiceState<V, IC>>,
38+
pub async fn list_services<IC>(
39+
State(state): State<AdminServiceState<IC>>,
4040
) -> Result<Json<ListServicesResponse>, MetaApiError> {
4141
let services = state.schema_registry.list_services();
4242

@@ -55,8 +55,8 @@ pub async fn list_services<V, IC>(
5555
schema = "std::string::String"
5656
))
5757
)]
58-
pub async fn get_service<V, IC>(
59-
State(state): State<AdminServiceState<V, IC>>,
58+
pub async fn get_service<IC>(
59+
State(state): State<AdminServiceState<IC>>,
6060
Path(service_name): Path<String>,
6161
) -> Result<Json<ServiceMetadata>, MetaApiError> {
6262
state
@@ -87,8 +87,8 @@ pub async fn get_service<V, IC>(
8787
from_type = "MetaApiError",
8888
)
8989
)]
90-
pub async fn get_service_openapi<V, IC>(
91-
State(state): State<AdminServiceState<V, IC>>,
90+
pub async fn get_service_openapi<IC>(
91+
State(state): State<AdminServiceState<IC>>,
9292
Path(service_name): Path<String>,
9393
) -> Result<Json<serde_json::Value>, MetaApiError> {
9494
// TODO return correct vnd type
@@ -112,8 +112,8 @@ pub async fn get_service_openapi<V, IC>(
112112
schema = "std::string::String"
113113
))
114114
)]
115-
pub async fn modify_service<V, IC>(
116-
State(state): State<AdminServiceState<V, IC>>,
115+
pub async fn modify_service<IC>(
116+
State(state): State<AdminServiceState<IC>>,
117117
Path(service_name): Path<String>,
118118
#[request_body(required = true)] Json(ModifyServiceRequest {
119119
public,
@@ -183,8 +183,8 @@ pub async fn modify_service<V, IC>(
183183
from_type = "MetaApiError",
184184
)
185185
)]
186-
pub async fn modify_service_state<V, IC>(
187-
State(state): State<AdminServiceState<V, IC>>,
186+
pub async fn modify_service_state<IC>(
187+
State(state): State<AdminServiceState<IC>>,
188188
Path(service_name): Path<String>,
189189
#[request_body(required = true)] Json(ModifyServiceStateRequest {
190190
version,

crates/admin/src/rest_api/subscriptions.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use super::error::*;
1212
use crate::state::AdminServiceState;
1313

1414
use restate_admin_rest_model::subscriptions::*;
15-
use restate_types::schema::subscriptions::{ListSubscriptionFilter, SubscriptionValidator};
15+
use restate_types::schema::subscriptions::ListSubscriptionFilter;
1616

1717
use axum::extract::Query;
1818
use axum::extract::{Path, State};
@@ -41,8 +41,8 @@ use restate_types::identifiers::SubscriptionId;
4141
from_type = "MetaApiError",
4242
)
4343
)]
44-
pub async fn create_subscription<V: SubscriptionValidator, IC>(
45-
State(state): State<AdminServiceState<V, IC>>,
44+
pub async fn create_subscription<IC>(
45+
State(state): State<AdminServiceState<IC>>,
4646
#[request_body(required = true)] Json(payload): Json<CreateSubscriptionRequest>,
4747
) -> Result<impl axum::response::IntoResponse, MetaApiError> {
4848
let subscription = state
@@ -73,8 +73,8 @@ pub async fn create_subscription<V: SubscriptionValidator, IC>(
7373
schema = "std::string::String"
7474
))
7575
)]
76-
pub async fn get_subscription<V, IC>(
77-
State(state): State<AdminServiceState<V, IC>>,
76+
pub async fn get_subscription<IC>(
77+
State(state): State<AdminServiceState<IC>>,
7878
Path(subscription_id): Path<SubscriptionId>,
7979
) -> Result<Json<SubscriptionResponse>, MetaApiError> {
8080
let subscription = state
@@ -110,8 +110,8 @@ pub async fn get_subscription<V, IC>(
110110
)
111111
)
112112
)]
113-
pub async fn list_subscriptions<V, IC>(
114-
State(state): State<AdminServiceState<V, IC>>,
113+
pub async fn list_subscriptions<IC>(
114+
State(state): State<AdminServiceState<IC>>,
115115
Query(ListSubscriptionsParams { sink, source }): Query<ListSubscriptionsParams>,
116116
) -> Json<ListSubscriptionsResponse> {
117117
let filters = match (sink, source) {
@@ -158,8 +158,8 @@ pub async fn list_subscriptions<V, IC>(
158158
from_type = "MetaApiError",
159159
)
160160
)]
161-
pub async fn delete_subscription<V, IC>(
162-
State(state): State<AdminServiceState<V, IC>>,
161+
pub async fn delete_subscription<IC>(
162+
State(state): State<AdminServiceState<IC>>,
163163
Path(subscription_id): Path<SubscriptionId>,
164164
) -> Result<StatusCode, MetaApiError> {
165165
state

crates/admin/src/schema_registry/mod.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use restate_types::schema::deployment::{
3131
};
3232
use restate_types::schema::service::{HandlerMetadata, ServiceMetadata, ServiceMetadataResolver};
3333
use restate_types::schema::subscriptions::{
34-
ListSubscriptionFilter, Subscription, SubscriptionResolver, SubscriptionValidator,
34+
ListSubscriptionFilter, Subscription, SubscriptionResolver,
3535
};
3636
pub(crate) use restate_types::schema::updater::RegisterDeploymentResult;
3737
use restate_types::schema::{Schema, updater};
@@ -119,25 +119,22 @@ pub(crate) enum UpdateDeploymentAddress {
119119

120120
/// This is the business logic driving the Admin API schema related endpoints.
121121
#[derive(Clone)]
122-
pub struct SchemaRegistry<V> {
122+
pub struct SchemaRegistry {
123123
metadata_writer: MetadataWriter,
124124
service_discovery: ServiceDiscovery,
125125
telemetry_http_client: Option<HttpClient>,
126-
subscription_validator: V,
127126
}
128127

129-
impl<V> SchemaRegistry<V> {
128+
impl SchemaRegistry {
130129
pub fn new(
131130
metadata_writer: MetadataWriter,
132131
service_discovery: ServiceDiscovery,
133132
telemetry_http_client: Option<HttpClient>,
134-
subscription_validator: V,
135133
) -> Self {
136134
Self {
137135
metadata_writer,
138136
service_discovery,
139137
telemetry_http_client,
140-
subscription_validator,
141138
}
142139
}
143140

@@ -613,12 +610,7 @@ impl<V> SchemaRegistry<V> {
613610
pub fn list_subscriptions(&self, filters: &[ListSubscriptionFilter]) -> Vec<Subscription> {
614611
Metadata::with_current(|m| m.schema()).list_subscriptions(filters)
615612
}
616-
}
617613

618-
impl<V> SchemaRegistry<V>
619-
where
620-
V: SubscriptionValidator,
621-
{
622614
pub(crate) async fn create_subscription(
623615
&self,
624616
source: Uri,
@@ -641,7 +633,6 @@ where
641633
source.clone(),
642634
sink.clone(),
643635
options.clone(),
644-
&self.subscription_validator,
645636
)
646637
},
647638
)?;

crates/admin/src/service.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use restate_types::config::AdminOptions;
2424
use restate_types::invocation::client::InvocationClient;
2525
use restate_types::live::LiveLoad;
2626
use restate_types::net::BindAddress;
27-
use restate_types::schema::subscriptions::SubscriptionValidator;
2827
use std::time::Duration;
2928
use tower::ServiceBuilder;
3029
use tower_http::classify::ServerErrorsFailureClass;
@@ -35,26 +34,24 @@ use tracing::{Span, debug, error, info, info_span};
3534
#[error("could not create the service client: {0}")]
3635
pub struct BuildError(#[from] restate_service_client::BuildError);
3736

38-
pub struct AdminService<V, IC> {
37+
pub struct AdminService<IC> {
3938
bifrost: Bifrost,
40-
schema_registry: SchemaRegistry<V>,
39+
schema_registry: SchemaRegistry,
4140
invocation_client: IC,
4241
#[cfg(feature = "storage-query")]
4342
query_context: Option<restate_storage_query_datafusion::context::QueryContext>,
4443
#[cfg(feature = "metadata-api")]
4544
metadata_writer: MetadataWriter,
4645
}
4746

48-
impl<V, IC> AdminService<V, IC>
47+
impl<IC> AdminService<IC>
4948
where
50-
V: SubscriptionValidator + Send + Sync + Clone + 'static,
5149
IC: InvocationClient + Send + Sync + Clone + 'static,
5250
{
5351
pub fn new(
5452
metadata_writer: MetadataWriter,
5553
bifrost: Bifrost,
5654
invocation_client: IC,
57-
subscription_validator: V,
5855
service_discovery: ServiceDiscovery,
5956
telemetry_http_client: Option<HttpClient>,
6057
) -> Self {
@@ -66,7 +63,6 @@ where
6663
metadata_writer,
6764
service_discovery,
6865
telemetry_http_client,
69-
subscription_validator,
7066
),
7167
invocation_client,
7268
#[cfg(feature = "storage-query")]

0 commit comments

Comments
 (0)