Skip to content

Commit

Permalink
Merge ReadArchive and WriteArchive.
Browse files Browse the repository at this point in the history
  • Loading branch information
partim committed Nov 29, 2023
1 parent 8395e96 commit 3992858
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 87 deletions.
2 changes: 0 additions & 2 deletions src/collector/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ impl Collector {
}

/// Sanitizes the stored data.
///
/// Currently doesn’t do anything.
pub fn sanitize(&self) -> Result<(), Fatal> {
if let Some(rrdp) = self.rrdp.as_ref() {
rrdp.sanitize()?;
Expand Down
108 changes: 40 additions & 68 deletions src/collector/rrdp/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,48 @@ use crate::utils::archive::{
use crate::utils::binio::{Compose, Parse};


//------------ ReadArchive ---------------------------------------------------
//------------ RrdpArchive ---------------------------------------------------

#[derive(Debug)]
pub struct ReadArchive {
pub struct RrdpArchive {
/// The path where everything from this repository lives.
path: Arc<PathBuf>,

/// The archive for the repository.
archive: archive::Archive<RrdpObjectMeta>,
}

impl ReadArchive {
impl RrdpArchive {
pub fn create(
path: Arc<PathBuf>
) -> Result<Self, RunFailed> {
let archive = Archive::create(path.as_ref()).map_err(|err| {
archive_err(err, path.as_ref())
})?;
Ok(Self { path, archive })
}

pub fn create_with_file(
file: fs::File,
path: Arc<PathBuf>,
) -> Result<Self, RunFailed> {
let archive = Archive::create_with_file(file).map_err(|err| {
archive_err(err, path.as_ref())
})?;
Ok(Self { path, archive })
}

pub fn try_open(path: Arc<PathBuf>) -> Result<Option<Self>, RunFailed> {
let archive = match Archive::open(path.as_ref(), true) {
Ok(archive) => archive,
Err(OpenError::NotFound) => return Ok(None),
Err(OpenError::Archive(err)) => {
return Err(archive_err(err, path.as_ref()))
}
};
Ok(Some(Self { path, archive }))
}

pub fn open(path: Arc<PathBuf>) -> Result<Self, RunFailed> {
let archive = archive::Archive::open(
path.as_ref(), false
Expand All @@ -44,6 +74,12 @@ impl ReadArchive {
Ok(Self { path, archive })
}

pub fn path(&self) -> &Arc<PathBuf> {
&self.path
}
}

impl RrdpArchive {
pub fn verify(path: &Path) -> Result<(), OpenError> {
let archive = archive::Archive::<RrdpObjectMeta>::open(path, false)?;
archive.verify()?;
Expand Down Expand Up @@ -121,53 +157,7 @@ impl ReadArchive {
}
}


//------------ WriteArchive --------------------------------------------------

#[derive(Debug)]
pub struct WriteArchive {
/// The path where everything from this repository lives.
path: Arc<PathBuf>,

/// The archive for the repository.
archive: Archive<RrdpObjectMeta>,
}

impl WriteArchive {
pub fn create(
path: Arc<PathBuf>
) -> Result<Self, RunFailed> {
let archive = Archive::create(path.as_ref()).map_err(|err| {
archive_err(err, path.as_ref())
})?;
Ok(Self { path, archive })
}

pub fn create_with_file(
file: fs::File,
path: Arc<PathBuf>,
) -> Result<Self, RunFailed> {
let archive = Archive::create_with_file(file).map_err(|err| {
archive_err(err, path.as_ref())
})?;
Ok(Self { path, archive })
}

pub fn open(path: Arc<PathBuf>) -> Result<Option<Self>, RunFailed> {
let archive = match Archive::open(path.as_ref(), true) {
Ok(archive) => archive,
Err(OpenError::NotFound) => return Ok(None),
Err(OpenError::Archive(err)) => {
return Err(archive_err(err, path.as_ref()))
}
};
Ok(Some(Self { path, archive }))
}

pub fn path(&self) -> &Arc<PathBuf> {
&self.path
}

impl RrdpArchive {
/// Publishes a new object to the archie.
pub fn publish_object(
&mut self,
Expand Down Expand Up @@ -220,24 +210,6 @@ impl WriteArchive {
)?)
}

pub fn load_state(&self) -> Result<RepositoryState, RunFailed> {
let data = match self.archive.fetch(b"state") {
Ok(data) => data,
Err(archive::FetchError::NotFound) => {
return Err(
archive_err(ArchiveError::Corrupt, self.path.as_ref())
)
}
Err(archive::FetchError::Archive(err)) => {
return Err(archive_err(err, self.path.as_ref()))
}
};
let mut data = data.as_ref();
RepositoryState::parse(&mut data).map_err(|_| {
archive_err(ArchiveError::Corrupt, self.path.as_ref())
})
}

pub fn publish_state(
&mut self, state: &RepositoryState
) -> Result<(), RunFailed> {
Expand Down
24 changes: 12 additions & 12 deletions src/collector/rrdp/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::utils::dump::DumpRegistry;
use crate::utils::json::JsonBuilder;
use crate::utils::sync::{Mutex, RwLock};
use crate::utils::uri::UriExt;
use super::archive::{FallbackTime, ReadArchive, RepositoryState, WriteArchive};
use super::archive::{FallbackTime, RrdpArchive, RepositoryState};
use super::http::{HttpClient, HttpStatus};
use super::update::{
DeltaUpdate, Notification, SnapshotError, SnapshotReason, SnapshotUpdate
Expand Down Expand Up @@ -107,7 +107,7 @@ impl Collector {
if !entry.is_file() {
continue;
}
match ReadArchive::verify(entry.path()) {
match RrdpArchive::verify(entry.path()) {
Ok(()) | Err(OpenError::NotFound) => { }
Err(OpenError::Archive(ArchiveError::Io(err))) => {
error!(
Expand Down Expand Up @@ -183,7 +183,7 @@ impl Collector {
registry: &mut DumpRegistry,
state_registry: &mut HashMap<uri::Https, RepositoryState>,
) -> Result<(), RunFailed> {
let archive = ReadArchive::open(repo_path.clone())?;
let archive = RrdpArchive::open(repo_path.clone())?;
let state = archive.load_state()?;
let target_path = registry.get_repo_path(Some(&state.rpki_notify));
let object_path = target_path.join("rsync");
Expand Down Expand Up @@ -564,7 +564,7 @@ impl<'a> Run<'a> {
path: Arc<PathBuf>,
retain: &HashSet<uri::Https>
) -> Result<bool, RunFailed> {
let archive = ReadArchive::open(path)?;
let archive = RrdpArchive::open(path)?;
let state = archive.load_state()?;
Ok(retain.contains(&state.rpki_notify))
}
Expand Down Expand Up @@ -644,13 +644,13 @@ impl LoadResult<Repository> {
#[derive(Debug)]
pub struct ReadRepository {
/// The archive for the repository.
archive: ReadArchive,
archive: RrdpArchive,
}

impl ReadRepository {
fn new(repository: &Repository) -> Result<Self, RunFailed> {
Ok(Self {
archive: ReadArchive::open(repository.path.clone())?,
archive: RrdpArchive::open(repository.path.clone())?,
})
}

Expand Down Expand Up @@ -727,15 +727,15 @@ impl<'a> RepositoryUpdate<'a> {
fn try_update(
mut self
) -> Result<(LoadResult<Repository>, RrdpRepositoryMetrics), RunFailed> {
let current = match WriteArchive::open(self.path.clone()) {
let current = match RrdpArchive::try_open(self.path.clone()) {
Ok(Some(archive)) => {
let state = archive.load_state()?;
Some((archive, state))
}
Ok(None) => None,
Err(err) => {
if err.should_retry() {
// WriteArchive::open should already have deleted the
// RrdpArchive::try_open should already have deleted the
// file, so we can happily pretend it never existed.
None
}
Expand Down Expand Up @@ -787,7 +787,7 @@ impl<'a> RepositoryUpdate<'a> {
/// Returns `Ok(false)` if the update failed.
fn update(
&mut self,
current: Option<(WriteArchive, RepositoryState)>,
current: Option<(RrdpArchive, RepositoryState)>,
) -> Result<bool, RunFailed> {
let notify = match Notification::get(
&self.collector.http, self.rpki_notify,
Expand Down Expand Up @@ -824,7 +824,7 @@ impl<'a> RepositoryUpdate<'a> {
/// Handle the case of a Not Modified response.
fn not_modified(
&mut self,
current: Option<(WriteArchive, RepositoryState)>,
current: Option<(RrdpArchive, RepositoryState)>,
) -> Result<(), RunFailed> {
info!("RRDP {}: Not modified.", self.rpki_notify);
if let Some((mut archive, mut state)) = current {
Expand All @@ -844,7 +844,7 @@ impl<'a> RepositoryUpdate<'a> {
) -> Result<bool, RunFailed> {
debug!("RRDP {}: updating from snapshot.", self.rpki_notify);
let (file, path) = self.collector.temp_file()?;
let mut archive = WriteArchive::create_with_file(file, path.clone())?;
let mut archive = RrdpArchive::create_with_file(file, path.clone())?;
if let Err(err) = SnapshotUpdate::new(
self.collector, &mut archive, notify, &mut self.metrics
).try_update() {
Expand Down Expand Up @@ -897,7 +897,7 @@ impl<'a> RepositoryUpdate<'a> {
fn delta_update(
&mut self,
notify: &Notification,
mut archive: WriteArchive,
mut archive: RrdpArchive,
state: RepositoryState,
) -> Result<Option<SnapshotReason>, RunFailed> {
let deltas = match self.calc_deltas(notify.content(), &state) {
Expand Down
10 changes: 5 additions & 5 deletions src/collector/rrdp/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use uuid::Uuid;
use crate::error::{Failed, RunFailed};
use crate::metrics::RrdpRepositoryMetrics;
use crate::utils::archive::{ArchiveError, PublishError};
use super::archive::{AccessError, FallbackTime, RepositoryState, WriteArchive};
use super::archive::{AccessError, FallbackTime, RepositoryState, RrdpArchive};
use super::base::Collector;
use super::http::{HttpClient, HttpResponse, HttpStatus};

Expand Down Expand Up @@ -136,7 +136,7 @@ pub struct SnapshotUpdate<'a> {
collector: &'a Collector,

/// The archive to store the snapshot into.
archive: &'a mut WriteArchive,
archive: &'a mut RrdpArchive,

/// The notification file pointing to the snapshot.
notify: &'a Notification,
Expand All @@ -148,7 +148,7 @@ pub struct SnapshotUpdate<'a> {
impl<'a> SnapshotUpdate<'a> {
pub fn new(
collector: &'a Collector,
archive: &'a mut WriteArchive,
archive: &'a mut RrdpArchive,
notify: &'a Notification,
metrics: &'a mut RrdpRepositoryMetrics,
) -> Self {
Expand Down Expand Up @@ -260,7 +260,7 @@ pub struct DeltaUpdate<'a> {
collector: &'a Collector,

/// The archive the repository is stored in.
archive: &'a mut WriteArchive,
archive: &'a mut RrdpArchive,

/// The session ID of the RRDP session.
session_id: Uuid,
Expand All @@ -281,7 +281,7 @@ impl<'a> DeltaUpdate<'a> {
/// Creates a new delta update.
pub fn new(
collector: &'a Collector,
archive: &'a mut WriteArchive,
archive: &'a mut RrdpArchive,
session_id: Uuid,
info: &'a DeltaInfo,
metrics: &'a mut RrdpRepositoryMetrics,
Expand Down

0 comments on commit 3992858

Please sign in to comment.