Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/fdb5/daos/DaosCatalogueWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DaosCatalogueWriter : public DaosCatalogue, public CatalogueWriter {
/// Mount an existing TocCatalogue, which has a different metadata key (within
/// constraints) to allow on-line rebadging of data
/// variableKeys: The keys that are allowed to differ between the two DBs
void overlayDB(const Catalogue& otherCatalogue, const std::set<std::string>& variableKeys, bool unmount) override { NOTIMP; };
void overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) override { NOTIMP; };

// // Hide the contents of the DB!!!
// void hideContents() override;
Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/database/Catalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class CatalogueReader : virtual public Catalogue {
public:

CatalogueReader() {}

virtual ~CatalogueReader() {}

virtual DbStats stats() const = 0;
Expand All @@ -156,7 +156,7 @@ class CatalogueWriter : virtual public Catalogue {
virtual const Index& currentIndex() = 0;
virtual const Key currentIndexKey();
virtual void archive(const Key& idxKey, const Key& datumKey, std::shared_ptr<const FieldLocation> fieldLocation) = 0;
virtual void overlayDB(const Catalogue& otherCatalogue, const std::set<std::string>& variableKeys, bool unmount) = 0;
virtual void overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) = 0;
virtual void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) = 0;
virtual void reconsolidate() = 0;
};
Expand Down
2 changes: 0 additions & 2 deletions src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

Connection::Connection() : single_(false) { }

