Skip to content

Commit bfe619f

Browse files
committed
refactor: support multiple rows per event in event recorder
Signed-off-by: WenyXu <[email protected]>
1 parent 32168e8 commit bfe619f

File tree

4 files changed

+141
-64
lines changed

4 files changed

+141
-64
lines changed

src/common/event-recorder/src/recorder.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ pub trait Event: Send + Sync + Debug {
9797
vec![]
9898
}
9999

100-
/// Add the extra row to the event with the default row.
101-
fn extra_row(&self) -> Result<Row> {
102-
Ok(Row { values: vec![] })
100+
/// Add the extra rows to the event with the default row.
101+
fn extra_rows(&self) -> Result<Vec<Row>> {
102+
Ok(vec![Row { values: vec![] }])
103103
}
104104

105105
/// Returns the event as any type.
@@ -159,15 +159,17 @@ pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsert
159159

160160
let mut rows: Vec<Row> = Vec::with_capacity(events.len());
161161
for event in events {
162-
let extra_row = event.extra_row()?;
163-
let mut values = Vec::with_capacity(3 + extra_row.values.len());
164-
values.extend([
165-
ValueData::StringValue(event.event_type().to_string()).into(),
166-
ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
167-
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
168-
]);
169-
values.extend(extra_row.values);
170-
rows.push(Row { values });
162+
let extra_rows = event.extra_rows()?;
163+
for extra_row in extra_rows {
164+
let mut values = Vec::with_capacity(3 + extra_row.values.len());
165+
values.extend([
166+
ValueData::StringValue(event.event_type().to_string()).into(),
167+
ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
168+
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
169+
]);
170+
values.extend(extra_row.values);
171+
rows.push(Row { values });
172+
}
171173
}
172174

173175
Ok(RowInsertRequests {

src/common/frontend/src/slow_query_event.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ impl Event for SlowQueryEvent {
107107
]
108108
}
109109

110-
fn extra_row(&self) -> Result<Row> {
111-
Ok(Row {
110+
fn extra_rows(&self) -> Result<Vec<Row>> {
111+
Ok(vec![Row {
112112
values: vec![
113113
ValueData::U64Value(self.cost).into(),
114114
ValueData::U64Value(self.threshold).into(),
@@ -119,7 +119,7 @@ impl Event for SlowQueryEvent {
119119
ValueData::TimestampMillisecondValue(self.promql_start.unwrap_or(0)).into(),
120120
ValueData::TimestampMillisecondValue(self.promql_end.unwrap_or(0)).into(),
121121
],
122-
})
122+
}])
123123
}
124124

125125
fn json_payload(&self) -> Result<String> {

src/common/procedure/src/event.rs

Lines changed: 86 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,25 +92,95 @@ impl Event for ProcedureEvent {
9292
schema
9393
}
9494

95-
fn extra_row(&self) -> Result<Row> {
96-
let error_str = match &self.state {
97-
ProcedureState::Failed { error } => format!("{:?}", error),
98-
ProcedureState::PrepareRollback { error } => format!("{:?}", error),
99-
ProcedureState::RollingBack { error } => format!("{:?}", error),
100-
ProcedureState::Retrying { error } => format!("{:?}", error),
101-
ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
102-
_ => "".to_string(),
103-
};
104-
let mut row = vec![
105-
ValueData::StringValue(self.procedure_id.to_string()).into(),
106-
ValueData::StringValue(self.state.as_str_name().to_string()).into(),
107-
ValueData::StringValue(error_str).into(),
108-
];
109-
row.append(&mut self.internal_event.extra_row()?.values);
110-
Ok(Row { values: row })
95+
fn extra_rows(&self) -> Result<Vec<Row>> {
96+
let mut internal_event_extra_rows = self.internal_event.extra_rows()?;
97+
let mut rows = Vec::with_capacity(internal_event_extra_rows.len());
98+
for internal_event_extra_row in internal_event_extra_rows.iter_mut() {
99+
let error_str = match &self.state {
100+
ProcedureState::Failed { error } => format!("{:?}", error),
101+
ProcedureState::PrepareRollback { error } => format!("{:?}", error),
102+
ProcedureState::RollingBack { error } => format!("{:?}", error),
103+
ProcedureState::Retrying { error } => format!("{:?}", error),
104+
ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
105+
_ => "".to_string(),
106+
};
107+
let mut values = vec![
108+
ValueData::StringValue(self.procedure_id.to_string()).into(),
109+
ValueData::StringValue(self.state.as_str_name().to_string()).into(),
110+
ValueData::StringValue(error_str).into(),
111+
];
112+
values.append(&mut internal_event_extra_row.values);
113+
rows.push(Row { values });
114+
}
115+
116+
Ok(rows)
111117
}
112118

113119
fn as_any(&self) -> &dyn Any {
114120
self
115121
}
116122
}
123+
124+
#[cfg(test)]
125+
mod tests {
126+
use api::v1::value::ValueData;
127+
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
128+
use common_event_recorder::Event;
129+
130+
use crate::{ProcedureEvent, ProcedureId, ProcedureState};
131+
132+
#[derive(Debug)]
133+
struct TestEvent;
134+
135+
impl Event for TestEvent {
136+
fn event_type(&self) -> &str {
137+
"test_event"
138+
}
139+
140+
fn extra_schema(&self) -> Vec<ColumnSchema> {
141+
vec![ColumnSchema {
142+
column_name: "test_event_column".to_string(),
143+
datatype: ColumnDataType::String.into(),
144+
semantic_type: SemanticType::Field.into(),
145+
..Default::default()
146+
}]
147+
}
148+
149+
fn extra_rows(&self) -> common_event_recorder::error::Result<Vec<Row>> {
150+
Ok(vec![
151+
Row {
152+
values: vec![ValueData::StringValue("test_event1".to_string()).into()],
153+
},
154+
Row {
155+
values: vec![ValueData::StringValue("test_event2".to_string()).into()],
156+
},
157+
])
158+
}
159+
160+
fn as_any(&self) -> &dyn std::any::Any {
161+
self
162+
}
163+
}
164+
165+
#[test]
166+
fn test_procedure_event_extra_rows() {
167+
let procedure_event = ProcedureEvent::new(
168+
ProcedureId::random(),
169+
Box::new(TestEvent {}),
170+
ProcedureState::Running,
171+
);
172+
173+
let procedure_event_extra_rows = procedure_event.extra_rows().unwrap();
174+
assert_eq!(procedure_event_extra_rows.len(), 2);
175+
assert_eq!(procedure_event_extra_rows[0].values.len(), 4);
176+
assert_eq!(
177+
procedure_event_extra_rows[0].values[3],
178+
ValueData::StringValue("test_event1".to_string()).into()
179+
);
180+
assert_eq!(procedure_event_extra_rows[1].values.len(), 4);
181+
assert_eq!(
182+
procedure_event_extra_rows[1].values[3],
183+
ValueData::StringValue("test_event2".to_string()).into()
184+
);
185+
}
186+
}

src/meta-srv/src/events/region_migration_event.rs

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use common_event_recorder::Event;
2121
use common_event_recorder::error::{Result, SerializeEventSnafu};
2222
use serde::Serialize;
2323
use snafu::ResultExt;
24-
use store_api::storage::{RegionId, TableId};
24+
use store_api::storage::RegionId;
2525

2626
use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
2727

@@ -37,37 +37,34 @@ pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_nod
3737
pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
3838

3939
/// RegionMigrationEvent is the event of region migration.
40-
#[derive(Debug, Serialize)]
40+
#[derive(Debug)]
4141
pub(crate) struct RegionMigrationEvent {
42-
#[serde(skip)]
43-
region_id: RegionId,
44-
#[serde(skip)]
45-
table_id: TableId,
46-
#[serde(skip)]
47-
region_number: u32,
48-
#[serde(skip)]
42+
/// The region ids of the region migration.
43+
region_ids: Vec<RegionId>,
44+
/// The trigger reason of the region migration.
4945
trigger_reason: RegionMigrationTriggerReason,
50-
#[serde(skip)]
46+
/// The source node id of the region migration.
5147
src_node_id: u64,
52-
#[serde(skip)]
48+
/// The source peer address of the region migration.
5349
src_peer_addr: String,
54-
#[serde(skip)]
50+
/// The destination node id of the region migration.
5551
dst_node_id: u64,
56-
#[serde(skip)]
52+
/// The destination peer address of the region migration.
5753
dst_peer_addr: String,
54+
/// The timeout of the region migration.
55+
timeout: Duration,
56+
}
5857

59-
// The following fields will be serialized as the json payload.
58+
#[derive(Debug, Serialize)]
59+
struct Payload {
60+
#[serde(with = "humantime_serde")]
6061
timeout: Duration,
6162
}
6263

6364
impl RegionMigrationEvent {
6465
pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
65-
// FIXME(weny): handle multiple region ids.
66-
let region_id = ctx.region_ids[0];
6766
Self {
68-
region_id,
69-
table_id: region_id.table_id(),
70-
region_number: region_id.region_number(),
67+
region_ids: ctx.region_ids.clone(),
7168
trigger_reason: ctx.trigger_reason,
7269
src_node_id: ctx.from_peer.id,
7370
src_peer_addr: ctx.from_peer.addr.clone(),
@@ -136,23 +133,31 @@ impl Event for RegionMigrationEvent {
136133
]
137134
}
138135

139-
fn extra_row(&self) -> Result<Row> {
140-
Ok(Row {
141-
values: vec![
142-
ValueData::U64Value(self.region_id.as_u64()).into(),
143-
ValueData::U32Value(self.table_id).into(),
144-
ValueData::U32Value(self.region_number).into(),
145-
ValueData::StringValue(self.trigger_reason.to_string()).into(),
146-
ValueData::U64Value(self.src_node_id).into(),
147-
ValueData::StringValue(self.src_peer_addr.clone()).into(),
148-
ValueData::U64Value(self.dst_node_id).into(),
149-
ValueData::StringValue(self.dst_peer_addr.clone()).into(),
150-
],
151-
})
136+
fn extra_rows(&self) -> Result<Vec<Row>> {
137+
let mut extra_rows = Vec::with_capacity(self.region_ids.len());
138+
for region_id in &self.region_ids {
139+
extra_rows.push(Row {
140+
values: vec![
141+
ValueData::U64Value(region_id.as_u64()).into(),
142+
ValueData::U32Value(region_id.table_id()).into(),
143+
ValueData::U32Value(region_id.region_number()).into(),
144+
ValueData::StringValue(self.trigger_reason.to_string()).into(),
145+
ValueData::U64Value(self.src_node_id).into(),
146+
ValueData::StringValue(self.src_peer_addr.clone()).into(),
147+
ValueData::U64Value(self.dst_node_id).into(),
148+
ValueData::StringValue(self.dst_peer_addr.clone()).into(),
149+
],
150+
});
151+
}
152+
153+
Ok(extra_rows)
152154
}
153155

154156
fn json_payload(&self) -> Result<String> {
155-
serde_json::to_string(self).context(SerializeEventSnafu)
157+
serde_json::to_string(&Payload {
158+
timeout: self.timeout,
159+
})
160+
.context(SerializeEventSnafu)
156161
}
157162

158163
fn as_any(&self) -> &dyn Any {

0 commit comments

Comments
 (0)