Skip to content

Commit

Permalink
Restrict number of cached elements
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Mar 30, 2024
1 parent 4e8dd1d commit caf1a9b
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 30 deletions.
5 changes: 3 additions & 2 deletions example/benchmark/scripts/create.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,20 @@ fi
aws s3api create-bucket --bucket $CONTAINER --region $REGION --create-bucket-configuration LocationConstraint=$REGION
#------------------------------------------------------
mkdir data
mkdir data/enc
cd data
for size in ${SIZES[@]}
do
for n in {1..8192}
do
openssl rand -out ${n}.bin $size
openssl enc -aes-256-cbc -K "3031323334353637383930313233343536373839303132333435363738393031" -iv "30313233343536373839303132333435" -in ${n}.bin -out ${n}.enc.bin
openssl enc -aes-256-cbc -K "3031323334353637383930313233343536373839303132333435363738393031" -iv "30313233343536373839303132333435" -in ${n}.bin -out enc/${n}.bin
done
aws s3 sync ./ s3://${CONTAINER}/${size}/
for n in {1..8192}
do
rm ${n}.bin
rm ${n}.enc.bin
rm enc/${n}.bin
done
done
#------------------------------------------------------
8 changes: 7 additions & 1 deletion include/network/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class Cache {
std::unique_ptr<DnsEntry> dns;
/// The optional tls connection
std::unique_ptr<TLSConnection> tls;
/// The timestamp
size_t timestamp;
/// The fd
int32_t fd;
/// The port
Expand All @@ -57,6 +59,10 @@ class Cache {
protected:
/// The cache, uses hostname as key, multimap traversals same keys in the insertion order
std::multimap<std::string, std::unique_ptr<SocketEntry>> _cache;
/// The fifo deletion of sockets to help reduce open fds (ulimit -n issue)
std::map<size_t, SocketEntry*> _fifo;
/// The timestamp counter for deletion
size_t _timestamp = 0;
/// The default priority
int _defaultPriority = 8;

Expand All @@ -68,7 +74,7 @@ class Cache {
/// Stops the socket and either closes the connection or cashes it
virtual void stopSocket(std::unique_ptr<SocketEntry> socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket);
/// Shutdown of the socket and clears the same addresses
virtual void shutdownSocket(std::unique_ptr<SocketEntry> socketEntry);
virtual void shutdownSocket(std::unique_ptr<SocketEntry> socketEntry, unsigned cachedEntries);
/// The destructor
virtual ~Cache();

Expand Down
6 changes: 6 additions & 0 deletions src/cloud/aws_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ unique_ptr<network::Cache::SocketEntry> AWSCache::resolve(string hostname, unsig
{
for (auto it = _cache.find(hostname); it != _cache.end();) {
// loosely clean up the multimap cache
if (!it->second->dns) {
_fifo.erase(it->second->timestamp);
it = _cache.erase(it);
continue;
}
if (it->second->port == port && ((tls && it->second->tls.get()) || (!tls && !it->second->tls.get()))) {
auto socketEntry = move(it->second);
socketEntry->dns->cachePriority--;
_fifo.erase(socketEntry->timestamp);
_cache.erase(it);
return socketEntry;
}
Expand Down
26 changes: 21 additions & 5 deletions src/network/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,39 @@ Cache::SocketEntry::SocketEntry(string hostname, unsigned port) : dns(nullptr),
// The constructor
{}
//---------------------------------------------------------------------------
void Cache::shutdownSocket(unique_ptr<Cache::SocketEntry> socketEntry)
void Cache::shutdownSocket(unique_ptr<Cache::SocketEntry> socketEntry, unsigned cacheEntries)
// Shutdown the socket and dns cache
{
// delete all occurences of the cached ip in map
// delete all occurences of the cached ips in the cache map
if (socketEntry->hostname.length() > 0) {
for (auto it = _cache.find(socketEntry->hostname); it != _cache.end();) {
if (!strncmp(socketEntry->dns->addr->ai_addr->sa_data, it->second->dns->addr->ai_addr->sa_data, 14)) {
it->second->dns->cachePriority = 0;
stopSocket(move(it->second), 0, 0, false);
_fifo.erase(it->second->timestamp);
stopSocket(move(it->second), 0, cacheEntries, false);
it = _cache.erase(it);
} else {
it++;
}
}
}
stopSocket(move(socketEntry), 0, 0, false);
stopSocket(move(socketEntry), 0, cacheEntries, false);
}
//---------------------------------------------------------------------------
void Cache::stopSocket(unique_ptr<Cache::SocketEntry> socketEntry, uint64_t /*bytes*/, unsigned /*cacheEntries*/, bool reuseSocket)
void Cache::stopSocket(unique_ptr<Cache::SocketEntry> socketEntry, uint64_t /*bytes*/, unsigned cacheEntries, bool reuseSocket)
// Stops the socket and either closes the connection or cashes it
{
if (reuseSocket && socketEntry->hostname.length() > 0 && socketEntry->port) {
if (socketEntry->dns->cachePriority > 0) {
socketEntry->timestamp = _timestamp++;
for (auto it = _fifo.begin(); _fifo.size() >= cacheEntries;) {
if (it->second->fd >= 0)
close(it->second->fd);
it->second->fd = -1;
it->second->dns = nullptr;
it = _fifo.erase(it);
}
_fifo.emplace(socketEntry->timestamp, socketEntry.get());
_cache.emplace(socketEntry->hostname, move(socketEntry));
} else {
close(socketEntry->fd);
Expand All @@ -78,9 +88,15 @@ unique_ptr<Cache::SocketEntry> Cache::resolve(string hostname, unsigned port, bo
{
for (auto it = _cache.find(hostname); it != _cache.end();) {
// loosely clean up the multimap cache
if (!it->second->dns) {
_fifo.erase(it->second->timestamp);
it = _cache.erase(it);
continue;
}
if (it->second->port == port && ((tls && it->second->tls.get()) || (!tls && !it->second->tls.get()))) {
auto socketEntry = move(it->second);
socketEntry->dns->cachePriority--;
_fifo.erase(socketEntry->timestamp);
_cache.erase(it);
return socketEntry;
}
Expand Down
46 changes: 24 additions & 22 deletions src/network/connection_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,77 +74,78 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con
throw runtime_error("Socket creation error!" + string(strerror(errno)));
}

auto maxCacheEntries = (_maxCachedFds / _activeConnectionManagers) + 1;
// Settings for socket
// No blocking mode
if (tcpSettings.nonBlocking > 0) {
int flags = fcntl(socketEntry->fd, F_GETFL, 0);
flags |= O_NONBLOCK;
if (fcntl(socketEntry->fd, F_SETFL, flags) < 0) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! - non blocking error");
}
}

// Keep Alive
if (tcpSettings.keepAlive > 0) {
if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_KEEPALIVE, &tcpSettings.keepAlive, sizeof(tcpSettings.keepAlive))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! - keep alive error");
}
}

// Keep Idle
if (tcpSettings.keepIdle > 0) {
if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPIDLE, &tcpSettings.keepIdle, sizeof(tcpSettings.keepIdle))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! - keep idle error");
}
}

// Keep intvl
if (tcpSettings.keepIntvl > 0) {
if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPINTVL, &tcpSettings.keepIntvl, sizeof(tcpSettings.keepIntvl))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! - keep intvl error");
}
}

// Keep cnt
if (tcpSettings.keepCnt > 0) {
if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPCNT, &tcpSettings.keepCnt, sizeof(tcpSettings.keepCnt))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! - keep cnt error");
}
}

// No Delay
if (tcpSettings.noDelay > 0) {
if (setsockopt(socketEntry->fd, SOL_TCP, TCP_NODELAY, &tcpSettings.noDelay, sizeof(tcpSettings.noDelay))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! - nodelay error");
}
}

// Reuse ports
if (tcpSettings.reusePorts > 0) {
if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_REUSEPORT, &tcpSettings.reusePorts, sizeof(tcpSettings.reusePorts))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! - reuse port error");
}
}

