Skip to content

Commit 1ee280d

Browse files
committed
wip: handle file ref change
Signed-off-by: discord9 <[email protected]>
1 parent 41ef765 commit 1ee280d

File tree

11 files changed

+184
-52
lines changed

11 files changed

+184
-52
lines changed

src/common/meta/src/instruction.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter};
1717
use std::time::Duration;
1818

1919
use serde::{Deserialize, Serialize};
20-
use store_api::storage::{RegionId, RegionNumber, TableFileRefsManifest};
20+
use store_api::storage::{FileRefsManifest, RegionId, RegionNumber};
2121
use strum::Display;
2222
use table::metadata::TableId;
2323
use table::table_name::TableName;
@@ -423,7 +423,7 @@ pub struct GcRegion {
423423
/// The region ID to perform GC on.
424424
pub region_id: RegionId,
425425
/// The file references manifest containing temporary file references.
426-
pub file_refs_manifest: TableFileRefsManifest,
426+
pub file_refs_manifest: FileRefsManifest,
427427
}
428428

429429
impl Display for GcRegion {
@@ -441,7 +441,7 @@ impl Display for GcRegion {
441441
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
442442
pub struct GetFileRefsReply {
443443
/// The file references manifest.
444-
pub file_refs_manifest: TableFileRefsManifest,
444+
pub file_refs_manifest: FileRefsManifest,
445445
/// Whether the operation was successful.
446446
pub success: bool,
447447
/// Error message if any.

src/datanode/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,13 @@ pub enum Error {
322322
source: mito2::error::Error,
323323
},
324324

325+
#[snafu(display("Failed to list tmp ref files from metadata"))]
326+
ListTmpRefFiles {
327+
#[snafu(implicit)]
328+
location: Location,
329+
source: mito2::error::Error,
330+
},
331+
325332
#[snafu(display("Failed to serialize options to TOML"))]
326333
TomlFormat {
327334
#[snafu(implicit)]
@@ -461,6 +468,7 @@ impl ErrorExt for Error {
461468
BuildMitoEngine { source, .. } => source.status_code(),
462469
BuildMetricEngine { source, .. } => source.status_code(),
463470
ListStorageSsts { source, .. } => source.status_code(),
471+
ListTmpRefFiles { source, .. } => source.status_code(),
464472
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
465473
StatusCode::RegionBusy
466474
}

src/datanode/src/heartbeat/handler.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use store_api::storage::RegionId;
2626

2727
mod close_region;
2828
mod downgrade_region;
29+
mod file_ref;
2930
mod flush_region;
3031
mod open_region;
3132
mod upgrade_region;
@@ -102,7 +103,10 @@ impl RegionHeartbeatResponseHandler {
102103
Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
103104
handler_context.handle_flush_regions_instruction(flush_regions)
104105
})),
105-
_ => todo!(),
106+
Instruction::GetFileRefs(get_file_refs) => Ok(Box::new(move |handler_context| {
107+
Box::pin(handler_context.handle_get_file_refs_instruction(get_file_refs))
108+
})),
109+
Instruction::GcRegion(_) => InvalidHeartbeatResponseSnafu.fail(),
106110
}
107111
}
108112
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::{HashMap, HashSet};
16+
17+
use common_meta::instruction::{GetFileRefs, GetFileRefsReply, InstructionReply};
18+
use mito2::engine::MitoEngine;
19+
use snafu::ResultExt as _;
20+
use store_api::storage::{FileRefsManifest, RegionId, TableId};
21+
22+
use crate::error::{ListTmpRefFilesSnafu, Result};
23+
use crate::heartbeat::handler::HandlerContext;
24+
impl HandlerContext {
25+
/// Handles GetFileRefs instruction by getting file references from MitoEngine.
26+
pub(crate) async fn handle_get_file_refs_instruction(
27+
self,
28+
get_file_refs: GetFileRefs,
29+
) -> Option<InstructionReply> {
30+
let region_server = self.region_server;
31+
32+
// Get the MitoEngine
33+
let Some(mito_engine) = region_server.mito_engine() else {
34+
return Some(InstructionReply::GetFileRefs(GetFileRefsReply {
35+
file_refs_manifest: FileRefsManifest::default(),
36+
success: false,
37+
error: Some("MitoEngine not found".to_string()),
38+
}));
39+
};
40+
41+
match mito_engine
42+
.get_snapshot_of_unmanifested_refs(get_file_refs.region_ids)
43+
.await
44+
{
45+
Ok(all_file_refs) => {
46+
// Return the file references
47+
Some(InstructionReply::GetFileRefs(GetFileRefsReply {
48+
file_refs_manifest: all_file_refs,
49+
success: true,
50+
error: None,
51+
}))
52+
}
53+
Err(e) => Some(InstructionReply::GetFileRefs(GetFileRefsReply {
54+
file_refs_manifest: FileRefsManifest::default(),
55+
success: false,
56+
error: Some(format!("Failed to get file refs: {}", e)),
57+
})),
58+
}
59+
}
60+
}

