Skip to content

Commit

Permalink
Merge pull request #7367 from alrossi/fix/9.2/qos-scanner-operation-c…
Browse files Browse the repository at this point in the history
…ompletion

dcache-qos:  fix scanner operation completion logic
  • Loading branch information
svemeyer authored Oct 4, 2023
2 parents 9367ede + 12a1fbf commit 576c9dd
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,17 @@ public void run() {
break;
}

LOGGER.trace("Pool watchdog initiating scan.");
LOGGER.trace("calling run scans.");
runScans();
LOGGER.trace("Pool watchdog scan completed.");
LOGGER.trace("run scans returned.");

recordSweep(start, System.currentTimeMillis());
}

LOGGER.info("Exiting pool operation consumer.");
LOGGER.info("Exiting operation consumer.");
clear();

LOGGER.info("Pool operation queues cleared.");
LOGGER.info("operation queues cleared.");
}

public void runNow() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,36 @@ public class SystemOperationMap extends ScanOperationMap {
ALL_IDLE_ENABLED_POOLS = filter;
}

static class ScanIndices {
final long start;
final long end;
long currentOffset = 0L;
long nextOffset = 0L;

public ScanIndices(long start, long end) {
this.start = start;
this.end = end;
}

long currentOffset() {
return currentOffset;
}

long nextOffset() {
return nextOffset;
}

boolean hasNext() {
return nextOffset < end;
}

boolean setNextOffset(long from, long to, long batchSize) {
currentOffset = start + (from * batchSize);
nextOffset = Math.min(start + (to * batchSize), end);
return currentOffset < nextOffset;
}
}

