Skip to content

Commit

Permalink
Merge pull request #7369 from alrossi/feature/9.2/bulk-reset-failed-cmd
Browse files Browse the repository at this point in the history
dcache-bulk:  add admin command and query to reset all requests with …
  • Loading branch information
svemeyer authored Oct 4, 2023
2 parents 8ade537 + b2b76e9 commit a55628d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,26 @@ public String call() throws Exception {
}
}

@Command(name = "request retry failed",
hint = "Retry only those requests which have failed targets.",
description = "Calls reset on the requests matching this criterion.")
class RequestRetryFailed implements Callable<String> {

@Override
public String call() throws Exception {
executor.submit(()-> {
try {
int count = requestStore.retryFailed();
LOGGER.info("{} requests with failed targets have been reset.", count);
} catch (BulkStorageException e) {
LOGGER.error("could not reset failed: {}.", e.toString());
}
});

return "Resetting requests with failed targets.";
}
}

@Command(name = "request submit",
hint = "Launch a bulk request.",
description = "Command-line version of the RESTful request.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ List<BulkRequest> next(Optional<String> sortedBy, Optional<Boolean> reverse, lon
*/
void reset(String uid) throws BulkStorageException;

/**
* Retry all requests that have FAILED targets.
*
* @throws BulkStorageException
*/
int retryFailed() throws BulkStorageException;

/**
* Store the request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.util.BulkRequestTarget.PLACEHOLDER_PNFSID;
import static org.dcache.services.bulk.util.BulkRequestTarget.ROOT_REQUEST_PATH;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.CREATED;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.FAILED;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
Expand All @@ -93,6 +94,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
Expand All @@ -119,7 +121,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestFilter;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.vehicles.FileAttributes;
Expand Down Expand Up @@ -555,6 +556,17 @@ public void reset(String uid) throws BulkStorageException {
}
}

@Override
public int retryFailed() throws BulkStorageException {
AtomicInteger count = new AtomicInteger(0);
List<String> uids = requestTargetDao.getRequestsOfFailed();
for (String uid: uids) {
reset(uid);
count.incrementAndGet();
}
return count.get();
}

@Required
public void setArchiveDao(JdbcBulkArchiveDao archiveDao) {
this.archiveDao = archiveDao;
Expand Down Expand Up @@ -790,7 +802,7 @@ private String checkRequestPermissions(Subject subject, String uid)

private void conditionallyClearTerminalRequest(BulkRequest stored) {
Long rid = stored.getId();
if (requestTargetDao.count(requestTargetDao.where().rid(rid).state(State.FAILED)) > 0) {
if (requestTargetDao.count(requestTargetDao.where().rid(rid).state(FAILED)) > 0) {
if (stored.isClearOnFailure()) {
clear(stored.getUid());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ static class TargetPlaceholder {

static final String JOINED_TABLE_NAMES_FOR_SELECT = SECONDARY_TABLE_NAME + ", " + TABLE_NAME;

static final String UIDS_OF_FAILED
= "SELECT r.uid FROM bulk_request r WHERE r.status = 'COMPLETED' AND EXISTS " +
"(SELECT * FROM request_target t WHERE r.id = t.rid AND t.state = 'FAILED')";

static final ParameterizedPreparedStatementSetter<TargetPlaceholder> SETTER = (ps, target) -> {
Instant now = Instant.now();
ps.setInt(1, PID.INITIAL.ordinal());
Expand Down Expand Up @@ -164,6 +168,10 @@ public List<BulkRequestTarget> get(JdbcRequestTargetCriterion criterion, int lim
this, criterion.isJoined() ? this::toFullRequestTarget : this::toRequestTarget);
}

public List<String> getRequestsOfFailed() {
return getJdbcTemplate().queryForList(UIDS_OF_FAILED, String.class);
}

public Optional<KeyHolder> insert(JdbcRequestTargetUpdate update) {
return utils.insert(update, TABLE_NAME, this);
}
Expand Down

0 comments on commit a55628d

Please sign in to comment.