Skip to content

Commit

Permalink
refactor(raw): Merge all operations into one enum (#4977)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 7, 2024
1 parent ed50911 commit 9298df7
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 202 deletions.
13 changes: 5 additions & 8 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
use await_tree::InstrumentAwait;
use futures::Future;
use futures::FutureExt;
use oio::ListOperation;
use oio::ReadOperation;
use oio::WriteOperation;

use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -182,7 +179,7 @@ impl<R: oio::Read> oio::Read for AwaitTreeWrapper<R> {
async fn read(&mut self) -> Result<Buffer> {
self.inner
.read()
.instrument_await(format!("opendal::{}", ReadOperation::Read))
.instrument_await(format!("opendal::{}", Operation::ReaderRead))
.await
}
}
Expand All @@ -197,19 +194,19 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.write(bs)
.instrument_await(format!("opendal::{}", WriteOperation::Write.into_static()))
.instrument_await(format!("opendal::{}", Operation::WriterWrite.into_static()))
}

fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.abort()
.instrument_await(format!("opendal::{}", WriteOperation::Abort.into_static()))
.instrument_await(format!("opendal::{}", Operation::WriterAbort.into_static()))
}

fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.close()
.instrument_await(format!("opendal::{}", WriteOperation::Close.into_static()))
.instrument_await(format!("opendal::{}", Operation::WriterClose.into_static()))
}
}

Expand All @@ -227,7 +224,7 @@ impl<R: oio::List> oio::List for AwaitTreeWrapper<R> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner
.next()
.instrument_await(format!("opendal::{}", ListOperation::Next))
.instrument_await(format!("opendal::{}", Operation::ListerNext))
.await
}
}
Expand Down
21 changes: 9 additions & 12 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ use std::sync::Arc;

use futures::TryFutureExt;

use crate::raw::oio::ListOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -358,7 +355,7 @@ impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
bs
})
.map_err(|err| {
err.with_operation(ReadOperation::Read)
err.with_operation(Operation::ReaderRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("range", self.range.to_string())
Expand All @@ -376,7 +373,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
bs
})
.map_err(|err| {
err.with_operation(ReadOperation::BlockingRead)
err.with_operation(Operation::BlockingReaderRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("range", self.range.to_string())
Expand All @@ -395,7 +392,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::Write)
err.with_operation(Operation::WriterWrite)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("size", size.to_string())
Expand All @@ -405,7 +402,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {

async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(WriteOperation::Close)
err.with_operation(Operation::WriterClose)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("written", self.processed.to_string())
Expand All @@ -414,7 +411,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
err.with_operation(WriteOperation::Abort)
err.with_operation(Operation::WriterAbort)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("processed", self.processed.to_string())
Expand All @@ -431,7 +428,7 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
err.with_operation(Operation::BlockingWriterWrite)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("size", size.to_string())
Expand All @@ -441,7 +438,7 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {

fn close(&mut self) -> Result<()> {
self.inner.close().map_err(|err| {
err.with_operation(WriteOperation::BlockingClose)
err.with_operation(Operation::BlockingWriterClose)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("written", self.processed.to_string())
Expand All @@ -459,7 +456,7 @@ impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
bs
})
.map_err(|err| {
err.with_operation(ListOperation::Next)
err.with_operation(Operation::ListerNext)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("listed", self.processed.to_string())
Expand All @@ -476,7 +473,7 @@ impl<T: oio::BlockingList> oio::BlockingList for ErrorContextWrapper<T> {
bs
})
.map_err(|err| {
err.with_operation(ListOperation::BlockingNext)
err.with_operation(Operation::BlockingListerNext)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("listed", self.processed.to_string())
Expand Down
19 changes: 9 additions & 10 deletions core/src/layers/fastrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ use std::sync::Arc;
use fastrace::prelude::*;
use futures::FutureExt;

use crate::raw::oio::ListOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -301,41 +298,43 @@ impl<R: oio::Read> oio::Read for FastraceWrapper<R> {
impl<R: oio::BlockingRead> oio::BlockingRead for FastraceWrapper<R> {
fn read(&mut self) -> Result<Buffer> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::BlockingReaderRead.into_static());
self.inner.read()
}
}

impl<R: oio::Write> oio::Write for FastraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::WriterWrite.into_static());
self.inner.write(bs)
}

fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Abort.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::WriterAbort.into_static());
self.inner.abort()
}

fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Close.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::WriterClose.into_static());
self.inner.close()
}
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for FastraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
let _span =
LocalSpan::enter_with_local_parent(Operation::BlockingWriterWrite.into_static());
self.inner.write(bs)
}

fn close(&mut self) -> Result<()> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingClose.into_static());
let _span =
LocalSpan::enter_with_local_parent(Operation::BlockingWriterClose.into_static());
self.inner.close()
}
}
Expand All @@ -350,7 +349,7 @@ impl<R: oio::List> oio::List for FastraceWrapper<R> {
impl<R: oio::BlockingList> oio::BlockingList for FastraceWrapper<R> {
fn next(&mut self) -> Result<Option<oio::Entry>> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(ListOperation::BlockingNext.into_static());
let _span = LocalSpan::enter_with_local_parent(Operation::BlockingListerNext.into_static());
self.inner.next()
}
}
26 changes: 12 additions & 14 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use log::log;
use log::trace;
use log::Level;

