Skip to content

Commit 5b5efd8

Browse files
committed
Clean up obsolete configuration parameters of the Qserv worker
1 parent 9a7fb3f commit 5b5efd8

File tree

5 files changed

+1
-75
lines changed

5 files changed

+1
-75
lines changed

src/admin/templates/xrootd/etc/xrdssi.cf.jinja

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,6 @@ maxsqlconn = 980
6666
# This value must be less than maxsqlconn.
6767
reservedinteractivesqlconn = 930
6868

69-
[transmits]
70-
# Maximum number of concurrent transmits to a czar.
71-
maxtransmits = 50
72-
# If more than this number of large transmits is happening at once, wait to
73-
# start more transmits until some are done.
74-
maxalreadytransmitting = 10
75-
7669
[results]
7770

7871
# The name of a folder where query results will be stored.

src/wconfig/WorkerConfig.h

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,6 @@ class WorkerConfig {
150150
return _ReservedInteractiveSqlConnections->getVal();
151151
}
152152

153-
/// @return the maximum number of gigabytes that can be used by StreamBuffers
154-
unsigned int getBufferMaxTotalGB() const { return _bufferMaxTotalGB->getVal(); }
155-
156-
/// @return the maximum number of concurrent transmits to a czar
157-
unsigned int getMaxTransmits() const { return _maxTransmits->getVal(); }
158-
159-
int getMaxPerQid() const { return _maxPerQid->getVal(); }
160-
161153
/// @return the name of a folder where query results will be stored
162154
std::string const resultsDirname() const { return _resultsDirname->getVal(); }
163155

