Skip to content

Commit

Permalink
Move client code to a separate class (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius authored Dec 26, 2024
1 parent 6e205b9 commit 8eb96b4
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 158 deletions.
7 changes: 3 additions & 4 deletions examples/kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ TMessageHolder<TMessage> TKv::Write(TMessageHolder<TLogEntry> message, uint64_t
return {};
}

TMessageHolder<TLogEntry> TKv::Prepare(TMessageHolder<TCommandRequest> command, uint64_t term) {
TMessageHolder<TLogEntry> TKv::Prepare(TMessageHolder<TCommandRequest> command) {
auto dataSize = command->Len - sizeof(TCommandRequest);
auto entry = NewHoldedMessage<TLogEntry>(sizeof(TLogEntry)+dataSize);
memcpy(entry->Data, command->Data, dataSize);
entry->Term = term;
return entry;
}

Expand Down Expand Up @@ -202,11 +201,11 @@ int main(int argc, char** argv) {
if (persist) {
state = std::make_shared<TDiskState>("state", myHost.Id);
}
auto raft = std::make_shared<TRaft>(rsm, state, myHost.Id, nodes);
auto raft = std::make_shared<TRaft>(state, myHost.Id, nodes);
TPoller::TSocket socket(NNet::TAddress{myHost.Address, myHost.Port}, loop.Poller());
socket.Bind();
socket.Listen();
TRaftServer server(loop.Poller(), std::move(socket), raft, nodes, timeSource);
TRaftServer server(loop.Poller(), std::move(socket), raft, rsm, nodes, timeSource);
server.Serve();
loop.Loop();
} else {
Expand Down
3 changes: 1 addition & 2 deletions examples/kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ class TKv: public IRsm {
public:
TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message, uint64_t index) override;
TMessageHolder<TMessage> Write(TMessageHolder<TLogEntry> message, uint64_t index) override;
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) override;
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message) override;

private:
uint64_t LastAppliedIndex = 0;
std::unordered_map<std::string, std::string> H;
};
11 changes: 4 additions & 7 deletions examples/sql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TSql: public IRsm {
// insert, update, create
TMessageHolder<TMessage> Write(TMessageHolder<TLogEntry> message, uint64_t index) override;
// convert request to log message
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) override;
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message) override;

private:
bool Execute(const std::string& q);
Expand All @@ -59,7 +59,6 @@ class TSql: public IRsm {

TResult Result;
std::string LastError;
uint64_t LastAppliedIndex = 0;
sqlite3* Db = nullptr;
};

Expand Down Expand Up @@ -185,12 +184,10 @@ TMessageHolder<TMessage> TSql::Reply(const std::string& ans, uint64_t index)
return res;
}

TMessageHolder<TLogEntry> TSql::Prepare(TMessageHolder<TCommandRequest> command, uint64_t term) {
TMessageHolder<TLogEntry> TSql::Prepare(TMessageHolder<TCommandRequest> command) {
auto dataSize = command->Len - sizeof(TCommandRequest);
std::cerr << "Prepare entry of size: " << dataSize << ", in term: " << term << std::endl;
auto entry = NewHoldedMessage<TLogEntry>(sizeof(TLogEntry)+dataSize);
memcpy(entry->Data, command->Data, dataSize);
entry->Term = term;
return entry;
}

Expand Down Expand Up @@ -302,11 +299,11 @@ int main(int argc, char** argv)

std::shared_ptr<IRsm> rsm = std::make_shared<TSql>("sql_file.db", myHost.Id);
auto state = std::make_shared<TDiskState>("sql_state", myHost.Id);
auto raft = std::make_shared<TRaft>(rsm, state, myHost.Id, nodes);
auto raft = std::make_shared<TRaft>(state, myHost.Id, nodes);
TPoller::TSocket socket(NNet::TAddress{myHost.Address, myHost.Port}, loop.Poller());
socket.Bind();
socket.Listen();
TRaftServer server(loop.Poller(), std::move(socket), raft, nodes, timeSource);
TRaftServer server(loop.Poller(), std::move(socket), raft, rsm, nodes, timeSource);
server.Serve();
loop.Loop();
} else {
Expand Down
6 changes: 3 additions & 3 deletions server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ int main(int argc, char** argv) {
}

std::shared_ptr<IRsm> rsm = std::make_shared<TDummyRsm>();
auto raft = std::make_shared<TRaft>(rsm, std::make_shared<TState>(), myHost.Id, nodes);
auto raft = std::make_shared<TRaft>(std::make_shared<TState>(), myHost.Id, nodes);
TPoller::TSocket socket(NNet::TAddress{myHost.Address, myHost.Port}, loop.Poller());
socket.Bind();
socket.Listen();
if (ssl) {
auto sslSocket = NNet::TSslSocket(std::move(socket), *serverContext.get());
TRaftServer server(loop.Poller(), std::move(sslSocket), raft, nodes, timeSource);
TRaftServer server(loop.Poller(), std::move(sslSocket), raft, rsm, nodes, timeSource);
server.Serve();
loop.Loop();
} else {
TRaftServer server(loop.Poller(), std::move(socket), raft, nodes, timeSource);
TRaftServer server(loop.Poller(), std::move(socket), raft, rsm, nodes, timeSource);
server.Serve();
loop.Loop();
}
Expand Down
Loading

0 comments on commit 8eb96b4

Please sign in to comment.