Skip to content

Commit

Permalink
Move stream publishers to connection-level context (#2452)
Browse files Browse the repository at this point in the history
This is the first PR of implementation of dropping a stream's server-side counterpart when a stream on the client-side is dropped.  This is all part of the activities for managing offsets per consumer on the SPU side.

Currently, the lifetime of `StreamPublisher` is equal to the lifetime of Tcp connection between SPU and client. We do not clean up `StreamPublishers` until the connection is dropped. Therefore, there is no need to keep them inside `GlobalContext` and have it multi-threaded, because it is always touched by only one thread (the one that handles connection). 

This PR doesn't add or remove any functional behavior. The `StreamPublishers` will be dropped after disconnect as it does now. The actual drop mechanism will go in another PR.
  • Loading branch information
Alexander Galibey committed Jul 1, 2022
1 parent 8935fe7 commit 44e2f04
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release Notes

## Platform Version 0.9.31 - UNRELEASED
* Move stream publishers to connection-level context ([#2452](https://github.com/infinyon/fluvio/pull/2452))

## Platform Version 0.9.30 - 2022-06-29
* Improve CLI error output when log_dir isn't writable ([#2425](https://github.com/infinyon/fluvio/pull/2425))
Expand Down
7 changes: 0 additions & 7 deletions crates/fluvio-spu/src/core/global_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::replication::follower::SharedFollowersState;
use crate::replication::leader::{
SharedReplicaLeadersState, ReplicaLeadersState, FollowerNotifier, SharedSpuUpdates,
};
use crate::services::public::StreamPublishers;
use crate::control_plane::{StatusMessageSink, SharedStatusUpdate};
use fluvio_smartengine::SmartEngine;

Expand All @@ -44,7 +43,6 @@ pub struct GlobalContext<S> {
derivedstream_localstore: SharedStreamStreamLocalStore,
leaders_state: SharedReplicaLeadersState<S>,
followers_state: SharedFollowersState<S>,
stream_publishers: StreamPublishers,
spu_followers: SharedSpuUpdates,
status_update: SharedStatusUpdate,
sm_engine: SmartEngine,
Expand Down Expand Up @@ -75,7 +73,6 @@ where
config: Arc::new(spu_config),
leaders_state: ReplicaLeadersState::new_shared(),
followers_state: FollowersState::new_shared(),
stream_publishers: StreamPublishers::new(),
spu_followers: FollowerNotifier::shared(),
status_update: StatusMessageSink::shared(),
sm_engine: SmartEngine::default(),
Expand Down Expand Up @@ -128,10 +125,6 @@ where
self.config.clone()
}

pub fn stream_publishers(&self) -> &StreamPublishers {
&self.stream_publishers
}

pub fn follower_notifier(&self) -> &FollowerNotifier {
&self.spu_followers
}
Expand Down
22 changes: 22 additions & 0 deletions crates/fluvio-spu/src/services/public/conn_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::services::public::StreamPublishers;

#[derive(Debug)]
pub(crate) struct ConnectionContext {
stream_publishers: StreamPublishers,
}

impl ConnectionContext {
pub(crate) fn new() -> Self {
Self {
stream_publishers: StreamPublishers::new(),
}
}

pub(crate) fn stream_publishers(&self) -> &StreamPublishers {
&self.stream_publishers
}

pub(crate) fn stream_publishers_mut(&mut self) -> &mut StreamPublishers {
&mut self.stream_publishers
}
}
9 changes: 6 additions & 3 deletions crates/fluvio-spu/src/services/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod stream_fetch;

#[cfg(test)]
mod tests;
mod conn_context;

use std::sync::Arc;
use async_trait::async_trait;
Expand All @@ -25,8 +26,8 @@ use self::produce_handler::handle_produce_request;
use self::fetch_handler::handle_fetch_request;
use self::offset_request::handle_offset_request;
use self::offset_update::handle_offset_update;
use self::stream_fetch::StreamFetchHandler;
pub use stream_fetch::publishers::StreamPublishers;
use self::stream_fetch::{StreamFetchHandler, publishers::StreamPublishers};
use self::conn_context::ConnectionContext;

pub(crate) type SpuPublicServer =
FluvioApiServer<SpuServerRequest, SpuServerApiKey, DefaultSharedGlobalContext, PublicService>;
Expand Down Expand Up @@ -70,6 +71,7 @@ impl FluvioService for PublicService {
let api_stream = stream.api_stream::<SpuServerRequest, SpuServerApiKey>();
let shutdown = StickyEvent::shared();
let mut event_stream = api_stream.take_until(shutdown.listen_pinned());
let mut conn_ctx = ConnectionContext::new();

loop {
let event = event_stream.next().await;
Expand Down Expand Up @@ -108,14 +110,15 @@ impl FluvioService for PublicService {
StreamFetchHandler::start(
request,
context.clone(),
&mut conn_ctx,
shared_sink.clone(),
shutdown.clone(),
)
.await?;
}
SpuServerRequest::UpdateOffsetsRequest(request) => call_service!(
request,
handle_offset_update(&context, request),
handle_offset_update(request, &mut conn_ctx),
shared_sink,
"UpdateOffsetsRequest"
),
Expand Down
10 changes: 5 additions & 5 deletions crates/fluvio-spu/src/services/public/offset_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ use fluvio_spu_schema::server::update_offset::{
};
use dataplane::ErrorCode;
use dataplane::api::{ResponseMessage, RequestMessage};
use crate::core::DefaultSharedGlobalContext;
use crate::services::public::conn_context::ConnectionContext;

#[instrument(skip(ctx, request))]
pub async fn handle_offset_update(
ctx: &DefaultSharedGlobalContext,
#[instrument(skip(conn_ctx, request))]
pub(crate) async fn handle_offset_update(
request: RequestMessage<UpdateOffsetsRequest>,
conn_ctx: &mut ConnectionContext,
) -> Result<ResponseMessage<UpdateOffsetsResponse>, IoError> {
debug!("received stream updates");
let (header, updates) = request.get_header_request();
let publishers = ctx.stream_publishers();
let publishers = conn_ctx.stream_publishers();
let mut status_list = vec![];

for update in updates.offsets {
Expand Down
60 changes: 21 additions & 39 deletions crates/fluvio-spu/src/services/public/stream_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ use dataplane::smartmodule::SmartModuleRuntimeError;

use crate::core::DefaultSharedGlobalContext;
use crate::replication::leader::SharedFileLeaderState;
use crate::services::public::conn_context::ConnectionContext;
use crate::services::public::stream_fetch::publishers::INIT_OFFSET;
use crate::smartengine::SmartModuleContext;

/// Fetch records as stream
pub struct StreamFetchHandler {
ctx: DefaultSharedGlobalContext,
replica: ReplicaKey,
isolation: Isolation,
max_bytes: u32,
Expand All @@ -49,18 +49,21 @@ pub struct StreamFetchHandler {

impl StreamFetchHandler {
/// handle fluvio continuous fetch request
pub async fn start(
pub(crate) async fn start(
request: RequestMessage<FileStreamFetchRequest>,
ctx: DefaultSharedGlobalContext,
conn_ctx: &mut ConnectionContext,
sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
) -> Result<(), SocketError> {
let (header, msg) = request.get_header_request();
let replica = ReplicaKey::new(msg.topic.clone(), msg.partition);

if let Some(leader_state) = ctx.leaders_state().get(&replica).await {
let (stream_id, offset_publisher) =
ctx.stream_publishers().create_new_publisher().await;
let (stream_id, offset_publisher) = conn_ctx
.stream_publishers_mut()
.create_new_publisher()
.await;
let consumer_offset_listener = offset_publisher.change_listener();

spawn(async move {
Expand Down Expand Up @@ -166,7 +169,6 @@ impl StreamFetchHandler {
"stream fetch");

let handler = Self {
ctx: ctx.clone(),
isolation,
replica: replica.clone(),
max_bytes,
Expand Down Expand Up @@ -366,10 +368,6 @@ impl StreamFetchHandler {
}

debug!("done with stream fetch loop exiting");
self.ctx
.stream_publishers()
.remove_publisher(self.stream_id)
.await;

Ok(())
}
Expand Down Expand Up @@ -627,65 +625,49 @@ impl From<CompressionError> for StreamFetchError {
}
pub mod publishers {

use std::{
collections::HashMap,
sync::{Arc, atomic::AtomicU32},
};
use std::sync::atomic::Ordering::SeqCst;
use std::{collections::HashMap, sync::Arc};
use std::fmt::Debug;

use async_lock::Mutex;
use tracing::debug;
use std::ops::AddAssign;

use super::{OffsetPublisher};

pub const INIT_OFFSET: i64 = -1;

pub struct StreamPublishers {
publishers: Mutex<HashMap<u32, Arc<OffsetPublisher>>>,
stream_id: AtomicU32,
publishers: HashMap<u32, Arc<OffsetPublisher>>,
stream_id_seq: u32,
}

impl Debug for StreamPublishers {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "stream {}", self.stream_id.load(SeqCst))
write!(f, "stream {}", self.stream_id_seq)
}
}

impl StreamPublishers {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
publishers: Mutex::new(HashMap::new()),
stream_id: AtomicU32::new(0),
publishers: HashMap::new(),
stream_id_seq: 0,
}
}

// get next stream id
fn next_stream_id(&self) -> u32 {
self.stream_id.fetch_add(1, SeqCst)
fn next_stream_id(&mut self) -> u32 {
self.stream_id_seq.add_assign(1);
self.stream_id_seq
}

pub async fn create_new_publisher(&self) -> (u32, Arc<OffsetPublisher>) {
pub async fn create_new_publisher(&mut self) -> (u32, Arc<OffsetPublisher>) {
let stream_id = self.next_stream_id();
let offset_publisher = OffsetPublisher::shared(INIT_OFFSET);
let mut publisher_lock = self.publishers.lock().await;
publisher_lock.insert(stream_id, offset_publisher.clone());
self.publishers.insert(stream_id, offset_publisher.clone());
(stream_id, offset_publisher)
}

/// get publisher with stream id
pub async fn get_publisher(&self, stream_id: u32) -> Option<Arc<OffsetPublisher>> {
let publisher_lock = self.publishers.lock().await;
publisher_lock.get(&stream_id).cloned()
}

pub async fn remove_publisher(&self, stream_id: u32) {
let mut publisher_lock = self.publishers.lock().await;
if publisher_lock.remove(&stream_id).is_some() {
debug!(stream_id, "removed stream publisher");
} else {
debug!(stream_id, "no stream publisher founded");
}
self.publishers.get(&stream_id).cloned()
}
}
}

0 comments on commit 44e2f04

Please sign in to comment.