Skip to content

Commit 8b8db7f

Browse files
committed
[Ingress] Handle IngestRequest message
Summary: Handle the incoming `IngestRequest` messages sent by the `ingress-core`
1 parent bcf7715 commit 8b8db7f

File tree

4 files changed

+190
-31
lines changed

4 files changed

+190
-31
lines changed

crates/worker/src/partition/leadership/leader_state.rs

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use futures::future::OptionFuture;
2121
use futures::stream::FuturesUnordered;
2222
use futures::{FutureExt, StreamExt, stream};
2323
use metrics::counter;
24-
use restate_types::logs::Keys;
25-
use restate_wal_protocol::control::UpsertSchema;
2624
use tokio_stream::wrappers::{ReceiverStream, WatchStream};
2725
use tracing::{debug, trace};
2826

@@ -35,12 +33,15 @@ use restate_types::identifiers::{
3533
WithPartitionKey,
3634
};
3735
use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification};
36+
use restate_types::logs::Keys;
37+
use restate_types::net::ingress::IngestRecord;
3838
use restate_types::net::partition_processor::{
3939
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
4040
};
4141
use restate_types::time::MillisSinceEpoch;
4242
use restate_types::{SemanticRestateVersion, Version, Versioned};
4343
use restate_wal_protocol::Command;
44+
use restate_wal_protocol::control::UpsertSchema;
4445
use restate_wal_protocol::timer::TimerKeyValue;
4546

4647
use crate::metric_definitions::{PARTITION_HANDLE_LEADER_ACTIONS, USAGE_LEADER_ACTION_COUNT};
@@ -380,14 +381,35 @@ impl LeaderState {
380381
Ok(commit_token) => {
381382
self.awaiting_rpc_self_propose.push(SelfAppendFuture::new(
382383
commit_token,
383-
success_response,
384-
reciprocal,
384+
|result: Result<(), PartitionProcessorRpcError>| {
385+
reciprocal.send(result.map(|_| success_response));
386+
},
385387
));
386388
}
387389
Err(e) => reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
388390
}
389391
}
390392

393+
pub async fn propose_many_with_callback<F>(
394+
&mut self,
395+
records: impl ExactSizeIterator<Item = IngestRecord>,
396+
callback: F,
397+
) where
398+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
399+
{
400+
match self
401+
.self_proposer
402+
.propose_many_with_notification(records)
403+
.await
404+
{
405+
Ok(commit_token) => {
406+
self.awaiting_rpc_self_propose
407+
.push(SelfAppendFuture::new(commit_token, callback));
408+
}
409+
Err(e) => callback(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
410+
}
411+
}
412+
391413
pub fn handle_actions(
392414
&mut self,
393415
invoker_tx: &mut impl restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
@@ -594,42 +616,72 @@ impl LeaderState {
594616
}
595617
}
596618

619+
trait CallbackInner: Send + Sync + 'static {
620+
fn call(self: Box<Self>, result: Result<(), PartitionProcessorRpcError>);
621+
}
622+
623+
impl<F> CallbackInner for F
624+
where
625+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
626+
{
627+
fn call(self: Box<Self>, result: Result<(), PartitionProcessorRpcError>) {
628+
self(result)
629+
}
630+
}
631+
632+
struct Callback {
633+
inner: Box<dyn CallbackInner>,
634+
}
635+
636+
impl Callback {
637+
fn call(self, result: Result<(), PartitionProcessorRpcError>) {
638+
self.inner.call(result);
639+
}
640+
}
641+
642+
impl<I> From<I> for Callback
643+
where
644+
I: CallbackInner,
645+
{
646+
fn from(value: I) -> Self {
647+
Self {
648+
inner: Box::new(value),
649+
}
650+
}
651+
}
652+
597653
struct SelfAppendFuture {
598654
commit_token: CommitToken,
599-
response: Option<(PartitionProcessorRpcResponse, RpcReciprocal)>,
655+
callback: Option<Callback>,
600656
}
601657