use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -988,7 +986,7 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> read returns {}B",
self.ctx.scheme,
ReadOperation::Read,
Operation::ReaderRead,
self.path,
self.read.load(Ordering::Relaxed),
bs.remaining()
Expand All @@ -1002,7 +1000,7 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
lvl,
"service={} operation={} path={} read={} -> read failed: {}",
self.ctx.scheme,
ReadOperation::Read,
Operation::ReaderRead,
self.path,
self.read.load(Ordering::Relaxed),
self.ctx.error_print(&err),
Expand All @@ -1024,7 +1022,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> read returns {}B",
self.ctx.scheme,
ReadOperation::BlockingRead,
Operation::BlockingReaderRead,
self.path,
self.read.load(Ordering::Relaxed),
bs.remaining()
Expand All @@ -1038,7 +1036,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
lvl,
"service={} operation={} path={} read={} -> read failed: {}",
self.ctx.scheme,
ReadOperation::BlockingRead,
Operation::BlockingReaderRead,
self.path,
self.read.load(Ordering::Relaxed),
self.ctx.error_print(&err),
Expand Down Expand Up @@ -1081,7 +1079,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> data write {}B",
self.ctx.scheme,
WriteOperation::Write,
Operation::WriterWrite,
self.path,
self.written,
size,
Expand All @@ -1095,7 +1093,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> data write failed: {}",
self.ctx.scheme,
WriteOperation::Write,
Operation::WriterWrite,
self.path,
self.written,
self.ctx.error_print(&err),
Expand All @@ -1113,7 +1111,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> abort writer",
self.ctx.scheme,
WriteOperation::Abort,
Operation::WriterAbort,
self.path,
self.written,
);
Expand All @@ -1126,7 +1124,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> abort writer failed: {}",
self.ctx.scheme,
WriteOperation::Abort,
Operation::WriterAbort,
self.path,
self.written,
self.ctx.error_print(&err),
Expand Down Expand Up @@ -1157,7 +1155,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> data close failed: {}",
self.ctx.scheme,
WriteOperation::Close,
Operation::WriterClose,
self.path,
self.written,
self.ctx.error_print(&err),
Expand All @@ -1177,7 +1175,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> data write {}B",
self.ctx.scheme,
WriteOperation::BlockingWrite,
Operation::BlockingWriterWrite,
self.path,
self.written,
bs.len(),
Expand All @@ -1191,7 +1189,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> data write failed: {}",
self.ctx.scheme,
WriteOperation::BlockingWrite,
Operation::BlockingWriterWrite,
self.path,
self.written,
self.ctx.error_print(&err),
Expand Down Expand Up @@ -1222,7 +1220,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
lvl,
"service={} operation={} path={} written={}B -> data close failed: {}",
self.ctx.scheme,
WriteOperation::BlockingClose,
Operation::BlockingWriterClose,
self.path,
self.written,
self.ctx.error_print(&err),
Expand Down
14 changes: 6 additions & 8 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use prometheus::register_int_counter_vec_with_registry;
use prometheus::HistogramVec;
use prometheus::Registry;

use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
use crate::raw::Access;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -681,7 +679,7 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read(&mut self) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
ReadOperation::Read.into_static(),
Operation::ReaderRead.into_static(),
&self.path,
);

Expand Down Expand Up @@ -713,7 +711,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read(&mut self) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
ReadOperation::BlockingRead.into_static(),
Operation::BlockingReaderRead.into_static(),
&self.path,
);

Expand Down Expand Up @@ -747,7 +745,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {

let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Write.into_static(),
Operation::WriterWrite.into_static(),
&self.path,
);

Expand Down Expand Up @@ -777,7 +775,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn abort(&mut self) -> Result<()> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Abort.into_static(),
Operation::WriterAbort.into_static(),
&self.path,
);

Expand All @@ -801,7 +799,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn close(&mut self) -> Result<()> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Close.into_static(),
Operation::WriterClose.into_static(),
&self.path,
);

Expand Down Expand Up @@ -859,7 +857,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn close(&mut self) -> Result<()> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::BlockingClose.into_static(),
Operation::BlockingWriterClose.into_static(),
&self.path,
);

Expand Down
Loading

0 comments on commit 9298df7

Please sign in to comment.