Skip to content

Commit db5dcc8

Browse files
committed
refactor: for easier mock
Signed-off-by: discord9 <[email protected]>
1 parent f35e3b7 commit db5dcc8

File tree

6 files changed

+155
-90
lines changed

6 files changed

+155
-90
lines changed

src/meta-srv/src/gc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
mod candidate;
16+
mod ctx;
1617
mod handler;
1718
mod mailbox;
1819
mod options;

src/meta-srv/src/gc/ctx.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use std::time::Duration;
17+
18+
use api::v1::meta::MailboxMessage;
19+
use common_meta::datanode::RegionStat;
20+
use common_meta::instruction::{Instruction, InstructionReply};
21+
use common_meta::key::TableMetadataManagerRef;
22+
use common_meta::key::table_route::PhysicalTableRouteValue;
23+
use common_meta::peer::Peer;
24+
use common_telemetry::error;
25+
use snafu::{OptionExt, ResultExt as _};
26+
use table::metadata::TableId;
27+
28+
use crate::cluster::MetaPeerClientRef;
29+
use crate::error;
30+
use crate::error::{Result, TableMetadataManagerSnafu};
31+
use crate::handler::HeartbeatMailbox;
32+
use crate::service::mailbox::{Channel, MailboxRef};
33+
34+
#[async_trait::async_trait]
35+
pub(crate) trait SchedulerCtx: Send + Sync {
36+
async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;
37+
38+
async fn get_table_route(
39+
&self,
40+
table_id: TableId,
41+
) -> Result<(TableId, PhysicalTableRouteValue)>;
42+
43+
async fn send_instruction(
44+
&self,
45+
peer: &Peer,
46+
instruction: Instruction,
47+
description: &str,
48+
timeout: Duration,
49+
) -> Result<InstructionReply>;
50+
}
51+
52+
pub(crate) struct DefaultGcSchedulerCtx {
53+
/// The metadata manager.
54+
pub(crate) table_metadata_manager: TableMetadataManagerRef,
55+
/// For getting `RegionStats`.
56+
pub(crate) meta_peer_client: MetaPeerClientRef,
57+
/// The mailbox to send messages.
58+
pub(crate) mailbox: MailboxRef,
59+
/// The server address.
60+
pub(crate) server_addr: String,
61+
}
62+
63+
#[async_trait::async_trait]
64+
impl SchedulerCtx for DefaultGcSchedulerCtx {
65+
async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
66+
let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
67+
let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
68+
for (_dn_id, stats) in dn_stats {
69+
let mut stats = stats.stats;
70+
stats.sort_by_key(|s| s.timestamp_millis);
71+
72+
let Some(latest_stat) = stats.last().cloned() else {
73+
continue;
74+
};
75+
76+
for region_stat in latest_stat.region_stats {
77+
table_to_region_stats
78+
.entry(region_stat.id.table_id())
79+
.or_default()
80+
.push(region_stat);
81+
}
82+
}
83+
Ok(table_to_region_stats)
84+
}
85+
86+
async fn get_table_route(
87+
&self,
88+
table_id: TableId,
89+
) -> Result<(TableId, PhysicalTableRouteValue)> {
90+
self.table_metadata_manager
91+
.table_route_manager()
92+
.get_physical_table_route(table_id)
93+
.await
94+
.context(TableMetadataManagerSnafu)
95+
}
96+
97+
async fn send_instruction(
98+
&self,
99+
peer: &Peer,
100+
instruction: Instruction,
101+
description: &str,
102+
timeout: Duration,
103+
) -> Result<InstructionReply> {
104+
let msg = MailboxMessage::json_message(
105+
&format!("{}: {}", description, instruction),
106+
&format!("Metasrv@{}", self.server_addr),
107+
&format!("Datanode-{}@{}", peer.id, peer.addr),
108+
common_time::util::current_time_millis(),
109+
&instruction,
110+
)
111+
.with_context(|_| error::SerializeToJsonSnafu {
112+
input: instruction.to_string(),
113+
})?;
114+
115+
let mailbox_rx = self
116+
.mailbox
117+
.send(&Channel::Datanode(peer.id), msg, timeout)
118+
.await?;
119+
120+
match mailbox_rx.await {
121+
Ok(reply_msg) => {
122+
let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
123+
Ok(reply)
124+
}
125+
Err(e) => {
126+
error!(
127+
"Failed to receive reply from datanode {} for {}: {}",
128+
peer, description, e
129+
);
130+
Err(e)
131+
}
132+
}
133+
}
134+
}