private final Map<String, SystemScanOperation> online = new HashMap<>();
private final Map<String, SystemScanOperation> qosNearline = new HashMap<>();
private final EvictingQueue<String> history = EvictingQueue.create(100);
Expand All @@ -135,6 +165,9 @@ public class SystemOperationMap extends ScanOperationMap {
private long lastPoolScanStart;
private long nextPoolScanStart;

private ScanIndices qosIndices;
private ScanIndices onlineIndices;

private SysOpHandler handler;
private QoSScannerCounters counters;

Expand Down Expand Up @@ -190,7 +223,7 @@ public void cancelAll(boolean qos) {
public String configSettings() {
return String.format("system online scan window %s %s\n"
+ "system online scan is %s\n"
+ "system qosNearline scan window %s %s\n"
+ "system qos (nearline) scan window %s %s\n"
+ "max concurrent operations %s\n"
+ "period set to %s %s\n\n",
onlineRescanWindow,
Expand Down Expand Up @@ -218,8 +251,8 @@ public void getInfo(PrintWriter pw) {
seconds = 0L;
}
counters.appendDHMSElapsedTime(seconds, SCAN_DURATION, builder);
builder.append(String.format("last qosNearline (nearline) scan start %s\n"
+ "last qosNearline (nearline) scan end %s\n",
builder.append(String.format("last qos (nearline) scan start %s\n"
+ "last qos (nearline) scan end %s\n",
new Date(lastQosNearlineScanStart),
new Date(lastQosNearlineScanEnd)));
seconds = TimeUnit.MILLISECONDS.toSeconds(lastQosNearlineScanEnd - lastQosNearlineScanStart);
Expand Down Expand Up @@ -270,7 +303,7 @@ public void runScans() {
lock.lock();
try {
if (!isQosNearlineRunning() && isQosNearlinePastExpiration()) {
LOGGER.info("runScans: starting qosNearline system scans");
LOGGER.info("runScans: starting qos (nearline) system scans");
start(true);
}

Expand Down Expand Up @@ -441,22 +474,29 @@ private void handleDone(SystemScanOperation operation) {
remove(operation.id);
history.add(operation.toString());

boolean isQosPermanent = operation.qos;
boolean hasNext = hasNext(operation.qos);

LOGGER.info("handleDone, hasNext {}, qos {}, online {}, qos {}.", hasNext,
operation.qos, online.size(), qosNearline.size());

if (operation.isFinal()) {
if (isQosPermanent && qosNearline.isEmpty()) {
state &= (~QOS_NEARLINE);
lastQosNearlineScanEnd = System.currentTimeMillis();
if (!hasNext) {
if (operation.qos) {
if (qosNearline.isEmpty()) {
state &= (~QOS_NEARLINE);
lastQosNearlineScanEnd = System.currentTimeMillis();
qosIndices = null;
}
} else if (online.isEmpty()) {
state &= (~ONLINE);
lastOnlineScanEnd = System.currentTimeMillis();
onlineIndices = null;
}
} else {
int loopWidth = maxConcurrentRunning;
int batchSize = getBatchSize(isQosPermanent);
int batchSize = getBatchSize(operation.qos);
long fromIndex = (operation.from / batchSize) + loopWidth;
long toIndex = (operation.to / batchSize) + loopWidth;
submit(fromIndex, toIndex, operation.minMaxIndices, isQosPermanent);
submit(fromIndex, toIndex, operation.qos);
}
}

Expand Down Expand Up @@ -493,6 +533,9 @@ private SystemScanOperation remove(String id) {
SystemScanOperation operation = online.remove(id);
if (operation == null) {
operation = qosNearline.remove(id);
LOGGER.info("removed qos operation {}.", operation);
} else {
LOGGER.info("removed online operation {}.", operation);
}
return operation;
}
Expand All @@ -502,24 +545,25 @@ private SystemScanOperation remove(String id) {
*/
@GuardedBy("lock")
private void start(boolean qos) throws CacheException {
if (!onlineScanEnabled && !qos) {
LOGGER.info("start: overriding disabled flag to run online scan");
}

long[] indices = handler.getMinMaxIndices(qos);
long[] minMaxIndices = handler.getMinMaxIndices(qos);
int count = maxConcurrentRunning;

if (indices[1] == 0) {
LOGGER.info("start: no {} entries to scan.", qos ? "QOS_NEARLINE" : "ONLINE");
if (minMaxIndices[1] == 0) {
LOGGER.info("start: no {} entries to scan.", qos ? "qos (nearline)" : "online");
return;
}

ScanIndices indices = new ScanIndices(minMaxIndices[0], minMaxIndices[1]);

if (qos) {
qosIndices = indices;
} else {
onlineIndices = indices;
}

LOGGER.info("start: loop count {}.", count);
for (int i = 0; i < count; ++i) {
LOGGER.info("start: submitting {} scan {}.", qos ? "QOS_NEARLINE" : "ONLINE", i);
if (submit(i, i + 1, indices, qos) > indices[1]) {
break;
}
for (int i = 0; i < count && hasNext(qos); ++i) {
submit(i, i + 1, qos);
}

if (qos) {
Expand All @@ -540,18 +584,18 @@ private void startPoolScans() {
}

@GuardedBy("lock")
private long submit(long fromIndex, long toIndex, long[] minmax, boolean qos) {
private void submit(long fromIndex, long toIndex, boolean qos) {
int batchSize = getBatchSize(qos);
long start = minmax[0] + (fromIndex * batchSize);
long end = Math.min(minmax[0] + (toIndex * batchSize), minmax[1]);
if (start > end) {
return end;
ScanIndices indices = qos ? qosIndices : onlineIndices;
if (!indices.setNextOffset(fromIndex, toIndex, batchSize)) {
return;
}
SystemScanOperation operation = new SystemScanOperation(start, end, qos);
operation.minMaxIndices = minmax;
SystemScanOperation operation = new SystemScanOperation(indices.currentOffset(),
indices.nextOffset(), qos);
LOGGER.info("submitting scan for start {}, next {}, qos {}.", indices.currentOffset(),
indices.nextOffset(), qos);
operation.lastScan = System.currentTimeMillis();
submit(operation);
return end;
}

@GuardedBy("lock")
Expand All @@ -565,4 +609,9 @@ private void submit(SystemScanOperation operation) {
put(operation);
operation.task.submit();
}

@GuardedBy("lock")
private boolean hasNext(boolean qos) {
return qos ? qosIndices.hasNext() : onlineIndices.hasNext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public final class SystemScanOperation extends ScanOperation<SystemScanTask> {

long lastUpdate;
long lastScan;
long[] minMaxIndices;

SystemScanTask task;
CacheException exception;
Expand Down Expand Up @@ -163,10 +162,6 @@ protected boolean isComplete() {
return isComplete;
}

boolean isFinal() {
return to >= minMaxIndices[1];
}

boolean isQos() {
return qos;
}
Expand Down

0 comments on commit 576c9dd

Please sign in to comment.