void Connection::teardown() {

if (!single_) {
Expand Down
5 changes: 2 additions & 3 deletions src/fdb5/remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

#pragma once

#include "eckit/serialisation/MemoryStream.h"
#include "fdb5/remote/Messages.h"

#include "eckit/exception/Exceptions.h"
Expand Down Expand Up @@ -52,7 +51,7 @@ class Connection : eckit::NonCopyable {
using PayloadList = std::vector<Payload>;

public: // methods
Connection();
Connection() = default;

virtual ~Connection() = default;

Expand Down Expand Up @@ -82,7 +81,7 @@ class Connection : eckit::NonCopyable {
virtual const eckit::net::TCPSocket& dataSocket() const = 0;

protected: // members
bool single_;
bool single_ {false};

private: // members
mutable std::mutex controlMutex_;
Expand Down
18 changes: 18 additions & 0 deletions src/fdb5/remote/Messages.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,27 @@

#include "fdb5/remote/Messages.h"

#include <cstddef>
#include <ostream>

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

Payload::Payload(const BufferStream& buffer) : length {buffer.length()}, data {buffer.data()} { }

Payload::Payload(const std::size_t length, const void* data) : length {length}, data {data} { }

bool Payload::empty() const {
return data == nullptr && length == 0;
}

bool Payload::consistent() const {
return ((length == 0) ^ (data == nullptr)) == 0;
}

//----------------------------------------------------------------------------------------------------------------------

std::ostream& operator<<(std::ostream& s, const Message& m) {
switch (m) {
case Message::None: s << "None"; break;
Expand Down Expand Up @@ -46,6 +63,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) {
case Message::Store: s << "Store"; break;
case Message::Axes: s << "Axes"; break;
case Message::Exists: s << "Exists"; break;
case Message::Overlay: s << "Overlay"; break;

// Responses
case Message::Received: s << "Received"; break;
Expand Down
44 changes: 34 additions & 10 deletions src/fdb5/remote/Messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,50 @@

#pragma once

#include "eckit/io/Buffer.h"
#include "eckit/serialisation/MemoryStream.h"
#include "eckit/types/FixedString.h"

#include <cmath>
#include <cstddef>
#include <cstdint>

#include "eckit/types/FixedString.h"

namespace eckit {
class Stream;
}
#include <iosfwd>

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

struct BufferStream;

struct Payload {
Payload(std::size_t length, const void* data) : length {length}, data {data} { }
Payload() = default;

explicit Payload(const BufferStream& buffer);

Payload(std::size_t length, const void* data);

bool empty() const;

/// @brief Checks if this object is in a consistent state.
/// @returns True if (length & data) is (zero & null) or (non-zero & non-null).
bool consistent() const;

std::size_t length {0};
const void* data {nullptr};
};

struct BufferStream : private eckit::Buffer, public eckit::MemoryStream {
explicit BufferStream(const size_t size) : eckit::Buffer(size), eckit::MemoryStream(data(), size) { }

size_t length() const { return eckit::MemoryStream::position(); }

const void* data() const { return eckit::Buffer::data(); }

Payload payload() const { return {length(), data()}; }
};

//----------------------------------------------------------------------------------------------------------------------

enum class Message : uint16_t {

// Server instructions
Expand Down Expand Up @@ -67,6 +90,7 @@ enum class Message : uint16_t {
Store,
Axes,
Exists,
Overlay,

// Responses
Received = 200,
Expand All @@ -86,11 +110,11 @@ std::ostream& operator<<(std::ostream& s, const Message& m);
class MessageHeader {

public: // types
constexpr static uint16_t currentVersion = 12;
static constexpr uint16_t currentVersion = 12;

constexpr static const auto hashBytes = 16;
static constexpr uint16_t hashBytes = 16;

constexpr static const auto markerBytes = 4;
static constexpr uint16_t markerBytes = 4;

using MarkerType = eckit::FixedString<markerBytes>;

Expand Down
30 changes: 15 additions & 15 deletions src/fdb5/remote/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,33 @@ Client::~Client() {
connection_.remove(id_);
}

void Client::controlWriteCheckResponse(const Message msg,
const uint32_t requestID,
const bool dataListener,
const void* const payload,
const uint32_t payloadLength) const {
//----------------------------------------------------------------------------------------------------------------------

void Client::controlWriteCheckResponse(const Message msg,
const uint32_t requestID,
const bool dataListener,
const Payload payload) const {

ASSERT(requestID);
ASSERT(!(!payloadLength ^ !payload));
std::lock_guard<std::mutex> lock(blockingRequestMutex_);
ASSERT(payload.consistent());
std::lock_guard lock(blockingRequestMutex_);

PayloadList payloads;
if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); }
if (!payload.empty()) { payloads.emplace_back(payload); }

auto f = connection_.controlWrite(*this, msg, requestID, dataListener, payloads);
f.wait();
ASSERT(f.get().size() == 0);
}

eckit::Buffer Client::controlWriteReadResponse(const Message msg,
const uint32_t requestID,
const void* const payload,
const uint32_t payloadLength) const {
eckit::Buffer Client::controlWriteReadResponse(const Message msg, const uint32_t requestID, const Payload payload) const {

ASSERT(requestID);
ASSERT(!(!payloadLength ^ !payload));
std::lock_guard<std::mutex> lock(blockingRequestMutex_);
ASSERT(payload.consistent());
std::lock_guard lock(blockingRequestMutex_);

PayloadList payloads;
if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); }
if (!payload.empty()) { payloads.emplace_back(payload); }

auto f = connection_.controlWrite(*this, msg, requestID, false, payloads);
f.wait();
Expand All @@ -94,4 +92,6 @@ void Client::dataWrite(Message msg, uint32_t requestID, PayloadList payloads) {
connection_.dataWrite(*this, msg, requestID, std::move(payloads));
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5::remote
47 changes: 37 additions & 10 deletions src/fdb5/remote/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@

#pragma once

#include "eckit/memory/NonCopyable.h"
#include "eckit/net/Endpoint.h"

#include "fdb5/remote/Connection.h"
#include "fdb5/remote/Messages.h"
#include "fdb5/remote/client/ClientConnection.h"

#include "eckit/memory/NonCopyable.h"
#include "eckit/net/Endpoint.h"
#include "eckit/serialisation/MemoryStream.h"

#include <cstddef> // std::size_t
#include <cstdint> // std::uint32_t
#include <mutex>
#include <string>
#include <utility> // std::pair
#include <vector>

Expand Down Expand Up @@ -45,7 +49,7 @@ class Client : eckit::NonCopyable {
public: // methods
Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint);

Client(const EndpointList& endpoints);
explicit Client(const EndpointList& endpoints);

virtual ~Client();

Expand All @@ -60,21 +64,42 @@ class Client : eckit::NonCopyable {
uint32_t generateRequestID() const { return connection_.generateRequestID(); }

// blocking requests

void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, Payload payload = {}) const;

void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, const BufferStream& buffer) const {
controlWriteCheckResponse(msg, requestID, dataListener, buffer.payload());
}

void controlWriteCheckResponse(Message msg,
uint32_t requestID,
bool dataListener,
const void* payload = nullptr,
uint32_t payloadLength = 0) const;
const void* payload,
uint32_t payloadLength) const {
controlWriteCheckResponse(msg, requestID, dataListener, {payloadLength, payload});
}

[[nodiscard]]
eckit::Buffer controlWriteReadResponse(Message msg, uint32_t requestID, Payload payload = {}) const;

[[nodiscard]]
eckit::Buffer controlWriteReadResponse(Message msg, uint32_t requestID, const BufferStream& buffer) const {
return controlWriteReadResponse(msg, requestID, buffer.payload());
}

[[nodiscard]]
eckit::Buffer controlWriteReadResponse(Message msg,
uint32_t requestID,
const void* payload = nullptr,
uint32_t payloadLength = 0) const;

const void* payload,
uint32_t payloadLength) const {
return controlWriteReadResponse(msg, requestID, {payloadLength, payload});
}
void dataWrite(Message msg, uint32_t requestID, PayloadList payloads = {});

// handlers for incoming messages - to be defined in the client class
virtual bool handle(Message message, uint32_t requestID) = 0;

virtual bool handle(Message message, uint32_t requestID) = 0;

virtual bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) = 0;

protected:
Expand All @@ -89,4 +114,6 @@ class Client : eckit::NonCopyable {
mutable std::mutex blockingRequestMutex_;
};

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5::remote
3 changes: 2 additions & 1 deletion src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ void ClientConnection::dataWrite(DataWriteRequest& request) const {

void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, PayloadList payloads) {

static size_t maxQueueLength = eckit::Resource<size_t>("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320);
static const size_t maxQueueLength =
eckit::Resource<size_t>("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", defaultDataWriteQueueLength);

{
// retrieve or add client to the list
Expand Down
2 changes: 2 additions & 0 deletions src/fdb5/remote/client/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class DataWriteRequest;

class ClientConnection : protected Connection {

static constexpr size_t defaultDataWriteQueueLength = 320;

public: // methods
~ClientConnection() override;

Expand Down
Loading
Loading