Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flow refill time window after reboot #5238

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c78bd8c
fix: flow schema cache
discord9 Dec 27, 2024
2482738
refactor: location for `to_meta_err`
discord9 Dec 27, 2024
cfd0d16
chore: endfile emptyline
discord9 Dec 27, 2024
5a31273
chore: review(partially)
discord9 Dec 30, 2024
5a1c91b
chore: per review
discord9 Dec 30, 2024
bf38c5f
refactor: per review
discord9 Jan 2, 2025
185fa5e
refactor: per review
discord9 Jan 2, 2025
f17051b
feat: find time window lower bound
discord9 Dec 16, 2024
d426ca9
chore: typo
discord9 Dec 16, 2024
3e37eb7
chore: clippy
discord9 Dec 16, 2024
42aa2dc
feat: find in plan
discord9 Dec 16, 2024
ecafe7d
chore: clippy
discord9 Dec 16, 2024
7582957
test: auto get plan time window lower bound
discord9 Dec 17, 2024
585d748
WIP: refill flow
discord9 Dec 17, 2024
56fce68
TODO: impl refill tasks
discord9 Dec 18, 2024
9c2de1a
WIP: RefillTask
discord9 Dec 19, 2024
0b32ae7
feat: create refill task
discord9 Dec 19, 2024
2a27af2
refactor: QueryStream
discord9 Dec 20, 2024
c1bb044
chore: typo
discord9 Dec 20, 2024
6ba263a
feat: handle refill inserts
discord9 Dec 20, 2024
112ccff
feat: refill task
discord9 Dec 23, 2024
85cbd1f
feat: refill impl
discord9 Dec 24, 2024
3b7451e
dbg: more debug logs
discord9 Dec 24, 2024
5eb7682
fix: hang due to mis calc row cnt
discord9 Dec 24, 2024
d9edd83
tests: sqlness
discord9 Dec 25, 2024
9354a19
fix: handle src table missing
discord9 Dec 26, 2024
addf166
chore: after rebase
discord9 Dec 26, 2024
dc51f49
refactor: more resilent flow recover
discord9 Dec 26, 2024
06ef06f
todo: adjust flow's time index
discord9 Dec 26, 2024
a00a526
chore: comment
discord9 Dec 30, 2024
27fab33
chore: after rebase
discord9 Dec 30, 2024
a98f85d
feat: fix time index for flow plan
discord9 Dec 30, 2024
66faed0
test: correct sink time index
discord9 Dec 31, 2024
6eb8a69
TODO: fix check flow plan with auto created
discord9 Jan 2, 2025
50cdc8a
chore: after rebase
discord9 Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
Expand All @@ -38,8 +39,8 @@ use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
BuildCacheRegistrySnafu, BuildCliSnafu, InitMetadataSnafu, LoadLayeredConfigSnafu,
MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
Expand Down Expand Up @@ -301,7 +302,7 @@ impl StartCommand {
Plugins::new(),
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
flow_metadata_manager.clone(),
)
.with_heartbeat_task(heartbeat_task);

Expand All @@ -316,7 +317,7 @@ impl StartCommand {
let client = Arc::new(NodeClients::new(channel_config));

let invoker = FrontendInvoker::build_from(
flownode.flow_worker_manager().clone(),
None,
catalog_manager.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
Expand All @@ -330,6 +331,16 @@ impl StartCommand {
.set_frontend_invoker(invoker)
.await;

if let Err(err) = flownode
.flow_worker_manager()
.create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _))
.await
.map_err(BoxedError::new)
.context(BuildCliSnafu)
{
common_telemetry::error!(?err, "Failed to create and start refill flow tasks");
}

Ok(Instance::new(flownode, guard))
}
}
21 changes: 15 additions & 6 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ use tokio::sync::{broadcast, RwLock};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu,
InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
BuildCacheRegistrySnafu, BuildCliSnafu, CreateDirSnafu, IllegalConfigSnafu,
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu,
Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
Expand Down Expand Up @@ -580,7 +580,7 @@ impl StartCommand {
layered_cache_registry.clone(),
table_metadata_manager,
table_meta_allocator,
flow_metadata_manager,
flow_metadata_manager.clone(),
flow_meta_allocator,
)
.await?;
Expand All @@ -602,7 +602,7 @@ impl StartCommand {
let flow_worker_manager = flownode.flow_worker_manager();
// flow server need to be able to use frontend to write insert requests back
let invoker = FrontendInvoker::build_from(
flow_worker_manager.clone(),
Some(frontend.query_engine()),
catalog_manager.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
Expand All @@ -613,6 +613,15 @@ impl StartCommand {
.context(StartFlownodeSnafu)?;
flow_worker_manager.set_frontend_invoker(invoker).await;

if let Err(err) = flow_worker_manager
.create_and_start_refill_flow_tasks(&flow_metadata_manager, &(catalog_manager as _))
.await
.map_err(BoxedError::new)
.context(BuildCliSnafu)
{
common_telemetry::error!(err; "failed to refill flow");
}

let (tx, _rx) = broadcast::channel(1);

let servers = Services::new(opts, Arc::new(frontend.clone()), plugins)
Expand Down
Loading
Loading