602658
impl SelfAppendFuture {
603-
fn new(
604-
commit_token: CommitToken,
605-
success_response: PartitionProcessorRpcResponse,
606-
response_reciprocal: RpcReciprocal,
607-
) -> Self {
659+
fn new(commit_token: CommitToken, callback: impl Into<Callback>) -> Self {
608660
Self {
609661
commit_token,
610-
response: Some((success_response, response_reciprocal)),
662+
callback: Some(callback.into()),
611663
}
612664
}
613665

614666
fn fail_with_internal(&mut self) {
615-
if let Some((_, reciprocal)) = self.response.take() {
616-
reciprocal.send(Err(PartitionProcessorRpcError::Internal(
667+
if let Some(callback) = self.callback.take() {
668+
callback.call(Err(PartitionProcessorRpcError::Internal(
617669
"error when proposing to bifrost".to_string(),
618670
)));
619671
}
620672
}
621673

622674
fn fail_with_lost_leadership(&mut self, this_partition_id: PartitionId) {
623-
if let Some((_, reciprocal)) = self.response.take() {
624-
reciprocal.send(Err(PartitionProcessorRpcError::LostLeadership(
675+
if let Some(callback) = self.callback.take() {
676+
callback.call(Err(PartitionProcessorRpcError::LostLeadership(
625677
this_partition_id,
626678
)));
627679
}
628680
}
629681

630682
fn succeed_with_appended(&mut self) {
631-
if let Some((success_response, reciprocal)) = self.response.take() {
632-
reciprocal.send(Ok(success_response));
683+
if let Some(callback) = self.callback.take() {
684+
callback.call(Ok(()))
633685
}
634686
}
635687
}

crates/worker/src/partition/leadership/mod.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,15 @@ use restate_types::errors::GenericError;
4646
use restate_types::identifiers::{InvocationId, PartitionKey, PartitionProcessorRpcRequestId};
4747
use restate_types::identifiers::{LeaderEpoch, PartitionLeaderEpoch};
4848
use restate_types::message::MessageIndex;
49+
use restate_types::net::ingress::IngestRecord;
4950
use restate_types::net::partition_processor::{
5051
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
5152
};
5253
use restate_types::partitions::Partition;
5354
use restate_types::partitions::state::PartitionReplicaSetStates;
5455
use restate_types::retries::with_jitter;
5556
use restate_types::schema::Schema;
56-
use restate_types::storage::StorageEncodeError;
57+
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
5758
use restate_wal_protocol::Command;
5859
use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability};
5960
use restate_wal_protocol::timer::TimerKeyValue;
@@ -82,7 +83,9 @@ pub(crate) enum Error {
8283
#[error("failed writing to bifrost: {0}")]
8384
Bifrost(#[from] restate_bifrost::Error),
8485
#[error("failed serializing payload: {0}")]
85-
Codec(#[from] StorageEncodeError),
86+
Encode(#[from] StorageEncodeError),
87+
#[error("failed deserializing payload: {0}")]
88+
Decode(#[from] StorageDecodeError),
8689
#[error(transparent)]
8790
Shutdown(#[from] ShutdownError),
8891
#[error("error when self proposing")]
@@ -612,6 +615,26 @@ impl<I> LeadershipState<I> {
612615
}
613616
}
614617
}
618+
619+
/// propose to this partition
620+
pub async fn propose_many_with_callback<F>(
621+
&mut self,
622+
records: impl ExactSizeIterator<Item = IngestRecord>,
623+
callback: F,
624+
) where
625+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
626+
{
627+
match &mut self.state {
628+
State::Follower | State::Candidate { .. } => callback(Err(
629+
PartitionProcessorRpcError::NotLeader(self.partition.partition_id),
630+
)),
631+
State::Leader(leader_state) => {
632+
leader_state
633+
.propose_many_with_callback(records, callback)
634+
.await;
635+
}
636+
}
637+
}
615638
}
616639
#[derive(Debug, derive_more::From)]
617640
struct TimerReader(PartitionStore);

crates/worker/src/partition/leadership/self_proposer.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use futures::never::Never;
1414

1515
use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy};
1616
use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber};
17-
use restate_types::{identifiers::PartitionKey, logs::LogId};
17+
use restate_types::{
18+
identifiers::PartitionKey, logs::LogId, net::ingress::IngestRecord, storage::StorageCodec,
19+
};
1820
use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
1921

2022
use crate::partition::leadership::Error;
@@ -100,6 +102,44 @@ impl SelfProposer {
100102
Ok(commit_token)
101103
}
102104

105+
pub async fn propose_many_with_notification(
106+
&mut self,
107+
records: impl ExactSizeIterator<Item = IngestRecord>,
108+
) -> Result<CommitToken, Error> where {
109+
let sender = self.bifrost_appender.sender();
110+
111+
// This is ideally should be implemented
112+
// by using `sender.enqueue_many`
113+
// but since we have no guarantee over the
114+
// underlying channel size. a `reserve_many()` might
115+
// block forever waiting for n permits that will
116+
// never be available.
117+
//
118+
// sender
119+
// .enqueue_many(records)
120+
// .await
121+
// .map_err(|_| Error::SelfProposer)?;
122+
//
123+
// so instead we do this.
124+
125+
for mut record in records {
126+
// todo: unfortunately we need to decode tha pyaload first although
127+
// the appended will need to encode it eventually. Maybe if there
128+
// is a way to pass the raw encoded data directly to the appender
129+
let envelope = StorageCodec::decode(&mut record.record)?;
130+
131+
sender
132+
.enqueue(Arc::new(envelope))
133+
.await
134+
.map_err(|_| Error::SelfProposer)?;
135+
}
136+
137+
sender
138+
.notify_committed()
139+
.await
140+
.map_err(|_| Error::SelfProposer)
141+
}
142+
103143
fn create_header(&mut self, partition_key: PartitionKey) -> Header {
104144
let esn = self.epoch_sequence_number;
105145
self.epoch_sequence_number = self.epoch_sequence_number.next();

crates/worker/src/partition/mod.rs

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use tracing::{Span, debug, error, info, instrument, trace, warn};
3131

3232
use restate_bifrost::loglet::FindTailOptions;
3333
use restate_bifrost::{Bifrost, LogEntry, MaybeRecord};
34-
use restate_core::network::{Oneshot, Reciprocal, ServiceMessage, Verdict};
34+
use restate_core::network::{Incoming, Oneshot, Reciprocal, Rpc, ServiceMessage, Verdict};
3535
use restate_core::{Metadata, ShutdownError, cancellation_watcher, my_node_id};
3636
use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
3737
use restate_storage_api::deduplication_table::{
@@ -47,6 +47,7 @@ use restate_types::config::Configuration;
4747
use restate_types::identifiers::LeaderEpoch;
4848
use restate_types::logs::{KeyFilter, Lsn, Record, SequenceNumber};
4949
use restate_types::net::RpcRequest;
50+
use restate_types::net::ingress::{IngestResponse, ReceivedIngestRequest};
5051
use restate_types::net::partition_processor::{
5152
PartitionLeaderService, PartitionProcessorRpcError, PartitionProcessorRpcRequest,
5253
PartitionProcessorRpcResponse,
@@ -463,15 +464,7 @@ where
463464
self.status.effective_mode = self.leadership_state.effective_mode();
464465
}
465466
Some(msg) = self.network_leader_svc_rx.recv() => {
466-
match msg {
467-
ServiceMessage::Rpc(msg) if msg.msg_type() == PartitionProcessorRpcRequest::TYPE => {
468-
let msg = msg.into_typed::<PartitionProcessorRpcRequest>();
469-
// note: split() decodes the payload
470-
let (response_tx, body) = msg.split();
471-
self.on_rpc(response_tx, body, &mut partition_store, live_schemas.live_load()).await;
472-
}
473-
msg => { msg.fail(Verdict::MessageUnrecognized); }
474-
}
467+
self.on_rpc(msg, &mut partition_store, live_schemas.live_load()).await;
475468
}
476469
_ = status_update_timer.tick() => {
477470
if durable_lsn_watch.has_changed().map_err(|e| ProcessorError::Other(e.into()))? {
@@ -599,7 +592,7 @@ where
599592
Ok(())
600593
}
601594

602-
async fn on_rpc(
595+
async fn on_pp_rpc_request(
603596
&mut self,
604597
response_tx: Reciprocal<
605598
Oneshot<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
@@ -615,6 +608,57 @@ where
615608
)
616609
.await;
617610
}
611+
612+
async fn on_rpc(
613+
&mut self,
614+
msg: ServiceMessage<PartitionLeaderService>,
615+
partition_store: &mut PartitionStore,
616+
schemas: &Schema,
617+
) {
618+
match msg {
619+
ServiceMessage::Rpc(msg) if msg.msg_type() == PartitionProcessorRpcRequest::TYPE => {
620+
let msg = msg.into_typed::<PartitionProcessorRpcRequest>();
621+
// note: split() decodes the payload
622+
let (response_tx, body) = msg.split();
623+
self.on_pp_rpc_request(response_tx, body, partition_store, schemas)
624+
.await;
625+
}
626+
ServiceMessage::Rpc(msg) if msg.msg_type() == ReceivedIngestRequest::TYPE => {
627+
self.on_pp_ingest_request(msg.into_typed()).await;
628+
}
629+
msg => {
630+
msg.fail(Verdict::MessageUnrecognized);
631+
}
632+
}
633+
}
634+
635+
async fn on_pp_ingest_request(&mut self, msg: Incoming<Rpc<ReceivedIngestRequest>>) {
636+
let (reciprocal, request) = msg.split();
637+
self.leadership_state
638+
.propose_many_with_callback(
639+
request.records.into_iter(),
640+
|result: Result<(), PartitionProcessorRpcError>| match result {
641+
Ok(_) => reciprocal.send(IngestResponse::Ack),
642+
Err(err) => match err {
643+
PartitionProcessorRpcError::NotLeader(id)
644+
| PartitionProcessorRpcError::LostLeadership(id) => {
645+
reciprocal.send(IngestResponse::NotLeader { of: id })
646+
}
647+
PartitionProcessorRpcError::Starting => {
648+
reciprocal.send(IngestResponse::Starting)
649+
}
650+
PartitionProcessorRpcError::Stopping => {
651+
reciprocal.send(IngestResponse::Stopping)
652+
}
653+
PartitionProcessorRpcError::Internal(msg) => {
654+
reciprocal.send(IngestResponse::Internal { msg })
655+
}
656+
},
657+
},
658+
)
659+
.await;
660+
}
661+
618662
async fn maybe_advance<'a>(
619663
&mut self,
620664
maybe_record: LogEntry,

0 commit comments

Comments
 (0)