Skip to content

Commit

Permalink
bulk: Fix handling of symbolic links and relatibe paths
Browse files Browse the repository at this point in the history
Motivation:
----------
Recent change(s) that massaged user input target paths and
stored absolute paths on bulk backend lead to ambiguity between
user provided and dcache paths and also resulted in inability
to use full paths (i.e. only relative paths are supported). At
Fermilab we need to use both - relative and absolute paths

Modification:
------------
Revert all recent changes that appended prefix to user
supplied paths, stored the result and then stripped the
prefix so that only "original" paths are exposed to the
user.
Instead, like before, store user supplied paths but carry
over request prefix which is computed from user root and
door root. When calling PnfsManager using paths the full
patths of the targets are reassebled using the prefix

Result:
-------
Restored ability to use absolute paths when using REST API.

Issue: dCache#7693
Target: trunk
Request: 10.2, 10.1, 10.0, 9.2
Require-book: no
Require-notes: yes
Signed-off-by: Dmitry Litvintsev [email protected]
  • Loading branch information
DmitryLitvintsev committed Nov 27, 2024
1 parent 59ea69f commit 9a3613d
Show file tree
Hide file tree
Showing 16 changed files with 81 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ public void setDescriptors(Set<BulkActivityArgumentDescriptor> descriptors) {
*
* @param rid of the request.
* @param tid of the target.
* @param prefix target prefix
* @param path of the target on which to perform the activity.
* @return future result of the activity.
* @throws BulkServiceException
*/
public abstract ListenableFuture<R> perform(String rid, long tid, FsPath path, FileAttributes attributes)
public abstract ListenableFuture<R> perform(String rid, long tid, String prefix, FsPath path, FileAttributes attributes)
throws BulkServiceException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ public DeleteActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<PnfsDeleteEntryMessage> perform(String rid, long tid, FsPath path,
public ListenableFuture<PnfsDeleteEntryMessage> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
PnfsDeleteEntryMessage msg = new PnfsDeleteEntryMessage(path.toString());
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
PnfsDeleteEntryMessage msg = new PnfsDeleteEntryMessage(absolutePath.toString());
msg.setSubject(subject);
if (attributes != null && attributes.getFileType() == FileType.DIR && skipDirs) {
msg.setSucceeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected LogTargetActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, FsPath path,
public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
long now = System.currentTimeMillis();
BulkRequestTarget t = BulkRequestTargetBuilder.builder(null).activity(this.getName()).id(tid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,24 +93,20 @@ public PinActivity(String name, TargetType targetType) {

public void cancel(BulkRequestTarget target) {
super.cancel(target);
try {
pinManager.send(unpinMessage(id, target));
} catch (CacheException e) {
target.setErrorObject(new BulkServiceException("unable to fetch pnfsid of target in "
+ "order to cancel pinning.", e));
}
pinManager.send(unpinMessage(id, target.getAttributes().getPnfsId()));
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
if (id == null) {
id = rid;
}

try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}

checkPinnable(attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static diskCacheV111.util.CacheException.INVALID_ARGS;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED;

import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -130,11 +131,12 @@ protected FileAttributes getAttributes(FsPath path) throws CacheException {
return pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES);
}

protected PinManagerUnpinMessage unpinMessage(String id, BulkRequestTarget target)
protected PinManagerUnpinMessage unpinMessage(String id, String prefix, BulkRequestTarget target)
throws CacheException {
PnfsId pnfsId = target.getPnfsId();
if (pnfsId == null) {
pnfsId = getAttributes(target.getPath()).getPnfsId();
FsPath absolutePath = computeFsPath(prefix, target.getPath().toString());
pnfsId = getAttributes(absolutePath).getPnfsId();
}
return unpinMessage(id, pnfsId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity.plugin.pin;

import static org.dcache.services.bulk.activity.plugin.pin.ReleaseActivityProvider.REQUEST_ID;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -77,11 +78,12 @@ public ReleaseActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}
return pinManager.send(unpinMessage(id, attributes.getPnfsId()));
} catch (CacheException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,32 +104,29 @@ public StageActivity(String name, TargetType targetType) {

public void cancel(BulkRequestTarget target) {
super.cancel(target);
try {
pinManager.send(unpinMessage(id, target));
} catch (CacheException e) {
target.setErrorObject(new BulkServiceException("unable to fetch pnfsid of target in "
+ "order to cancel staging.", e));
}
pinManager.send(unpinMessage(id, target.getAttributes().getPnfsId()));
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
id = rid;

try {
/*
* refetch the attributes because RP is not stored in the bulk database.
*/
attributes = getAttributes(target);

FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);

checkStageable(attributes);

PinManagerPinMessage message
= new PinManagerPinMessage(attributes, getProtocolInfo(),
pnfsHandler.getRestriction(),
id,
getLifetimeInMillis(target));
getLifetimeInMillis(path));
message.setSubject(subject);

Optional<ListenableFuture<Message>> skipOption = skipIfOnline(attributes, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity.plugin.pin;

import static org.dcache.services.bulk.activity.plugin.pin.UnpinActivityProvider.UNPIN_REQUEST_ID;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -78,11 +79,12 @@ public UnpinActivity(String name, TargetType targetType) {
}

@Override
public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
public ListenableFuture<Message> perform(String rid, long tid, String prefix, FsPath path,
FileAttributes attributes) {
try {
if (attributes == null) {
attributes = getAttributes(target);
FsPath absolutePath = computeFsPath(prefix, path.toString());
attributes = getAttributes(absolutePath);
}
return pinManager.send(unpinMessage(id, attributes.getPnfsId()));
} catch (CacheException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,15 @@ public synchronized void cancel(BulkRequestTarget target) {

@Override
public ListenableFuture<QoSTransitionCompletedMessage> perform(String rid, long tid,
FsPath path, FileAttributes attributes) throws BulkServiceException {
String prefix, FsPath path, FileAttributes attributes) throws BulkServiceException {
if (targetQos == null && qosPolicy == null) {
return Futures.immediateFailedFuture(new IllegalArgumentException("no target qos or policy given."));
}

if (attributes == null) {
try {
attributes = pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES);
FsPath absolutePath = BulkRequestTarget.computeFsPath(prefix, path.toString());
attributes = pnfsHandler.getFileAttributes(absolutePath.toString(), MINIMALLY_REQUIRED_ATTRIBUTES);
} catch (CacheException e) {
throw new BulkServiceException("failed to retrieve file attributes", e);
}
Expand Down Expand Up @@ -232,6 +233,3 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) {
this.pnfsHandler = pnfsHandler;
}
}



Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.util.list.ListDirectoryHandler;
import org.dcache.vehicles.FileAttributes;
import org.dcache.vehicles.PnfsGetFileAttributes;
import org.dcache.vehicles.PnfsResolveSymlinksMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -140,18 +139,7 @@ public final class BulkRequestContainerJob
static final AtomicLong taskCounter = new AtomicLong(0L);

public static FsPath findAbsolutePath(String prefix, String path) {
FsPath absPath = computeFsPath(null, path);
if (prefix == null) {
return absPath;
}

FsPath pref = FsPath.create(prefix);

if (!absPath.hasPrefix(pref)) {
absPath = computeFsPath(prefix, path);
}

return absPath;
return computeFsPath(prefix, path);
}

/**
Expand Down Expand Up @@ -235,7 +223,7 @@ enum ContainerState {
* proper paths and attributes from listing.
*/
enum TaskState {
RESOLVE_PATH, FETCH_ATTRIBUTES, HANDLE_TARGET, HANDLE_DIR_TARGET
FETCH_ATTRIBUTES, HANDLE_TARGET, HANDLE_DIR_TARGET
}

/**
Expand Down Expand Up @@ -464,9 +452,6 @@ void doInner() {
try {
checkForRequestCancellation();
switch (state) {
case RESOLVE_PATH:
resolvePath();
break;
case FETCH_ATTRIBUTES:
fetchAttributes();
break;
Expand Down Expand Up @@ -512,43 +497,15 @@ void performSync() throws InterruptedException {
}

/**
* (1) symlink resolution on initial targets; bypassed for discovered targets.
* (1) retrieval of required file attributes.
*/
private void resolvePath() {
LOGGER.debug("{} - resolvePath, resolving {}", ruid, target.getPath());
PnfsResolveSymlinksMessage message = new PnfsResolveSymlinksMessage(
target.getPath().toString(), null);
ListenableFuture<PnfsResolveSymlinksMessage> requestFuture = pnfsHandler.requestAsync(
message);
CellStub.addCallback(requestFuture, new AbstractMessageCallback<>() {
@Override
public void success(PnfsResolveSymlinksMessage message) {
LOGGER.debug("{} - resolvePath {}, callback success.", ruid, target.getPath());
FsPath path = FsPath.create(message.getResolvedPath());
if (targetPrefix != null && !path.contains(targetPrefix)) {
path = computeFsPath(targetPrefix, path.toString());
}
LOGGER.debug("{} - resolvePath, resolved path {}", ruid, path);
target.setPath(path);
state = TaskState.FETCH_ATTRIBUTES;
taskFuture = executor.submit(TargetTask.this);
}
private void fetchAttributes() {
FsPath absolutePath = findAbsolutePath(targetPrefix,
target.getPath().toString());
LOGGER.error("{} - fetchAttributes for path {}, prefix {}, absolute path {} ", ruid, target.getPath(), targetPrefix, absolutePath);

@Override
public void failure(int rc, Object error) {
LOGGER.error("{} - resolvePath, callback failure for {}.", ruid, target);
storeOrUpdate(CacheExceptionFactory.exceptionOf(
rc, Objects.toString(error, null)));
}
}, callbackExecutor);
}

/**
* (2) retrieval of required file attributes.
*/
private void fetchAttributes() {
LOGGER.debug("{} - fetchAttributes for path {}", ruid, target.getPath());
PnfsGetFileAttributes message = new PnfsGetFileAttributes(target.getPath().toString(),
PnfsGetFileAttributes message = new PnfsGetFileAttributes(absolutePath.toString(),
MINIMALLY_REQUIRED_ATTRIBUTES);
ListenableFuture<PnfsGetFileAttributes> requestFuture = pnfsHandler.requestAsync(
message);
Expand All @@ -573,7 +530,7 @@ public void failure(int rc, Object error) {
}

/**
* (3b) either recurs on directory or performs activity on file.
* (2b) either recurs on directory or performs activity on file.
*/
private void handleTarget() throws InterruptedException {
LOGGER.debug("{} - handleTarget for {}, path {}.", ruid, target.getActivity(),
Expand Down Expand Up @@ -611,17 +568,20 @@ private void performActivity(boolean async) throws InterruptedException {
FsPath path = target.getPath();
FileAttributes attributes = target.getAttributes();
LOGGER.debug("{} - performActivity {} on {}.", ruid, activity, path);

storeOrUpdate(null);

if (hasBeenSpecificallyCancelled(this)) {
LOGGER.debug("{} - performActivity hasBeenSpecificallyCancelled for {}.", ruid,
path);
path);
remove();
}

try {
activityFuture = activity.perform(ruid, id == null ? seqNo : id, path, attributes);
activityFuture = activity.perform(ruid,
id == null ? seqNo : id,
targetPrefix,
path,
attributes);
if (async) {
activityFuture.addListener(() -> handleCompletion(), callbackExecutor);
permitHolder.throttledRelease();
Expand Down Expand Up @@ -1077,7 +1037,7 @@ private void processFileTargets() {
for (BulkRequestTarget target : requestTargets) {
try {
checkForRequestCancellation();
new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync();
new TargetTask(target, TaskState.FETCH_ATTRIBUTES).submitAsync();
} catch (InterruptedException e) {
/*
* Cancel most likely called; stop processing.
Expand Down Expand Up @@ -1128,4 +1088,4 @@ private void update() {

signalStateChange();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@ public final class BulkRequestTarget {
RUNNING};

public static FsPath computeFsPath(String prefix, String target) {
if (prefix == null) {
return FsPath.create(FsPath.ROOT + target);
} else {
return FsPath.create(
FsPath.ROOT + (prefix.endsWith("/") ? prefix : prefix + "/") + target);
FsPath absolutePath = FsPath.create(FsPath.ROOT + target);
if (prefix != null) {
FsPath pref = FsPath.create(prefix);
if (!absolutePath.hasPrefix(pref)) {
absolutePath = FsPath.create(
FsPath.ROOT + (prefix.endsWith("/") ? prefix : prefix + "/") + target);
}
}
return absolutePath;
}

public static final FsPath ROOT_REQUEST_PATH = computeFsPath(null, "=request_target=");
Expand Down
Loading

0 comments on commit 9a3613d

Please sign in to comment.