@@ -292,11 +284,6 @@ class WorkerConfig {
292284
util::ConfigValTUInt::create(_configValMap, "sqlconnections", "maxsqlconn", notReq, 800);
293285
CVTUIntPtr _ReservedInteractiveSqlConnections = util::ConfigValTUInt::create(
294286
_configValMap, "sqlconnections", "reservedinteractivesqlconn", notReq, 50);
295-
CVTUIntPtr _bufferMaxTotalGB =
296-
util::ConfigValTUInt::create(_configValMap, "transmit", "buffermaxtotalgb", notReq, 41);
297-
CVTUIntPtr _maxTransmits =
298-
util::ConfigValTUInt::create(_configValMap, "transmit", "maxtransmits", notReq, 40);
299-
CVTIntPtr _maxPerQid = util::ConfigValTInt::create(_configValMap, "transmit", "maxperqid", notReq, 3);
300287
CVTStrPtr _resultsDirname =
301288
util::ConfigValTStr::create(_configValMap, "results", "dirname", notReq, "/qserv/data/results");
302289
CVTUIntPtr _resultsXrootdPort =

src/xrdsvc/SsiService.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,6 @@ SsiService::SsiService(XrdSsiLogger* log) {
156156
throw wconfig::WorkerConfigError("Unable to connect to MySQL");
157157
}
158158
auto const workerConfig = wconfig::WorkerConfig::instance();
159-
int64_t bufferMaxTotalBytes = workerConfig->getBufferMaxTotalGB() * 1'000'000'000LL;
160-
StreamBuffer::setMaxTotalBytes(bufferMaxTotalBytes);
161159

162160
// Set thread pool size.
163161
unsigned int poolSize = ranges::max({wsched::BlendScheduler::getMinPoolSize(),

src/xrdsvc/StreamBuffer.cc

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -42,37 +42,8 @@ using namespace std;
4242

4343
namespace lsst::qserv::xrdsvc {
4444

45-
atomic<int64_t> StreamBuffer::_maxTotalBytes{40'000'000'000};
46-
atomic<int64_t> StreamBuffer::_totalBytes(0);
47-
mutex StreamBuffer::_createMtx;
48-
condition_variable StreamBuffer::_createCv;
49-
50-
void StreamBuffer::setMaxTotalBytes(int64_t maxBytes) {
51-
string const context = "StreamBuffer::" + string(__func__) + " ";
52-
LOGS(_log, LOG_LVL_INFO, context << "maxBytes=" << maxBytes);
53-
if (maxBytes < 0) {
54-
throw invalid_argument(context + "negative " + to_string(maxBytes));
55-
}
56-
if (maxBytes < 1'000'000'000LL) {
57-
LOGS(_log, LOG_LVL_ERROR, "Very small value for " << context << maxBytes);
58-
}
59-
_maxTotalBytes = maxBytes;
60-
}
61-
62-
double StreamBuffer::percentOfMaxTotalBytesUsed() {
63-
double percent = ((double)_totalBytes) / ((double)_maxTotalBytes);
64-
if (percent < 0.0) percent = 0.0;
65-
if (percent > 1.0) percent = 1.0;
66-
return percent;
67-
}
68-
6945
// Factory function, because this should be able to delete itself when Recycle() is called.
7046
StreamBuffer::Ptr StreamBuffer::createWithMove(std::string &input, std::shared_ptr<wbase::Task> const &task) {
71-
unique_lock<mutex> uLock(_createMtx);
72-
if (_totalBytes >= _maxTotalBytes) {
73-
LOGS(_log, LOG_LVL_WARN, "StreamBuffer at memory limit " << _totalBytes);
74-
}
75-
_createCv.wait(uLock, []() { return _totalBytes < _maxTotalBytes; });
7647
Ptr ptr(new StreamBuffer(input, task));
7748
ptr->_selfKeepAlive = ptr;
7849
return ptr;
@@ -95,14 +66,6 @@ StreamBuffer::StreamBuffer(std::string &input, wbase::Task::Ptr const &task) : _
9566
if (_wStats != nullptr) {
9667
_wStats->startQueryRespConcurrentQueued(_createdTime);
9768
}
98-
99-
_totalBytes += _dataStr.size();
100-
LOGS(_log, LOG_LVL_DEBUG, "StreamBuffer::_totalBytes=" << _totalBytes << " thisSize=" << _dataStr.size());
101-
}
102-
103-
StreamBuffer::~StreamBuffer() {
104-
_totalBytes -= _dataStr.size();
105-
LOGS(_log, LOG_LVL_DEBUG, "~StreamBuffer::_totalBytes=" << _totalBytes);
10669
}
10770

10871
void StreamBuffer::startTimer() {

src/xrdsvc/StreamBuffer.h

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,17 +66,8 @@ class StreamBuffer : public XrdSsiStream::Buffer {
6666
static StreamBuffer::Ptr createWithMove(std::string &input,
6767
std::shared_ptr<wbase::Task> const &task = nullptr);
6868

69-
/// Set the maximum number of bytes that can be used by all instances of this class.
70-
static void setMaxTotalBytes(int64_t maxBytes);
71-
72-
/// @return the percent of totalBytes used out of _maxTotalByes.
73-
static double percentOfMaxTotalBytesUsed();
74-
7569
size_t getSize() const { return _dataStr.size(); }
7670

77-
/// @Return total number of bytes used by ALL StreamBuffer objects.
78-
static size_t getTotalBytes() { return _totalBytes; }
79-
8071
/// Call to recycle the buffer when finished (normally called by XrdSsi).
8172
void Recycle() override;
8273

@@ -90,7 +81,7 @@ class StreamBuffer : public XrdSsiStream::Buffer {
9081
/// Unblock the condition variable on cancel.
9182
void cancel();
9283

93-
~StreamBuffer() override;
84+
~StreamBuffer() override = default;
9485

9586
private:
9687
/// This constructor will invalidate 'input'.
@@ -115,12 +106,6 @@ class StreamBuffer : public XrdSsiStream::Buffer {
115106
/// Pointer for worker statistics.
116107
/// NOTE: This will be nullptr for many things, so check before using.
117108
std::shared_ptr<wcontrol::WorkerStats> _wStats;
118-
119-
// Members associated with limiting memory use.
120-
static std::atomic<int64_t> _totalBytes; ///< Total bytes currently in use by all StreamBuffer instances.
121-
static std::atomic<int64_t> _maxTotalBytes;
122-
static std::mutex _createMtx;
123-
static std::condition_variable _createCv;
124109
};
125110

126111
} // namespace lsst::qserv::xrdsvc

0 commit comments

Comments
 (0)