Skip to content

Commit 0ba1af0

Browse files
authored
Keep data in mem for local query (#381)
When using the disk writer, keep a copy of record in a mutable memory writer. This allows querying hot data of past minute by cloning the records and provide to query table. The memory is discarded when finalise is called at the end of minute.
1 parent d5769c3 commit 0ba1af0

File tree

1 file changed

+14
-11
lines changed

1 file changed

+14
-11
lines changed

server/src/event/writer.rs

+14-11
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
4444

4545
pub enum StreamWriter {
4646
Mem(InMemWriter),
47-
Disk(FileWriter),
47+
Disk(FileWriter, InMemWriter),
4848
}
4949

5050
impl StreamWriter {
@@ -58,8 +58,9 @@ impl StreamWriter {
5858
StreamWriter::Mem(mem) => {
5959
mem.push(rb);
6060
}
61-
StreamWriter::Disk(disk) => {
61+
StreamWriter::Disk(disk, mem) => {
6262
disk.push(stream_name, schema_key, &rb)?;
63+
mem.push(rb);
6364
}
6465
}
6566
Ok(())
@@ -112,7 +113,7 @@ impl WriterTable {
112113
let mut writer = if CONFIG.parseable.in_mem_ingestion {
113114
StreamWriter::Mem(InMemWriter::default())
114115
} else {
115-
StreamWriter::Disk(FileWriter::default())
116+
StreamWriter::Disk(FileWriter::default(), InMemWriter::default())
116117
};
117118

118119
writer.push(stream_name, schema_key, record)?;
@@ -146,7 +147,7 @@ impl WriterTable {
146147
buf: rb,
147148
});
148149
}
149-
StreamWriter::Disk(disk) => disk.close_all(),
150+
StreamWriter::Disk(disk, _) => disk.close_all(),
150151
}
151152
}
152153
}
@@ -155,13 +156,15 @@ impl WriterTable {
155156
let hashmap_guard = self.read().unwrap();
156157
let (writer, context) = hashmap_guard.get(stream_name)?;
157158
let writer = writer.lock().unwrap();
158-
match &*writer {
159-
StreamWriter::Mem(mem) => Some(ReadBuf {
160-
time: context.time,
161-
buf: mem.recordbatch_cloned(),
162-
}),
163-
StreamWriter::Disk(_) => None,
164-
}
159+
let mem = match &*writer {
160+
StreamWriter::Mem(mem) => mem,
161+
StreamWriter::Disk(_, mem) => mem,
162+
};
163+
164+
Some(ReadBuf {
165+
time: context.time,
166+
buf: mem.recordbatch_cloned(),
167+
})
165168
}
166169
}
167170

0 commit comments

Comments
 (0)