Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ __pychace__/
*.py[cod]
*$py.class
*.so
*.dylib
*.dll

dist/
sdist/
Expand All @@ -17,8 +19,8 @@ eggs/
.venv/
venv*/
.vscode/
capnproto-c++*/

dask-worker-space/*
.pre-commit-config.yaml

.DS_Store
Expand Down
4 changes: 1 addition & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_SCAN_FOR_MODULES OFF)
set(CMAKE_VERBOSE_MAKEFILE OFF)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(BUILD_SHARED_LIBS ON CACHE BOOL "Build shared libraries")

if(NOT SKBUILD_PROJECT_NAME)
Expand Down Expand Up @@ -63,13 +63,11 @@ elseif(APPLE)
message(STATUS "macOS on ARM64")

add_compile_options(-Wall -Wextra)
add_link_options(-Wl,-rpath,@loader_path)
else()
# Mac Intel
message(STATUS "macOS on x86_64")

add_compile_options(-Wall -Wextra)
add_link_options(-Wl,-rpath,@loader_path)
endif()

elseif(CMAKE_SYSTEM_NAME STREQUAL "Linux")
Expand Down
17 changes: 14 additions & 3 deletions CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"buildPresets": [
{
"name": "linux-x86_64",
"displayName": "Linux 64 bit Intel build",
"displayName": "Linux 64 bit build",
"configurePreset": "linux-x86_64",
"condition": {
"type": "equals",
Expand Down Expand Up @@ -74,8 +74,7 @@
"rhs": "Darwin"
},
"targets": [
"object_storage_server",
"cc_object_storage_server",
"py_object_storage_server",
"test_object_manager",
"test_object_storage_server",
"test_logging"
Expand Down Expand Up @@ -116,6 +115,18 @@
"lhs": "${hostSystemName}",
"rhs": "Linux"
}
},
{
"name": "darwin-arm64",
"configurePreset": "darwin-arm64",
"output": {
"outputOnFailure": true
},
"condition": {
"type": "equals",
"lhs": "${hostSystemName}",
"rhs": "Darwin"
}
}
]
}
6 changes: 1 addition & 5 deletions scaler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,4 @@ else()
message(WARNING "Not building OSS, as it's not supported on this system currently!")
endif()

if(LINUX OR WIN32)
add_subdirectory(io/ymq)
else()
message(WARNING "Not building YMQ, as it's not supported on this system currently!")
endif()
add_subdirectory(io/ymq)
9 changes: 5 additions & 4 deletions scaler/io/ymq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ add_library(ymq_objs OBJECT
iocp_context.h
iocp_context.cpp

kqueue_context.h
kqueue_context.cpp

event_loop.h

event_loop_thread.h
event_loop_thread.cpp

event_manager.h
# file_descriptor.h

message_connection.h
message_connection_tcp.h
Expand Down Expand Up @@ -56,7 +58,7 @@ add_library(ymq_objs OBJECT

set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/scaler/io/ymq)

if(LINUX)
if(LINUX OR APPLE)
# ymq python =======================================================================================================

add_library(py_ymq SHARED
Expand All @@ -72,8 +74,7 @@ if(LINUX)

# target_include_directories(py_ymq PRIVATE ${Python3_INCLUDE_DIRS})
target_link_libraries(py_ymq PRIVATE
$<$<CXX_COMPILER_ID:GNU,Clang>:-Wl,-Bstatic> ymq_objs
$<$<CXX_COMPILER_ID:GNU,Clang>:-Wl,-Bdynamic>
ymq_objs
Python3::Module
)

Expand Down
5 changes: 4 additions & 1 deletion scaler/io/ymq/configuration.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

// C++
#include <expected>
#include <functional>
#include <memory>
Expand All @@ -21,6 +20,7 @@ namespace ymq {

class EpollContext;
class IocpContext;
class KqueueContext;
class Message;
class IOSocket;

Expand All @@ -44,6 +44,9 @@ struct Configuration {
#ifdef _WIN32
using PollingContext = IocpContext;
#endif // _WIN32
#ifdef __APPLE__
using PollingContext = KqueueContext;
#endif // __APPLE__

using IOSocketIdentity = std::string;
using SendMessageCallback = MoveOnlyFunction<void(std::expected<void, Error>)>;
Expand Down
3 changes: 1 addition & 2 deletions scaler/io/ymq/epoll_context.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#ifdef __linux__

#include "scaler/io/ymq/epoll_context.h"

#include <sys/epoll.h>

#include <cerrno>
#include <functional>

#include "scaler/io/ymq/epoll_context.h"
#include "scaler/io/ymq/error.h"
#include "scaler/io/ymq/event_manager.h"

Expand Down
3 changes: 0 additions & 3 deletions scaler/io/ymq/epoll_context.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
#pragma once
#ifdef __linux__

// System
#include <sys/epoll.h>

// C++
#include <functional>
#include <queue>

#include "scaler/io/ymq/configuration.h"
#include "scaler/io/ymq/timed_queue.h"

// First-party
#include "scaler/io/ymq/interruptive_concurrent_queue.h"
#include "scaler/io/ymq/timestamp.h"

Expand Down
12 changes: 8 additions & 4 deletions scaler/io/ymq/event_loop.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
#pragma once

// C++
#include <concepts>
#include <cstdint> // uint64_t
#include <functional>
#include <utility> // std::move

// First-party
#include "scaler/io/ymq/configuration.h"

#if defined(__linux__)
#include "scaler/io/ymq/epoll_context.h"
#elif defined(_WIN32)
#include "scaler/io/ymq/iocp_context.h"
#elif defined(__APPLE__)
#include "scaler/io/ymq/kqueue_context.h"
#endif

namespace scaler {
namespace ymq {
Expand All @@ -27,7 +31,7 @@ concept EventLoopBackend = requires(Backend backend, Backend::Function f) {
{ backend.removeFdFromLoop(int {}) } -> std::same_as<void>;
};

template <EventLoopBackend Backend = EpollContext>
template <EventLoopBackend Backend>
class EventLoop {
Backend backend;

Expand Down
4 changes: 1 addition & 3 deletions scaler/io/ymq/event_loop_thread.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@

#include "scaler/io/ymq/event_loop_thread.h"

#include <cassert>
#include <memory>

#include "scaler/io/ymq/error.h"
#include "scaler/io/ymq/event_loop_thread.h"
#include "scaler/io/ymq/event_manager.h"
#include "scaler/io/ymq/io_socket.h"

Expand Down
2 changes: 0 additions & 2 deletions scaler/io/ymq/event_loop_thread.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <thread>
Expand Down
38 changes: 30 additions & 8 deletions scaler/io/ymq/event_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,35 @@
#ifdef _WIN32
#include <windows.h>
#endif // _WIN32
#ifdef __APPLE__
#include <sys/event.h>
#endif // __APPLE__

#include <concepts>
#include <cstdint> // uint64_t
#include <functional>
#include <memory>

// First-party
#include "scaler/io/ymq/configuration.h"

namespace scaler {
namespace ymq {

#ifdef __APPLE__
constexpr uint64_t KQUEUE_EVENT_READ = 1ULL << 0;
constexpr uint64_t KQUEUE_EVENT_WRITE = 1ULL << 1;
constexpr uint64_t KQUEUE_EVENT_CLOSE = 1ULL << 2;
constexpr uint64_t KQUEUE_EVENT_ERROR = 1ULL << 3;
#endif // __APPLE__

class EventLoopThread;

// TODO: Add the _fd back
#ifdef _WIN32
#if defined(_WIN32)
class EventManager: public OVERLAPPED {
#endif // _WIN32
#ifdef __linux__
class EventManager {
#endif // __linux
// FileDescriptor _fd;

#elif defined(__linux__) || defined(__APPLE__)
class EventManager {
#endif
public:
void onEvents(uint64_t events)
{
Expand Down Expand Up @@ -59,6 +65,22 @@ class EventManager: public OVERLAPPED {
}
}
#endif // _WIN32
#ifdef __APPLE__
if constexpr (std::same_as<Configuration::PollingContext, KqueueContext>) {
if (events & KQUEUE_EVENT_CLOSE) {
onClose();
}
if (events & KQUEUE_EVENT_ERROR) {
onError();
}
if (events & KQUEUE_EVENT_READ) {
onRead();
}
if (events & KQUEUE_EVENT_WRITE) {
onWrite();
}
}
#endif // __APPLE__
}

// User that registered them should have everything they need
Expand Down
47 changes: 38 additions & 9 deletions scaler/io/ymq/interruptive_concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
#include <winsock2.h>
// clang-format on
#endif // _WIN32
#ifdef __APPLE__
#include <sys/event.h>
#include <cerrno>
#include <cstring>
#endif // __APPLE__

// C++
#include <cstdlib>
#include <vector>

#include "scaler/io/ymq/error.h"
Expand All @@ -26,9 +29,6 @@ class EventManager;
#ifdef __linux__
template <typename T>
class InterruptiveConcurrentQueue {
int _eventFd;
moodycamel::ConcurrentQueue<T> _queue;

public:
InterruptiveConcurrentQueue(): _queue()
{
Expand Down Expand Up @@ -116,16 +116,16 @@ class InterruptiveConcurrentQueue {
InterruptiveConcurrentQueue& operator=(InterruptiveConcurrentQueue&&) = delete;

~InterruptiveConcurrentQueue() { close(_eventFd); }

private:
int _eventFd;
moodycamel::ConcurrentQueue<T> _queue;
};
#endif // __linux__

#ifdef _WIN32
template <typename T>
class InterruptiveConcurrentQueue {
HANDLE _completionPort;
const size_t _key;
moodycamel::ConcurrentQueue<T> _queue;

public:
InterruptiveConcurrentQueue(HANDLE completionPort, size_t key): _queue(), _completionPort(completionPort), _key(key)
{
Expand Down Expand Up @@ -161,9 +161,38 @@ class InterruptiveConcurrentQueue {
InterruptiveConcurrentQueue& operator=(const InterruptiveConcurrentQueue&) = delete;
InterruptiveConcurrentQueue(InterruptiveConcurrentQueue&&) = delete;
InterruptiveConcurrentQueue& operator=(InterruptiveConcurrentQueue&&) = delete;

private:
HANDLE _completionPort;
const size_t _key;
moodycamel::ConcurrentQueue<T> _queue;
};

#endif // _WIN32

#ifdef __APPLE__
template <typename T>
class InterruptiveConcurrentQueue {
public:
InterruptiveConcurrentQueue(int kqfd, uintptr_t ident): _kqfd(kqfd), _ident(ident) {}

uintptr_t ident() const { return _ident; }

void enqueue(T item);
std::vector<T> dequeue();

// unmovable, uncopyable
InterruptiveConcurrentQueue(const InterruptiveConcurrentQueue&) = delete;
InterruptiveConcurrentQueue& operator=(const InterruptiveConcurrentQueue&) = delete;
InterruptiveConcurrentQueue(InterruptiveConcurrentQueue&&) = delete;
InterruptiveConcurrentQueue& operator=(InterruptiveConcurrentQueue&&) = delete;

private:
int _kqfd; // shared kqueue descriptor
uintptr_t _ident; // EVFILT_USER ident reserved for this queue
moodycamel::ConcurrentQueue<T> _queue;
};
#endif // __APPLE__

} // namespace ymq
} // namespace scaler
Loading
Loading