// Recv buffer
if (tcpSettings.recvBuffer > 0) {
if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_RCVBUF, &tcpSettings.recvBuffer, sizeof(tcpSettings.recvBuffer))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! - recvbuf error");
}
}

// Lingering of sockets
if (tcpSettings.linger > 0) {
if (setsockopt(socketEntry->fd, SOL_TCP, TCP_LINGER2, &tcpSettings.linger, sizeof(tcpSettings.linger))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error - linger timeout error!" + string(strerror(errno)));
}
}
Expand All @@ -158,7 +159,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con
}

if (setsockopt(socketEntry->fd, SOL_TCP, TCP_WINSHIFT, &windowShift, sizeof(windowShift))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error - win shift error!");
}
}
Expand All @@ -169,7 +170,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con
if (tcpSettings.rfc1323 && tcpSettings.recvBuffer >= (64 << 10)) {
int on = 1;
if (setsockopt(socketEntry->fd, SOL_TCP, TCP_RFC1323, &on, sizeof(on))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error - rfc 1323 error!");
}
}
Expand All @@ -179,29 +180,29 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con
#ifdef TCP_MAXSEG
if (tcpSettings.mss > 0) {
if (setsockopt(socketEntry->fd, SOL_TCP, TCP_MAXSEG, &tcpSettings.mss, sizeof(tcpSettings.mss))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error - max segment error!");
}
}
#endif

