Skip to content

Commit d0cb6df

Browse files
authored
[ENH] Make it so the rust log service can start without a dirty log. (#5537)
## Description of changes

This is for read-only mode for DR.

## Test plan

CI for the normal stuff, tested in tilt for the log.

## Migration plan

N/A

## Observability plan

N/A

## Documentation Changes

N/A
1 parent 53dc52b commit d0cb6df

File tree

1 file changed

+81
-47
lines changed

1 file changed

+81
-47
lines changed

rust/log-service/src/lib.rs

Lines changed: 81 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ impl DirtyMarker {
522522
#[derive(Clone, Debug)]
523523
pub struct MarkDirty {
524524
collection_id: CollectionUuid,
525-
dirty_log: Arc<LogWriter>,
525+
dirty_log: Option<Arc<LogWriter>>,
526526
}
527527

528528
impl MarkDirty {
@@ -538,24 +538,29 @@ impl wal3::MarkDirty for MarkDirty {
538538
log_position: LogPosition,
539539
num_records: usize,
540540
) -> Result<(), wal3::Error> {
541-
let num_records = num_records as u64;
542-
let initial_insertion_epoch_us = SystemTime::now()
543-
.duration_since(SystemTime::UNIX_EPOCH)
544-
.map_err(|_| wal3::Error::Internal)?
545-
.as_micros() as u64;
546-
let dirty_marker = DirtyMarker::MarkDirty {
547-
collection_id: self.collection_id,
548-
log_position,
549-
num_records,
550-
reinsert_count: 0,
551-
initial_insertion_epoch_us,
552-
};
553-
let dirty_marker_json = serde_json::to_string(&dirty_marker).map_err(|err| {
554-
tracing::error!("Failed to serialize dirty marker: {}", err);
555-
wal3::Error::Internal
556-
})?;
557-
self.dirty_log.append(Vec::from(dirty_marker_json)).await?;
558-
Ok(())
541+
if let Some(dirty_log) = self.dirty_log.as_ref() {
542+
let num_records = num_records as u64;
543+
let initial_insertion_epoch_us = SystemTime::now()
544+
.duration_since(SystemTime::UNIX_EPOCH)
545+
.map_err(|_| wal3::Error::Internal)?
546+
.as_micros() as u64;
547+
let dirty_marker = DirtyMarker::MarkDirty {
548+
collection_id: self.collection_id,
549+
log_position,
550+
num_records,
551+
reinsert_count: 0,
552+
initial_insertion_epoch_us,
553+
};
554+
let dirty_marker_json = serde_json::to_string(&dirty_marker).map_err(|err| {
555+
tracing::error!("Failed to serialize dirty marker: {}", err);
556+
wal3::Error::Internal
557+
})?;
558+
dirty_log.append(Vec::from(dirty_marker_json)).await?;
559+
Ok(())
560+
} else {
561+
tracing::error!("asked to mark dirty with no dirty log");
562+
Err(wal3::Error::Internal)
563+
}
559564
}
560565
}
561566

@@ -572,7 +577,7 @@ pub struct LogServer {
572577
config: LogServerConfig,
573578
storage: Arc<Storage>,
574579
open_logs: Arc<StateHashTable<LogKey, LogStub>>,
575-
dirty_log: Arc<LogWriter>,
580+
dirty_log: Option<Arc<LogWriter>>,
576581
rolling_up: tokio::sync::Mutex<()>,
577582
backpressure: Mutex<Arc<HashSet<CollectionUuid>>>,
578583
need_to_compact: Mutex<HashMap<CollectionUuid, RollupPerCollection>>,
@@ -600,8 +605,18 @@ impl LogServer {
600605

601606
/// Verify that the service is not in read-only mode.
602607
fn ensure_write_mode(&self) -> Result<(), Status> {
603-
if self.config.is_read_only() {
604-
Err(Status::permission_denied("service is in read-only mode"))
608+
if self.dirty_log.is_none() {
609+
// NOTE(rescrv): This should NEVER happen in production.
610+
//
611+
// If it does happen, it is better to reject writes than to silently write data that
612+
// will never be accounted for by billing or compaction.
613+
Err(Status::permission_denied(
614+
"service is in read-only mode because it has no dirty log",
615+
))
616+
} else if self.config.is_read_only() {
617+
Err(Status::permission_denied(
618+
"service is in read-only mode because of operator configuration",
619+
))
605620
} else {
606621
Ok(())
607622
}
@@ -629,7 +644,7 @@ impl LogServer {
629644
let handle = self.open_logs.get_or_create_state(key);
630645
let mark_dirty = MarkDirty {
631646
collection_id,
632-
dirty_log: Arc::clone(&self.dirty_log),
647+
dirty_log: self.dirty_log.clone(),
633648
};
634649
// NOTE(rescrv): We use the writer and fall back to constructing a local reader in order
635650
// to force a read-repair of the collection when things partially fail.
@@ -720,7 +735,7 @@ impl LogServer {
720735
if allow_rollback {
721736
let mark_dirty = MarkDirty {
722737
collection_id,
723-
dirty_log: Arc::clone(&self.dirty_log),
738+
dirty_log: self.dirty_log.clone(),
724739
};
725740
let _ = mark_dirty
726741
.mark_dirty(LogPosition::from_offset(adjusted_log_offset as u64), 1usize)
@@ -805,7 +820,11 @@ impl LogServer {
805820
async fn roll_dirty_log(&self) -> Result<(), Error> {
806821
// Ensure at most one request at a time.
807822
let _guard = self.rolling_up.lock().await;
808-
let mut rollup = self.read_and_coalesce_dirty_log().await?;
823+
let Some(dirty_log) = self.dirty_log.as_ref() else {
824+
tracing::error!("roll dirty log called with no dirty log configured");
825+
return Err(Error::CouldNotGetDirtyLogReader);
826+
};
827+
let mut rollup = self.read_and_coalesce_dirty_log(dirty_log).await?;
809828
if rollup.rollups.is_empty() {
810829
tracing::info!("rollups is empty");
811830
let backpressure = vec![];
@@ -824,10 +843,10 @@ impl LogServer {
824843
.dirty_log_collections
825844
.record(collections as u64, &[]);
826845
self.enrich_dirty_log(&mut rollup.rollups).await?;
827-
self.save_dirty_log(rollup).await
846+
self.save_dirty_log(rollup, dirty_log).await
828847
}
829848

830-
async fn save_dirty_log(&self, mut rollup: Rollup) -> Result<(), Error> {
849+
async fn save_dirty_log(&self, mut rollup: Rollup, dirty_log: &LogWriter) -> Result<(), Error> {
831850
let mut markers = vec![];
832851
let mut backpressure = vec![];
833852
let mut total_uncompacted = 0;
@@ -849,12 +868,12 @@ impl LogServer {
849868
markers.push(serde_json::to_string(&DirtyMarker::Cleared).map(Vec::from)?);
850869
}
851870
let mut new_cursor = rollup.cursor.clone();
852-
match self.dirty_log.append_many(markers).await {
871+
match dirty_log.append_many(markers).await {
853872
Ok(_) | Err(wal3::Error::LogContentionDurable) => Ok(()),
854873
Err(err) => Err(err),
855874
}?;
856875
new_cursor.position = rollup.last_record_witnessed + 1u64;
857-
let Some(cursors) = self.dirty_log.cursors(CursorStoreOptions::default()) else {
876+
let Some(cursors) = dirty_log.cursors(CursorStoreOptions::default()) else {
858877
return Err(Error::CouldNotGetDirtyLogCursors);
859878
};
860879
tracing::info!(
@@ -879,11 +898,11 @@ impl LogServer {
879898
/// Read the entirety of a prefix of the dirty log.
880899
#[tracing::instrument(skip(self), err(Display))]
881900
#[allow(clippy::type_complexity)]
882-
async fn read_and_coalesce_dirty_log(&self) -> Result<Rollup, Error> {
883-
let Some(reader) = self.dirty_log.reader(LogReaderOptions::default()) else {
901+
async fn read_and_coalesce_dirty_log(&self, dirty_log: &LogWriter) -> Result<Rollup, Error> {
902+
let Some(reader) = dirty_log.reader(LogReaderOptions::default()) else {
884903
return Err(Error::CouldNotGetDirtyLogReader);
885904
};
886-
let Some(cursors) = self.dirty_log.cursors(CursorStoreOptions::default()) else {
905+
let Some(cursors) = dirty_log.cursors(CursorStoreOptions::default()) else {
887906
return Err(Error::CouldNotGetDirtyLogCursors);
888907
};
889908
let witness = cursors.load(&STABLE_PREFIX).await?;
@@ -1128,7 +1147,7 @@ impl LogServer {
11281147
let handle = self.open_logs.get_or_create_state(key);
11291148
let mark_dirty = MarkDirty {
11301149
collection_id,
1131-
dirty_log: Arc::clone(&self.dirty_log),
1150+
dirty_log: self.dirty_log.clone(),
11321151
};
11331152
let log = match get_log_from_handle(
11341153
&handle,
@@ -1480,7 +1499,7 @@ impl LogServer {
14801499
if offset != max_offset {
14811500
let mark_dirty = MarkDirty {
14821501
collection_id: target_collection_id,
1483-
dirty_log: Arc::clone(&self.dirty_log),
1502+
dirty_log: self.dirty_log.clone(),
14841503
};
14851504
let _ = mark_dirty
14861505
.mark_dirty(offset, (max_offset - offset) as usize)
@@ -1562,22 +1581,30 @@ impl LogServer {
15621581
})
15631582
.collect::<Result<_, _>>()
15641583
.map_err(|err| Status::internal(format!("Failed to serialize dirty marker: {err}")))?;
1565-
self.dirty_log
1566-
.append_many(dirty_marker_json_blobs)
1567-
.await
1568-
.map_err(|err| Status::new(err.code().into(), err.to_string()))?;
1569-
Ok(Response::new(PurgeDirtyForCollectionResponse {}))
1584+
if let Some(dirty_log) = self.dirty_log.as_ref() {
1585+
dirty_log
1586+
.append_many(dirty_marker_json_blobs)
1587+
.await
1588+
.map_err(|err| Status::new(err.code().into(), err.to_string()))?;
1589+
Ok(Response::new(PurgeDirtyForCollectionResponse {}))
1590+
} else {
1591+
tracing::error!("dirty log not set and purge dirty received");
1592+
Err(Status::failed_precondition("dirty log not configured"))
1593+
}
15701594
}
15711595

15721596
#[tracing::instrument(skip(self, _request))]
15731597
async fn inspect_dirty_log(
15741598
&self,
15751599
_request: Request<InspectDirtyLogRequest>,
15761600
) -> Result<Response<InspectDirtyLogResponse>, Status> {
1577-
let Some(reader) = self.dirty_log.reader(LogReaderOptions::default()) else {
1601+
let Some(dirty_log) = self.dirty_log.as_ref() else {
1602+
return Err(Status::unavailable("dirty log not configured"));
1603+
};
1604+
let Some(reader) = dirty_log.reader(LogReaderOptions::default()) else {
15781605
return Err(Status::unavailable("Failed to get dirty log reader"));
15791606
};
1580-
let Some(cursors) = self.dirty_log.cursors(CursorStoreOptions::default()) else {
1607+
let Some(cursors) = dirty_log.cursors(CursorStoreOptions::default()) else {
15811608
return Err(Status::unavailable("Failed to get dirty log cursors"));
15821609
};
15831610
let witness = match cursors.load(&STABLE_PREFIX).await {
@@ -1774,7 +1801,7 @@ impl LogServer {
17741801
let key = LogKey { collection_id };
17751802
let mark_dirty = MarkDirty {
17761803
collection_id,
1777-
dirty_log: Arc::clone(&self.dirty_log),
1804+
dirty_log: self.dirty_log.clone(),
17781805
};
17791806
let handle = self.open_logs.get_or_create_state(key);
17801807
let log = get_log_from_handle(
@@ -1798,10 +1825,17 @@ impl LogServer {
17981825
));
17991826
}
18001827
tracing::event!(Level::INFO, host =? host);
1801-
self.dirty_log
1828+
if let Some(dirty_log) = self.dirty_log.as_ref() {
1829+
dirty_log
18021830
.garbage_collect_phase2_update_manifest(&GarbageCollectionOptions::default())
18031831
.await
18041832
.map_err(|err| Status::unknown(err.to_string()))?;
1833+
} else {
1834+
tracing::error!("Could not garbage collect dirty log.");
1835+
return Err(Status::failed_precondition(
1836+
"no dirty log configured for garbage collection".to_string(),
1837+
));
1838+
}
18051839
Ok(Response::new(GarbageCollectPhase2Response {}))
18061840
}
18071841
None => Err(Status::not_found("log not found because it's null")),
@@ -2327,7 +2361,7 @@ impl Configurable<LogServerConfig> for LogServer {
23272361
)
23282362
.await
23292363
.map_err(|err| -> Box<dyn ChromaError> { Box::new(err) as _ })?;
2330-
let dirty_log = Arc::new(dirty_log);
2364+
let dirty_log = Some(Arc::new(dirty_log));
23312365
let rolling_up = tokio::sync::Mutex::new(());
23322366
let metrics = Metrics::new(opentelemetry::global::meter("chroma"));
23332367
let backpressure = Mutex::new(Arc::new(HashSet::default()));
@@ -3529,7 +3563,7 @@ mod tests {
35293563
},
35303564
..Default::default()
35313565
};
3532-
let dirty_log = Arc::new(
3566+
let dirty_log = Some(Arc::new(
35333567
LogWriter::open_or_initialize(
35343568
writer_options.clone(),
35353569
storage.clone(),
@@ -3539,7 +3573,7 @@ mod tests {
35393573
)
35403574
.await
35413575
.expect("Dirty log should be initializable"),
3542-
);
3576+
));
35433577
let config = LogServerConfig {
35443578
writer: writer_options,
35453579
..Default::default()
@@ -4089,7 +4123,7 @@ mod tests {
40894123
)
40904124
.await
40914125
.expect("Failed to create dirty log");
4092-
let dirty_log = Arc::new(dirty_log);
4126+
let dirty_log = Some(Arc::new(dirty_log));
40934127

40944128
// Create LogServer manually
40954129
let config = LogServerConfig::default();

0 commit comments

Comments
 (0)