src/datanode/src/region_server.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ impl RegionServer {
158158
}
159159
}
160160

161+
/// Gets the MitoEngine if it's registered.
162+
pub fn mito_engine(&self) -> Option<MitoEngine> {
163+
self.inner.mito_engine.read().unwrap().clone()
164+
}
165+
161166
#[tracing::instrument(skip_all)]
162167
pub async fn handle_batch_open_requests(
163168
&self,
@@ -676,14 +681,14 @@ struct RegionServerInner {
676681
runtime: Runtime,
677682
event_listener: RegionServerEventListenerRef,
678683
table_provider_factory: TableProviderFactoryRef,
679-
// The number of queries allowed to be executed at the same time.
680-
// Act as last line of defense on datanode to prevent query overloading.
684+
/// The number of queries allowed to be executed at the same time.
685+
/// Act as last line of defense on datanode to prevent query overloading.
681686
parallelism: Option<RegionServerParallelism>,
682-
// The topic stats reporter.
687+
/// The topic stats reporter.
683688
topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
684-
// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
685-
// server with a concrete engine; acceptable for now to fetch Mito-specific
686-
// info (e.g., list SSTs). Consider a diagnostics trait later.
689+
/// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
690+
/// server with a concrete engine; acceptable for now to fetch Mito-specific
691+
/// info (e.g., list SSTs). Consider a diagnostics trait later.
687692
mito_engine: RwLock<Option<MitoEngine>>,
688693
}
689694

src/meta-srv/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ hyper-util = { workspace = true, features = ["tokio"] }
6060
itertools.workspace = true
6161
lazy_static.workspace = true
6262
once_cell.workspace = true
63+
ordered-float.workspace = true
6364
parking_lot.workspace = true
6465
prometheus.workspace = true
6566
prost.workspace = true

src/meta-srv/src/gc_trigger.rs

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ use common_meta::rpc::router::RegionRoute;
2828
use common_telemetry::{debug, error, info, warn};
2929
use futures::TryStreamExt;
3030
use futures::stream::{FuturesUnordered, StreamExt};
31+
use ordered_float::OrderedFloat;
3132
use snafu::{OptionExt as _, ResultExt};
32-
use store_api::storage::{RegionId, TableFileRefsManifest};
33+
use store_api::storage::{FileRefsManifest, RegionId};
3334
use table::metadata::TableId;
3435
use tokio::sync::mpsc::{Receiver, Sender};
3536
use tokio::time::sleep;
@@ -63,6 +64,8 @@ pub struct GcConfig {
6364
pub file_removal_rate_weight: f64,
6465
/// Cooldown period between GC operations on the same region.
6566
pub gc_cooldown_period: Duration,
67+
/// Maximum number of regions to select for GC per table.
68+
pub regions_per_table_threshold: usize,
6669
}
6770

6871
impl Default for GcConfig {
@@ -75,6 +78,7 @@ impl Default for GcConfig {
7578
sst_count_weight: 1.0,
7679
file_removal_rate_weight: 0.5,
7780
gc_cooldown_period: Duration::from_secs(60 * 30), // 30 minutes
81+
regions_per_table_threshold: 20, // Select top 20 regions per table
7882
}
7983
}
8084
}
@@ -83,21 +87,21 @@ impl Default for GcConfig {
8387
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
8488
struct GcCandidate {
8589
region_id: RegionId,
86-
score: u64, // Changed to u64 for Hash/Eq implementation
90+
score: OrderedFloat<f64>,
8791
region_stat: RegionStat,
8892
}
8993

9094
impl GcCandidate {
9195
fn new(region_id: RegionId, score: f64, region_stat: RegionStat) -> Self {
9296
Self {
9397
region_id,
94-
score: (score * 1000.0) as u64, // Convert to u64 for hashing
98+
score: OrderedFloat(score),
9599
region_stat,
96100
}
97101
}
98102

99103
fn score_f64(&self) -> f64 {
100-
self.score as f64 / 1000.0
104+
self.score.into_inner()
101105
}
102106
}
103107

@@ -247,7 +251,7 @@ impl GcTrigger {
247251
let gc_tracker = self.region_gc_tracker.lock().await;
248252

249253
for (table_id, region_stats) in table_to_region_stats {
250-
let mut candidates = HashSet::new();
254+
let mut candidates = Vec::new();
251255

252256
for region_stat in region_stats {
253257
// Skip regions that are too small
@@ -267,17 +271,25 @@ impl GcTrigger {
267271

268272
// Only consider regions with a meaningful score
269273
if score > 0.0 {
270-
candidates.insert(GcCandidate::new(region_stat.id, score, region_stat.clone()));
274+
candidates.push(GcCandidate::new(region_stat.id, score, region_stat.clone()));
271275
}
272276
}
273277

274-
if !candidates.is_empty() {
278+
// Sort candidates by score in descending order and take top N
279+
candidates.sort_by(|a, b| b.score.cmp(&a.score));
280+
let top_candidates: HashSet<GcCandidate> = candidates
281+
.into_iter()
282+
.take(self.config.regions_per_table_threshold)
283+
.collect();
284+
285+
if !top_candidates.is_empty() {
275286
info!(
276-
"Selected {} GC candidates for table {}",
277-
candidates.len(),
278-
table_id
287+
"Selected {} GC candidates for table {} (top {} out of all qualified)",
288+
top_candidates.len(),
289+
table_id,
290+
self.config.regions_per_table_threshold
279291
);
280-
table_candidates.insert(*table_id, candidates);
292+
table_candidates.insert(*table_id, top_candidates);
281293
}
282294
}
283295

@@ -388,9 +400,38 @@ impl GcTrigger {
388400
.get_file_references(&related_region_ids, &table_peer)
389401
.await?;
390402

391-
// Step 4: Process each candidate region with retry logic
392-
let mut successful_regions = 0;
403+
// Step 4: Filter out candidates that don't have file references available
404+
let total_candidates = candidates.len();
405+
let mut valid_candidates = Vec::new();
393406
for candidate in candidates {
407+
// Check if we have file references for this region
408+
if let Some(region_route) = table_peer
409+
.region_routes
410+
.iter()
411+
.find(|r| r.region.id == candidate.region_id)
412+
{
413+
if let Some(peer) = &region_route.leader_peer {
414+
// Check if this peer's file references were successfully obtained
415+
if file_refs_manifest
416+
.manifest_version
417+
.contains_key(&candidate.region_id)
418+
{
419+
valid_candidates.push(candidate);
420+
} else {
421+
warn!(
422+
"Skipping region {} due to missing file references (datanode {} may be unavailable)",
423+
candidate.region_id, peer
424+
);
425+
}
426+
}
427+
}
428+
}
429+
430+
// Step 5: Process each valid candidate region with retry logic
431+
let valid_candidates_count = valid_candidates.len();
432+
let mut successful_regions = 0;
433+
434+
for candidate in valid_candidates {
394435
let region_id = candidate.region_id;
395436
match self
396437
.process_region_gc_with_retry(candidate, &file_refs_manifest, &table_peer)
@@ -409,10 +450,11 @@ impl GcTrigger {
409450
}
410451

411452
info!(
412-
"Completed GC for table {}: {}/{} regions successful",
453+
"Completed GC for table {}: {}/{} regions successful ({} skipped due to missing file references)",
413454
table_id,
414455
successful_regions,
415-
candidate_region_ids.len()
456+
valid_candidates_count,
457+
total_candidates - valid_candidates_count
416458
);
417459

418460
Ok(successful_regions)
@@ -434,7 +476,7 @@ impl GcTrigger {
434476
&self,
435477
region_ids: &[RegionId],
436478
table_peer: &PhysicalTableRouteValue,
437-
) -> Result<TableFileRefsManifest> {
479+
) -> Result<FileRefsManifest> {
438480
info!("Getting file references for {} regions", region_ids.len());
439481

440482
// Group regions by datanode to minimize RPC calls
@@ -466,13 +508,17 @@ impl GcTrigger {
466508
all_manifest_versions.extend(manifest.manifest_version);
467509
}
468510
Err(e) => {
469-
error!("Failed to get file refs from datanode {}: {}", peer, e);
470-
return Err(e);
511+
warn!(
512+
"Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
513+
peer, e
514+
);
515+
// Continue processing other datanodes instead of failing the entire operation
516+
continue;
471517
}
472518
}
473519
}
474520

475-
Ok(TableFileRefsManifest {
521+
Ok(FileRefsManifest {
476522
file_refs: all_file_refs,
477523
manifest_version: all_manifest_versions,
478524
})
@@ -482,7 +528,7 @@ impl GcTrigger {
482528
async fn process_region_gc_with_retry(
483529
&self,
484530
candidate: GcCandidate,
485-
file_refs_manifest: &TableFileRefsManifest,
531+
file_refs_manifest: &FileRefsManifest,
486532
table_peer: &PhysicalTableRouteValue,
487533
) -> Result<()> {
488534
let region_id = candidate.region_id;
@@ -548,7 +594,7 @@ impl GcTrigger {
548594
&self,
549595
peer: &Peer,
550596
region_ids: &[RegionId],
551-
) -> Result<TableFileRefsManifest> {
597+
) -> Result<FileRefsManifest> {
552598
info!(
553599
"Sending GetFileRefs instruction to datanode {} for {} regions",
554600
peer,
@@ -618,7 +664,7 @@ impl GcTrigger {
618664
&self,
619665
peer: Peer,
620666
region_id: RegionId,
621-
file_refs_manifest: &TableFileRefsManifest,
667+
file_refs_manifest: &FileRefsManifest,
622668
) -> Result<()> {
623669
info!(
624670
"Sending GC instruction to datanode {} for region {}",

0 commit comments

Comments
 (0)