auto setTimeOut = [&resCache, &socketEntry](int fd, const TCPSettings& tcpSettings) {
auto setTimeOut = [&resCache, &socketEntry, maxCacheEntries](int fd, const TCPSettings& tcpSettings) {
// Set timeout
if (tcpSettings.timeout > 0) {
struct timeval tv;
tv.tv_sec = tcpSettings.timeout / (1000 * 1000);
tv.tv_usec = tcpSettings.timeout % (1000 * 1000);
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&tv), sizeof tv)) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error - recv timeout error!");
}
if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&tv), sizeof tv)) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error - send timeout error!");
}
int timeoutInMs = tcpSettings.timeout / 1000;
if (setsockopt(fd, SOL_TCP, TCP_USER_TIMEOUT, &timeoutInMs, sizeof(timeoutInMs))) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error - tcp timeout error!" + string(strerror(errno)));
}
}
Expand All @@ -210,7 +211,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con
// Connect to remote
auto connectRes = ::connect(socketEntry->fd, socketEntry->dns->addr->ai_addr, socketEntry->dns->addr->ai_addrlen);
if (connectRes < 0 && errno != EINPROGRESS) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
if (retryLimit > 0) {
return connect(hostname, port, tls, tcpSettings, retryLimit - 1);
} else {
Expand Down Expand Up @@ -240,7 +241,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con
int socketError;
socklen_t socketErrorLen = sizeof(socketError);
if (getsockopt(socketEntry->fd, SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen)) {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! Could not retrieve socket options!");
}

Expand All @@ -249,12 +250,12 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con
setTimeOut(socketEntry->fd, tcpSettings);
return emplaceSocket();
} else {
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
throw runtime_error("Socket creation error! " + string(strerror(socketError)));
}
} else {
// Reached timeout
resCache->shutdownSocket(move(socketEntry));
resCache->shutdownSocket(move(socketEntry), maxCacheEntries);
if (retryLimit > 0) {
return connect(hostname, port, tls, tcpSettings, retryLimit - 1);
} else {
Expand All @@ -279,11 +280,12 @@ void ConnectionManager::disconnect(int32_t fd, const TCPSettings* tcpSettings, u
} else {
resCache = _cache.find("")->second.get();
}
auto maxCacheEntries = (_maxCachedFds / _activeConnectionManagers) + 1;
if (forceShutdown) {
resCache->shutdownSocket(move(socketIt->second));
resCache->shutdownSocket(move(socketIt->second), maxCacheEntries);
_fdSockets.erase(socketIt);
} else {
resCache->stopSocket(move(socketIt->second), bytes, (_maxCachedFds / _activeConnectionManagers) + 1, tcpSettings ? tcpSettings->reuse : false);
resCache->stopSocket(move(socketIt->second), bytes, maxCacheEntries, tcpSettings ? tcpSettings->reuse : false);
_fdSockets.erase(socketIt);
}
}
Expand Down

0 comments on commit caf1a9b

Please sign in to comment.