Skip to content

Commit

Permalink
[src] Add specified buffer size reading API for IoBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Sigma711 committed Aug 25, 2023
1 parent 572a0fa commit 4d397bf
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
13 changes: 13 additions & 0 deletions src/io_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,19 @@ ssize_t IoBuffer::ReadFromFd(int fd, int* tmp_errno) {
}
return n;
}
ssize_t IoBuffer::ReadFromFd(int fd, size_t read_len, int* tmp_errno) {
EnsureWritableSpace(read_len);
ssize_t res = 0;
while (read_len > 0) {
auto bytes_read =
::recv(fd, static_cast<void*>(const_cast<char*>(GetWritablePosition())),
read_len, MSG_NOSIGNAL);
read_len -= bytes_read;
writing_index_ += bytes_read;
res += bytes_read;
}
return res;
}

ssize_t IoBuffer::WriteToFd(int fd) {
ssize_t n = ::send(
Expand Down
1 change: 1 addition & 0 deletions src/io_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class IoBuffer {
// Retrieve content from the file descriptor with discrete reading(coping with
// sudden large traffic) to the buffer
ssize_t ReadFromFd(int fd, int* tmp_errno);
ssize_t ReadFromFd(int fd, size_t read_len, int* tmp_errno);

// Stuff content of the buffer into the file descriptor
ssize_t WriteToFd(int fd);
Expand Down
9 changes: 5 additions & 4 deletions src/rpc_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void RpcCodec::OnMessage(int sock_fd, IoBuffer* io_buffer,
const ssize_t min_header_len =
static_cast<ssize_t>(kMinMessageLength + kHeaderLength);
while (min_header_len >= io_buffer->GetReadableBytes()) {
io_buffer->ReadFromFd(sock_fd, &saved_errno);
io_buffer->ReadFromFd(sock_fd, min_header_len, &saved_errno);
if (saved_errno != 0) {
LOG_ERROR("RpcCodec::OnMessage() - Fd(%d) with errno(%d)", sock_fd,
saved_errno);
Expand All @@ -135,9 +135,10 @@ void RpcCodec::OnMessage(int sock_fd, IoBuffer* io_buffer,
ErrorCode::kInvalidLength);
return;
}
while (io_buffer->GetReadableBytes() <=
static_cast<size_t>(kHeaderLength + len)) {
io_buffer->ReadFromFd(sock_fd, &saved_errno);
while (static_cast<size_t>(kHeaderLength + len) >=
io_buffer->GetReadableBytes()) {
io_buffer->ReadFromFd(sock_fd, static_cast<size_t>(kHeaderLength + len),
&saved_errno);
}
if (AsyncRawCallback_ &&
!SyncRawCallback_(sock_fd,
Expand Down

0 comments on commit 4d397bf

Please sign in to comment.