Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics manager cache #1625

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
bug fix
zealchen committed Feb 27, 2025
commit e311fb64be0f68112d3d9559e280f93d7325fb76
14 changes: 8 additions & 6 deletions src/metric_engine/src/metric/mod.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,10 @@ use arrow::{
datatypes::{DataType, Field, Schema, ToByteSlice},
};
use chrono::{Datelike, Timelike, Utc};
use horaedb_storage::storage::{TimeMergeStorageRef, WriteRequest};
use horaedb_storage::{
storage::{TimeMergeStorageRef, WriteRequest},
types::Timestamp,
};
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
@@ -246,16 +249,15 @@ impl Inner {
}

async fn batch_write_metrics(batch_tasks: Vec<Task>, storage: TimeMergeStorageRef) {
let mut start_ts: i64 = 0;
let mut end_ts: i64 = 0;
let arrays: Vec<ArrayRef> = {
let mut metric_name_builder = BinaryBuilder::new();
let mut metric_id_builder = UInt64Builder::new();
let mut field_name_builder = BinaryBuilder::new();
let mut field_id_builder = UInt64Builder::new();
let mut field_type_builder = UInt8Builder::new();
let mut field_duration_builder = Int64Builder::new();

let mut start_ts: i64 = 0;
let mut end_ts: i64 = 0;
let task_len = batch_tasks.len();

batch_tasks
@@ -292,7 +294,7 @@ impl Inner {
storage
.write(WriteRequest {
batch,
time_range: (0..10).into(),
time_range: (Timestamp(start_ts)..Timestamp(end_ts)).into(),
enable_check: true,
})
.await
@@ -345,7 +347,7 @@ impl Inner {
debug!("reach max wait time.");
break;
},
finished = Inner::batching(&mut batch_tasks, &mut receiver, max_batch_size) => {
finished = Self::batching(&mut batch_tasks, &mut receiver, max_batch_size) => {
debug!("get one task");
if finished {
debug!("batching finished.");