src/meta-srv/src/gc/handler.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use common_meta::peer::Peer;
2121
use common_telemetry::{error, info, warn};
2222
use futures::StreamExt;
2323
use itertools::Itertools;
24-
use snafu::{OptionExt as _, ResultExt};
24+
use snafu::OptionExt as _;
2525
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
2626
use table::metadata::TableId;
2727
use tokio::time::sleep;
2828

29-
use crate::error::{self, RegionRouteNotFoundSnafu, Result, TableMetadataManagerSnafu};
29+
use crate::error::{self, RegionRouteNotFoundSnafu, Result};
3030
use crate::gc::GcScheduler;
3131
use crate::gc::candidate::GcCandidate;
3232
use crate::gc::tracker::RegionGcInfo;
@@ -39,7 +39,7 @@ impl GcScheduler {
3939
info!("Starting GC cycle");
4040

4141
// Step 1: Get all region statistics
42-
let table_to_region_stats = self.get_table_to_region_stats().await?;
42+
let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
4343
info!(
4444
"Fetched region stats for {} tables",
4545
table_to_region_stats.len()
@@ -79,12 +79,12 @@ impl GcScheduler {
7979
);
8080

8181
// Step 1: Get table route information
82-
let (_, table_peer) = self
83-
.table_metadata_manager
84-
.table_route_manager()
85-
.get_physical_table_route(table_id)
86-
.await
87-
.context(TableMetadataManagerSnafu)?;
82+
// If this is a logical table (its physical table id differs from `table_id`), skip it.
83+
let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;
84+
85+
if phy_table_id != table_id {
86+
return Ok(0);
87+
}
8888

8989
let region_to_peer = table_peer
9090
.region_routes

src/meta-srv/src/gc/mailbox.rs

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,62 +13,18 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16-
use std::time::Duration;
1716

18-
use api::v1::meta::MailboxMessage;
1917
use common_meta::instruction::{
2018
GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
2119
};
2220
use common_meta::peer::Peer;
2321
use common_telemetry::{error, info, warn};
24-
use snafu::ResultExt;
2522
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
2623

2724
use crate::error::{self, Result};
2825
use crate::gc::scheduler::GcScheduler;
29-
use crate::handler::HeartbeatMailbox;
30-
use crate::service::mailbox::Channel;
3126

3227
impl GcScheduler {
33-
/// Send an instruction to a datanode and wait for the reply.
34-
async fn send_instruction(
35-
&self,
36-
peer: &Peer,
37-
instruction: Instruction,
38-
description: &str,
39-
timeout: Duration,
40-
) -> Result<InstructionReply> {
41-
let msg = MailboxMessage::json_message(
42-
&format!("{}: {}", description, instruction),
43-
&format!("Metasrv@{}", self.server_addr),
44-
&format!("Datanode-{}@{}", peer.id, peer.addr),
45-
common_time::util::current_time_millis(),
46-
&instruction,
47-
)
48-
.with_context(|_| error::SerializeToJsonSnafu {
49-
input: instruction.to_string(),
50-
})?;
51-
52-
let mailbox_rx = self
53-
.mailbox
54-
.send(&Channel::Datanode(peer.id), msg, timeout)
55-
.await?;
56-
57-
match mailbox_rx.await {
58-
Ok(reply_msg) => {
59-
let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
60-
Ok(reply)
61-
}
62-
Err(e) => {
63-
error!(
64-
"Failed to receive reply from datanode {} for {}: {}",
65-
peer, description, e
66-
);
67-
Err(e)
68-
}
69-
}
70-
}
71-
7228
/// Send GetFileRefs instruction to a datanode for specified regions.
7329
pub(crate) async fn send_get_file_refs_instruction(
7430
&self,
@@ -86,6 +42,7 @@ impl GcScheduler {
8642
});
8743

8844
let reply = self
45+
.ctx
8946
.send_instruction(
9047
peer,
9148
instruction,
@@ -140,6 +97,7 @@ impl GcScheduler {
14097
});
14198

14299
let reply = self
100+
.ctx
143101
.send_instruction(&peer, instruction, "GC region", self.config.mailbox_timeout)
144102
.await?;
145103

src/meta-srv/src/gc/scheduler.rs

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::collections::HashMap;
1616
use std::sync::Arc;
1717
use std::time::Instant;
1818

19-
use common_meta::datanode::RegionStat;
2019
use common_meta::key::TableMetadataManagerRef;
2120
use common_telemetry::{error, info};
2221
use futures::stream::{FuturesUnordered, StreamExt};
@@ -27,6 +26,7 @@ use crate::cluster::MetaPeerClientRef;
2726
use crate::define_ticker;
2827
use crate::error::Result;
2928
use crate::gc::candidate::GcCandidate;
29+
use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
3030
use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
3131
use crate::gc::tracker::RegionGcTracker;
3232
use crate::service::mailbox::MailboxRef;
@@ -51,14 +51,7 @@ define_ticker!(
5151

5252
/// [`GcScheduler`] is used to periodically trigger garbage collection on datanodes.
5353
pub struct GcScheduler {
54-
/// The metadata manager.
55-
pub(crate) table_metadata_manager: TableMetadataManagerRef,
56-
/// For getting `RegionStats`.
57-
pub(crate) meta_peer_client: MetaPeerClientRef,
58-
/// The mailbox to send messages.
59-
pub(crate) mailbox: MailboxRef,
60-
/// The server address.
61-
pub(crate) server_addr: String,
54+
pub(crate) ctx: Arc<dyn SchedulerCtx>,
6255
/// The receiver of events.
6356
pub(crate) receiver: Receiver<Event>,
6457
/// GC configuration.
@@ -101,10 +94,12 @@ impl GcScheduler {
10194
let (tx, rx) = Self::channel();
10295
let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
10396
let gc_trigger = Self {
104-
table_metadata_manager,
105-
meta_peer_client,
106-
mailbox,
107-
server_addr,
97+
ctx: Arc::new(DefaultGcSchedulerCtx {
98+
table_metadata_manager,
99+
meta_peer_client,
100+
mailbox,
101+
server_addr,
102+
}),
108103
receiver: rx,
109104
config,
110105
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
@@ -149,29 +144,6 @@ impl GcScheduler {
149144
info!("Finished gc trigger");
150145
}
151146

152-
pub(crate) async fn get_table_to_region_stats(
153-
&self,
154-
) -> Result<HashMap<TableId, Vec<RegionStat>>> {
155-
let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
156-
let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
157-
for (_dn_id, stats) in dn_stats {
158-
let mut stats = stats.stats;
159-
stats.sort_by_key(|s| s.timestamp_millis);
160-
161-
let Some(latest_stat) = stats.last().cloned() else {
162-
continue;
163-
};
164-
165-
for region_stat in latest_stat.region_stats {
166-
table_to_region_stats
167-
.entry(region_stat.id.table_id())
168-
.or_default()
169-
.push(region_stat);
170-
}
171-
}
172-
Ok(table_to_region_stats)
173-
}
174-
175147
/// Process multiple tables concurrently with limited parallelism.
176148
pub(crate) async fn process_tables_concurrently(
177149
&self,

src/meta-srv/src/gc/tracker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl GcScheduler {
5858
let cleanup_start = Instant::now();
5959

6060
// Get all current region IDs from the region stats
61-
let table_to_region_stats = self.get_table_to_region_stats().await?;
61+
let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
6262
let mut current_regions = HashSet::new();
6363
for region_stats in table_to_region_stats.values() {
6464
for region_stat in region_stats {

0 commit comments

Comments
 (0)