Skip to content

Commit 4400b66

Browse files
committed
feat: introduce EnterStagingRequest for region engine
Signed-off-by: WenyXu <[email protected]>
1 parent 06a2498 commit 4400b66

File tree

24 files changed

+732
-153
lines changed

24 files changed

+732
-153
lines changed

src/datanode/src/region_server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1200,7 +1200,8 @@ impl RegionServerInner {
12001200
| RegionRequest::Flush(_)
12011201
| RegionRequest::Compact(_)
12021202
| RegionRequest::Truncate(_)
1203-
| RegionRequest::BuildIndex(_) => RegionChange::None,
1203+
| RegionRequest::BuildIndex(_)
1204+
| RegionRequest::EnterStaging(_) => RegionChange::None,
12041205
RegionRequest::Catchup(_) => RegionChange::Catchup,
12051206
};
12061207

src/metric-engine/src/engine.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,18 @@ impl RegionEngine for MetricEngine {
208208
let mut extension_return_value = HashMap::new();
209209

210210
let result = match request {
211+
RegionRequest::EnterStaging(_) => {
212+
if self.inner.is_physical_region(region_id) {
213+
self.inner
214+
.mito
215+
.handle_request(region_id, request)
216+
.await
217+
.context(error::MitoEnterStagingOperationSnafu)
218+
.map(|response| response.affected_rows)
219+
} else {
220+
UnsupportedRegionRequestSnafu { request }.fail()
221+
}
222+
}
211223
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
212224
RegionRequest::Create(create) => {
213225
self.inner

src/metric-engine/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@ pub enum Error {
156156
location: Location,
157157
},
158158

159+
#[snafu(display("Mito enter staging operation fails"))]
160+
MitoEnterStagingOperation {
161+
source: BoxedError,
162+
#[snafu(implicit)]
163+
location: Location,
164+
},
165+
159166
#[snafu(display("Failed to collect record batch stream"))]
160167
CollectRecordBatchStream {
161168
source: common_recordbatch::error::Error,
@@ -352,6 +359,7 @@ impl ErrorExt for Error {
352359
| MitoWriteOperation { source, .. }
353360
| MitoFlushOperation { source, .. }
354361
| MitoSyncOperation { source, .. }
362+
| MitoEnterStagingOperation { source, .. }
355363
| BatchOpenMitoRegion { source, .. }
356364
| BatchCatchupMitoRegion { source, .. } => source.status_code(),
357365

src/mito2/src/compaction/compactor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ impl Compactor for DefaultCompactor {
500500
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
501501
compaction_region
502502
.manifest_ctx
503-
.update_manifest(RegionLeaderState::Writable, action_list)
503+
.update_manifest(RegionLeaderState::Writable, action_list, false)
504504
.await?;
505505

506506
Ok(edit)

src/mito2/src/compaction/task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl CompactionTaskImpl {
117117
};
118118
if let Err(e) = compaction_region
119119
.manifest_ctx
120-
.update_manifest(current_region_state, action_list)
120+
.update_manifest(current_region_state, action_list, false)
121121
.await
122122
{
123123
warn!(

src/mito2/src/engine/alter_test.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
901901
check_ttl(&engine, &Duration::from_secs(500));
902902
}
903903

904-
#[tokio::test]
904+
#[tokio::test(flavor = "multi_thread")]
905905
async fn test_write_stall_on_altering() {
906906
common_telemetry::init_default_ut_logging();
907907

@@ -952,6 +952,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
952952
.await
953953
.unwrap();
954954
});
955+
// Make sure the loop is handling the alter request.
956+
tokio::time::sleep(Duration::from_millis(100)).await;
955957

956958
let column_schemas_cloned = column_schemas.clone();
957959
let engine_cloned = engine.clone();
@@ -962,6 +964,8 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
962964
};
963965
put_rows(&engine_cloned, region_id, rows).await;
964966
});
967+
// Make sure the loop is handling the put request.
968+
tokio::time::sleep(Duration::from_millis(100)).await;
965969

966970
listener.wake_notify();
967971
alter_job.await.unwrap();

src/mito2/src/engine/listener.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ pub trait EventListener: Send + Sync {
7474
/// Notifies the listener that region starts to send a region change result to worker.
7575
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}
7676

77+
/// Notifies the listener that region starts to send a enter staging result to worker.
78+
async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {}
79+
7780
/// Notifies the listener that the index build task is executed successfully.
7881
async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {}
7982

@@ -307,6 +310,37 @@ impl EventListener for NotifyRegionChangeResultListener {
307310
region_id
308311
);
309312
self.notify.notified().await;
313+
info!(
314+
"Continue to sending region change result for region {}",
315+
region_id
316+
);
317+
}
318+
}
319+
320+
#[derive(Default)]
321+
pub struct NotifyEnterStagingResultListener {
322+
notify: Notify,
323+
}
324+
325+
impl NotifyEnterStagingResultListener {
326+
/// Continue to sending enter staging result.
327+
pub fn wake_notify(&self) {
328+
self.notify.notify_one();
329+
}
330+
}
331+
332+
#[async_trait]
333+
impl EventListener for NotifyEnterStagingResultListener {
334+
async fn on_enter_staging_result_begin(&self, region_id: RegionId) {
335+
info!(
336+
"Wait on notify to start notify enter staging result for region {}",
337+
region_id
338+
);
339+
self.notify.notified().await;
340+
info!(
341+
"Continue to sending enter staging result for region {}",
342+
region_id
343+
);
310344
}
311345
}
312346

0 commit comments

Comments
 (0)