Skip to content

Commit

Permalink
Use getData/setData in even more places
Browse files Browse the repository at this point in the history
  • Loading branch information
dermesser committed Aug 4, 2024
1 parent 7d8d7dc commit 3d85b11
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 40 deletions.
3 changes: 2 additions & 1 deletion uvco/close.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <uv.h>

#include "uvco/close.h"
#include "uvco/internal/internal_utils.h"
#include "uvco/loop/loop.h"
#include "uvco/run.h"

Expand All @@ -20,7 +21,7 @@ bool CloseAwaiter::await_ready() const { return closed_; }
void CloseAwaiter::await_resume() {}

void onCloseCallback(uv_handle_t *stream) {
auto *awaiter = (CloseAwaiter *)stream->data;
auto *awaiter = getData<CloseAwaiter>(stream);
awaiter->closed_ = true;
if (awaiter->handle_) {
auto handle = *awaiter->handle_;
Expand Down
4 changes: 2 additions & 2 deletions uvco/close.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ template <typename T, typename C>
Promise<void> closeHandle(T *handle, C closer) {
BOOST_ASSERT(handle != nullptr);
CloseAwaiter awaiter{};
handle->data = &awaiter;
setData(handle, &awaiter);
closer(handle, onCloseCallback);
co_await awaiter;
handle->data = nullptr;
setData(handle, (void *)nullptr);
BOOST_ASSERT(awaiter.closed_);
}

Expand Down
3 changes: 3 additions & 0 deletions uvco/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ class Directory {
uv_dirent_type_t type;
};

/// Create a directory.
static Promise<void> mkdir(const Loop &loop, std::string_view path,
int mode = 0755);
/// Remove a directory. It must be empty.
static Promise<void> rmdir(const Loop &loop, std::string_view path);
/// Open a directory for reading.
static Promise<Directory> open(const Loop &loop, std::string_view path);
/// Read all directory entries of the given directory.
static MultiPromise<DirEnt> readAll(const Loop &loop, std::string_view path);
Expand Down
2 changes: 1 addition & 1 deletion uvco/name_resolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ Promise<AddressHandle> Resolver::gai(std::string_view host,

void Resolver::onAddrinfo(uv_getaddrinfo_t *req, uv_status status,
struct addrinfo *result) {
auto *awaiter = (AddrinfoAwaiter_ *)req->data;
auto *awaiter = getRequestData<AddrinfoAwaiter_>(req);
awaiter->addrinfo_ = result;
awaiter->status_ = status;
BOOST_ASSERT(awaiter->handle_);
Expand Down
12 changes: 6 additions & 6 deletions uvco/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ size_t StreamBase::InStreamAwaiter_::await_resume() {
void StreamBase::InStreamAwaiter_::allocate(uv_handle_t *handle,
size_t /*suggested_size*/,
uv_buf_t *buf) {
const InStreamAwaiter_ *awaiter = (InStreamAwaiter_ *)handle->data;
const InStreamAwaiter_ *awaiter = getData<InStreamAwaiter_>(handle);
BOOST_ASSERT(awaiter != nullptr);
buf->base = awaiter->buffer_.data();
buf->len = awaiter->buffer_.size();
Expand All @@ -162,7 +162,7 @@ void StreamBase::InStreamAwaiter_::stop_read() {
void StreamBase::InStreamAwaiter_::onInStreamRead(uv_stream_t *stream,
ssize_t nread,
const uv_buf_t * /*buf*/) {
auto *awaiter = (InStreamAwaiter_ *)stream->data;
auto *awaiter = getData<InStreamAwaiter_>(stream);
BOOST_ASSERT(awaiter != nullptr);
awaiter->stop_read();
awaiter->status_ = nread;
Expand All @@ -172,7 +172,7 @@ void StreamBase::InStreamAwaiter_::onInStreamRead(uv_stream_t *stream,
awaiter->handle_.reset();
Loop::enqueue(handle);
}
stream->data = nullptr;
setData(stream, (void*)nullptr);
}

StreamBase::OutStreamAwaiter_::OutStreamAwaiter_(StreamBase &stream,
Expand Down Expand Up @@ -223,14 +223,14 @@ uv_status StreamBase::OutStreamAwaiter_::await_resume() {

void StreamBase::OutStreamAwaiter_::onOutStreamWrite(uv_write_t *write,
uv_status status) {
auto *awaiter = (OutStreamAwaiter_ *)write->data;
auto *awaiter = getRequestData<OutStreamAwaiter_>(write);
BOOST_ASSERT(awaiter != nullptr);
awaiter->status_ = status;
BOOST_ASSERT(awaiter->handle_);
auto handle = awaiter->handle_.value();
awaiter->handle_.reset();
Loop::enqueue(handle);
write->data = nullptr;
setData(write, (void*) nullptr);
}

bool StreamBase::ShutdownAwaiter_::await_ready() { return false; }
Expand All @@ -251,7 +251,7 @@ void StreamBase::ShutdownAwaiter_::await_resume() {

void StreamBase::ShutdownAwaiter_::onShutdown(uv_shutdown_t *req,
uv_status status) {
auto *awaiter = (ShutdownAwaiter_ *)req->data;
auto *awaiter = getRequestData<ShutdownAwaiter_>(req);
awaiter->status_ = status;
if (awaiter->handle_) {
auto handle = awaiter->handle_.value();
Expand Down
32 changes: 17 additions & 15 deletions uvco/stream_server_base_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ StreamServerBase<UvStreamType, StreamType>::~StreamServerBase() {

template <typename UvStreamType, typename StreamType>
Promise<void> StreamServerBase<UvStreamType, StreamType>::close() {
auto *awaiter = (ConnectionAwaiter_ *)socket_->data;
// Resume listener coroutine and tell it to exit.
// If awaiter == nullptr, one of two things is true:
// 1. listener is currently not running
// 2. listener has yielded and is suspended there: the listener generator will
// be cancelled when its MultiPromise is dropped.
if (awaiter != nullptr && awaiter->handle_) {
awaiter->stop();
if (!dataIsNull(socket_.get())) {
auto *awaiter = getData<ConnectionAwaiter_>(socket_.get());
// Resume listener coroutine and tell it to exit.
// If awaiter == nullptr, one of two things is true:
// 1. listener is currently not running
// 2. listener has yielded and is suspended there: the listener generator
// will be cancelled when its MultiPromise is dropped.
if (awaiter->handle_) {
awaiter->stop();
}
}
co_await closeHandle(socket_.get());
socket_.reset();
Expand All @@ -59,13 +61,13 @@ MultiPromise<StreamType>
StreamServerBase<UvStreamType, StreamType>::listen(int backlog) {
BOOST_ASSERT(socket_);
ConnectionAwaiter_ awaiter{*socket_};
socket_->data = &awaiter;
setData(socket_.get(), &awaiter);

const uv_status listenStatus =
uv_listen((uv_stream_t *)socket_.get(), backlog,
StreamServerBase<UvStreamType, StreamType>::onNewConnection);
if (listenStatus != 0) {
socket_->data = nullptr;
setData(socket_.get(), (void *)nullptr);
throw UvcoException{listenStatus,
"StreamServerBase::listen(): failed to listen"};
}
Expand All @@ -90,7 +92,7 @@ StreamServerBase<UvStreamType, StreamType>::listen(int backlog) {
// and will process the remaining connections. Therefore, first remove
// the already processed connections.
awaiter.accepted_.erase(awaiter.accepted_.begin(), it);
socket_->data = nullptr;
setData(socket_.get(), (void *)nullptr);
throw UvcoException{status,
"UnixStreamServer failed to accept a connection!"};
} else {
Expand All @@ -100,14 +102,14 @@ StreamServerBase<UvStreamType, StreamType>::listen(int backlog) {
//
// `close()` also relies on whether `socket_->data` is `nullptr` or not
// to decide if the socket has been closed already.
socket_->data = nullptr;
setData(socket_.get(), (void *)nullptr);
co_yield std::move(std::get<1>(streamSlot));
socket_->data = &awaiter;
setData(socket_.get(), &awaiter);
}
}
awaiter.accepted_.clear();
}
socket_->data = nullptr;
setData(socket_.get(), (void *)nullptr);
}

template <typename UvStreamType, typename StreamType>
Expand Down Expand Up @@ -143,7 +145,7 @@ template <typename UvStreamType, typename StreamType>
void StreamServerBase<UvStreamType, StreamType>::onNewConnection(
uv_stream_t *stream, uv_status status) {
const auto *server = (UvStreamType *)stream;
auto *connectionAwaiter = (ConnectionAwaiter_ *)server->data;
auto *connectionAwaiter = getData<ConnectionAwaiter_>(server);
uv_loop_t *const loop = connectionAwaiter->socket_.loop;

if (status == 0) {
Expand Down
2 changes: 1 addition & 1 deletion uvco/tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Promise<TcpStream> TcpClient::connect() {
}

void TcpClient::onConnect(uv_connect_t *req, uv_status status) {
auto *connect = static_cast<ConnectAwaiter_ *>(req->data);
auto *connect = getRequestData<ConnectAwaiter_>(req);
connect->onConnect(status);
}

Expand Down
10 changes: 5 additions & 5 deletions uvco/timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@ class TimerAwaiter {
TimerAwaiter(TimerAwaiter &&other) noexcept
: timer_{std::move(other.timer_)}, handle_{other.handle_},
stopped_{other.stopped_} {
timer_->data = this;
setData(timer_.get(), this);
other.closed_ = true;
}
TimerAwaiter &operator=(const TimerAwaiter &) = delete;
TimerAwaiter &operator=(TimerAwaiter &&other) noexcept {
timer_ = std::move(other.timer_);
handle_ = other.handle_;
stopped_ = other.stopped_;
timer_->data = this;
setData(timer_.get(), this);
other.closed_ = true;
return *this;
}
TimerAwaiter(const Loop &loop, uint64_t millis, bool repeating = false)
: timer_{std::make_unique<uv_timer_t>()} {
uv_timer_init(loop.uvloop(), timer_.get());
timer_->data = this;
setData(timer_.get(), this);
if (repeating) {
uv_timer_start(timer_.get(), onMultiTimerFired, millis, millis);
} else {
Expand Down Expand Up @@ -110,13 +110,13 @@ class TimerAwaiter {
};

void onSingleTimerDone(uv_timer_t *handle) {
auto *awaiter = (TimerAwaiter *)handle->data;
auto *awaiter = getData<TimerAwaiter>(handle);
awaiter->stop();
awaiter->resume();
}

void onMultiTimerFired(uv_timer_t *handle) {
auto *awaiter = (TimerAwaiter *)handle->data;
auto *awaiter = getData<TimerAwaiter>(handle);
awaiter->resume();
}

Expand Down
14 changes: 7 additions & 7 deletions uvco/udp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ MultiPromise<std::pair<std::string, AddressHandle>> Udp::receiveMany() {

Promise<void> Udp::close() {
BOOST_ASSERT(udp_);
RecvAwaiter_ *const awaiter = (RecvAwaiter_ *)udp_->data;
if (awaiter != nullptr) {
if (!dataIsNull(udp_.get())) {
auto *const awaiter = getData<RecvAwaiter_>(udp_.get());
fmt::print(stderr, "Udp::close(): stopping receiving. Please instead use "
"Udp::stopReceivingMany() explicitly.\n");
// Force return from receiveMany() generator.
Expand All @@ -208,10 +208,10 @@ void Udp::stopReceiveMany(
udpStopReceive();
// Cancel receiving generator if currently suspended by co_yield.
packets.cancel();
auto *const currentAwaiter = (RecvAwaiter_ *)udp_->data;
if (currentAwaiter == nullptr) {
if (dataIsNull(udp_.get())) {
return;
}
auto *const currentAwaiter = getData<RecvAwaiter_>(udp_.get());
// If generator is suspended on co_await, resume it synchronously so it can
// exit before the Udp instance is possibly destroyed.
if (currentAwaiter->handle_) {
Expand All @@ -236,8 +236,8 @@ int Udp::udpStartReceive() {
void Udp::onReceiveOne(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned int flags) {

auto *awaiter = (RecvAwaiter_ *)handle->data;
BOOST_ASSERT(awaiter != nullptr);
BOOST_ASSERT(!dataIsNull(handle));
auto *awaiter = getData<RecvAwaiter_>(handle);

if (addr == nullptr) {
// Error or asking to free buffers.
Expand Down Expand Up @@ -379,7 +379,7 @@ Udp::RecvAwaiter_::await_resume() {
}

void Udp::onSendDone(uv_udp_send_t *req, uv_status status) {
auto *const awaiter = (SendAwaiter_ *)req->data;
auto *const awaiter = getRequestData<SendAwaiter_>(req);
awaiter->status_ = status;
if (awaiter->handle_) {
auto resumeHandle = *awaiter->handle_;
Expand Down
3 changes: 2 additions & 1 deletion uvco/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class Udp {
MultiPromise<std::pair<std::string, AddressHandle>> receiveMany();

/// Stop receiving with `receiveMany()` by cancelling the receiving generator
/// coroutine.
/// coroutine. Supply the MultiPromise obtained from receiveMany() in order to
/// guarantee a complete clean-up.
void
stopReceiveMany(MultiPromise<std::pair<std::string, AddressHandle>> &packets);

Expand Down
2 changes: 1 addition & 1 deletion uvco/uds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ UnixStreamClient::ConnectAwaiter_::ConnectAwaiter_(const Loop &loop,

void UnixStreamClient::ConnectAwaiter_::onConnect(uv_connect_t *req,
uv_status status) {
auto *awaiter = (ConnectAwaiter_ *)req->data;
auto *awaiter = getRequestData<ConnectAwaiter_>(req);
awaiter->status_ = status;
if (awaiter->handle_) {
Loop::enqueue(awaiter->handle_.value());
Expand Down

0 comments on commit 3d85b11

Please sign in to comment.