From 3992858c70aaee9da1c3d3def98bc2759f240dc1 Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Wed, 29 Nov 2023 16:54:39 +0100 Subject: [PATCH] Merge ReadArchive and WriteArchive. --- src/collector/base.rs | 2 - src/collector/rrdp/archive.rs | 108 +++++++++++++--------------------- src/collector/rrdp/base.rs | 24 ++++---- src/collector/rrdp/update.rs | 10 ++-- 4 files changed, 57 insertions(+), 87 deletions(-) diff --git a/src/collector/base.rs b/src/collector/base.rs index a43af01b..92c7842c 100644 --- a/src/collector/base.rs +++ b/src/collector/base.rs @@ -82,8 +82,6 @@ impl Collector { } /// Sanitizes the stored data. - /// - /// Currently doesn’t do anything. pub fn sanitize(&self) -> Result<(), Fatal> { if let Some(rrdp) = self.rrdp.as_ref() { rrdp.sanitize()?; diff --git a/src/collector/rrdp/archive.rs b/src/collector/rrdp/archive.rs index fec78ace..ae0bcbaa 100644 --- a/src/collector/rrdp/archive.rs +++ b/src/collector/rrdp/archive.rs @@ -17,10 +17,10 @@ use crate::utils::archive::{ use crate::utils::binio::{Compose, Parse}; -//------------ ReadArchive --------------------------------------------------- +//------------ RrdpArchive --------------------------------------------------- #[derive(Debug)] -pub struct ReadArchive { +pub struct RrdpArchive { /// The path where everything from this repository lives. path: Arc, @@ -28,7 +28,37 @@ pub struct ReadArchive { archive: archive::Archive, } -impl ReadArchive { +impl RrdpArchive { + pub fn create( + path: Arc + ) -> Result { + let archive = Archive::create(path.as_ref()).map_err(|err| { + archive_err(err, path.as_ref()) + })?; + Ok(Self { path, archive }) + } + + pub fn create_with_file( + file: fs::File, + path: Arc, + ) -> Result { + let archive = Archive::create_with_file(file).map_err(|err| { + archive_err(err, path.as_ref()) + })?; + Ok(Self { path, archive }) + } + + pub fn try_open(path: Arc) -> Result, RunFailed> { + let archive = match Archive::open(path.as_ref(), true) { + Ok(archive) => archive, + Err(OpenError::NotFound) => return Ok(None), + Err(OpenError::Archive(err)) => { + return Err(archive_err(err, path.as_ref())) + } + }; + Ok(Some(Self { path, archive })) + } + pub fn open(path: Arc) -> Result { let archive = archive::Archive::open( path.as_ref(), false @@ -44,6 +74,12 @@ impl ReadArchive { Ok(Self { path, archive }) } + pub fn path(&self) -> &Arc { + &self.path + } +} + +impl RrdpArchive { pub fn verify(path: &Path) -> Result<(), OpenError> { let archive = archive::Archive::::open(path, false)?; archive.verify()?; @@ -121,53 +157,7 @@ impl ReadArchive { } } - -//------------ WriteArchive -------------------------------------------------- - -#[derive(Debug)] -pub struct WriteArchive { - /// The path where everything from this repository lives. - path: Arc, - - /// The archive for the repository. - archive: Archive, -} - -impl WriteArchive { - pub fn create( - path: Arc - ) -> Result { - let archive = Archive::create(path.as_ref()).map_err(|err| { - archive_err(err, path.as_ref()) - })?; - Ok(Self { path, archive }) - } - - pub fn create_with_file( - file: fs::File, - path: Arc, - ) -> Result { - let archive = Archive::create_with_file(file).map_err(|err| { - archive_err(err, path.as_ref()) - })?; - Ok(Self { path, archive }) - } - - pub fn open(path: Arc) -> Result, RunFailed> { - let archive = match Archive::open(path.as_ref(), true) { - Ok(archive) => archive, - Err(OpenError::NotFound) => return Ok(None), - Err(OpenError::Archive(err)) => { - return Err(archive_err(err, path.as_ref())) - } - }; - Ok(Some(Self { path, archive })) - } - - pub fn path(&self) -> &Arc { - &self.path - } - +impl RrdpArchive { /// Publishes a new object to the archie. pub fn publish_object( &mut self, @@ -220,24 +210,6 @@ impl WriteArchive { )?) } - pub fn load_state(&self) -> Result { - let data = match self.archive.fetch(b"state") { - Ok(data) => data, - Err(archive::FetchError::NotFound) => { - return Err( - archive_err(ArchiveError::Corrupt, self.path.as_ref()) - ) - } - Err(archive::FetchError::Archive(err)) => { - return Err(archive_err(err, self.path.as_ref())) - } - }; - let mut data = data.as_ref(); - RepositoryState::parse(&mut data).map_err(|_| { - archive_err(ArchiveError::Corrupt, self.path.as_ref()) - }) - } - pub fn publish_state( &mut self, state: &RepositoryState ) -> Result<(), RunFailed> { diff --git a/src/collector/rrdp/base.rs b/src/collector/rrdp/base.rs index cc5c158e..1666e8d5 100644 --- a/src/collector/rrdp/base.rs +++ b/src/collector/rrdp/base.rs @@ -18,7 +18,7 @@ use crate::utils::dump::DumpRegistry; use crate::utils::json::JsonBuilder; use crate::utils::sync::{Mutex, RwLock}; use crate::utils::uri::UriExt; -use super::archive::{FallbackTime, ReadArchive, RepositoryState, WriteArchive}; +use super::archive::{FallbackTime, RrdpArchive, RepositoryState}; use super::http::{HttpClient, HttpStatus}; use super::update::{ DeltaUpdate, Notification, SnapshotError, SnapshotReason, SnapshotUpdate @@ -107,7 +107,7 @@ impl Collector { if !entry.is_file() { continue; } - match ReadArchive::verify(entry.path()) { + match RrdpArchive::verify(entry.path()) { Ok(()) | Err(OpenError::NotFound) => { } Err(OpenError::Archive(ArchiveError::Io(err))) => { error!( @@ -183,7 +183,7 @@ impl Collector { registry: &mut DumpRegistry, state_registry: &mut HashMap, ) -> Result<(), RunFailed> { - let archive = ReadArchive::open(repo_path.clone())?; + let archive = RrdpArchive::open(repo_path.clone())?; let state = archive.load_state()?; let target_path = registry.get_repo_path(Some(&state.rpki_notify)); let object_path = target_path.join("rsync"); @@ -564,7 +564,7 @@ impl<'a> Run<'a> { path: Arc, retain: &HashSet ) -> Result { - let archive = ReadArchive::open(path)?; + let archive = RrdpArchive::open(path)?; let state = archive.load_state()?; Ok(retain.contains(&state.rpki_notify)) } @@ -644,13 +644,13 @@ impl LoadResult { #[derive(Debug)] pub struct ReadRepository { /// The archive for the repository. - archive: ReadArchive, + archive: RrdpArchive, } impl ReadRepository { fn new(repository: &Repository) -> Result { Ok(Self { - archive: ReadArchive::open(repository.path.clone())?, + archive: RrdpArchive::open(repository.path.clone())?, }) } @@ -727,7 +727,7 @@ impl<'a> RepositoryUpdate<'a> { fn try_update( mut self ) -> Result<(LoadResult, RrdpRepositoryMetrics), RunFailed> { - let current = match WriteArchive::open(self.path.clone()) { + let current = match RrdpArchive::try_open(self.path.clone()) { Ok(Some(archive)) => { let state = archive.load_state()?; Some((archive, state)) @@ -735,7 +735,7 @@ impl<'a> RepositoryUpdate<'a> { Ok(None) => None, Err(err) => { if err.should_retry() { - // WriteArchive::open should already have deleted the + // RrdpArchive::try_open should already have deleted the // file, so we can happily pretend it never existed. None } @@ -787,7 +787,7 @@ impl<'a> RepositoryUpdate<'a> { /// Returns `Ok(false)` if the update failed. fn update( &mut self, - current: Option<(WriteArchive, RepositoryState)>, + current: Option<(RrdpArchive, RepositoryState)>, ) -> Result { let notify = match Notification::get( &self.collector.http, self.rpki_notify, @@ -824,7 +824,7 @@ impl<'a> RepositoryUpdate<'a> { /// Handle the case of a Not Modified response. fn not_modified( &mut self, - current: Option<(WriteArchive, RepositoryState)>, + current: Option<(RrdpArchive, RepositoryState)>, ) -> Result<(), RunFailed> { info!("RRDP {}: Not modified.", self.rpki_notify); if let Some((mut archive, mut state)) = current { @@ -844,7 +844,7 @@ impl<'a> RepositoryUpdate<'a> { ) -> Result { debug!("RRDP {}: updating from snapshot.", self.rpki_notify); let (file, path) = self.collector.temp_file()?; - let mut archive = WriteArchive::create_with_file(file, path.clone())?; + let mut archive = RrdpArchive::create_with_file(file, path.clone())?; if let Err(err) = SnapshotUpdate::new( self.collector, &mut archive, notify, &mut self.metrics ).try_update() { @@ -897,7 +897,7 @@ impl<'a> RepositoryUpdate<'a> { fn delta_update( &mut self, notify: &Notification, - mut archive: WriteArchive, + mut archive: RrdpArchive, state: RepositoryState, ) -> Result, RunFailed> { let deltas = match self.calc_deltas(notify.content(), &state) { diff --git a/src/collector/rrdp/update.rs b/src/collector/rrdp/update.rs index 96caa114..8b641ae1 100644 --- a/src/collector/rrdp/update.rs +++ b/src/collector/rrdp/update.rs @@ -14,7 +14,7 @@ use uuid::Uuid; use crate::error::{Failed, RunFailed}; use crate::metrics::RrdpRepositoryMetrics; use crate::utils::archive::{ArchiveError, PublishError}; -use super::archive::{AccessError, FallbackTime, RepositoryState, WriteArchive}; +use super::archive::{AccessError, FallbackTime, RepositoryState, RrdpArchive}; use super::base::Collector; use super::http::{HttpClient, HttpResponse, HttpStatus}; @@ -136,7 +136,7 @@ pub struct SnapshotUpdate<'a> { collector: &'a Collector, /// The archive to store the snapshot into. - archive: &'a mut WriteArchive, + archive: &'a mut RrdpArchive, /// The notification file pointing to the snapshot. notify: &'a Notification, @@ -148,7 +148,7 @@ pub struct SnapshotUpdate<'a> { impl<'a> SnapshotUpdate<'a> { pub fn new( collector: &'a Collector, - archive: &'a mut WriteArchive, + archive: &'a mut RrdpArchive, notify: &'a Notification, metrics: &'a mut RrdpRepositoryMetrics, ) -> Self { @@ -260,7 +260,7 @@ pub struct DeltaUpdate<'a> { collector: &'a Collector, /// The archive the repository is stored in. - archive: &'a mut WriteArchive, + archive: &'a mut RrdpArchive, /// The session ID of the RRDP session. session_id: Uuid, @@ -281,7 +281,7 @@ impl<'a> DeltaUpdate<'a> { /// Creates a new delta update. pub fn new( collector: &'a Collector, - archive: &'a mut WriteArchive, + archive: &'a mut RrdpArchive, session_id: Uuid, info: &'a DeltaInfo, metrics: &'a mut RrdpRepositoryMetrics,