From bbc21d8fc8a9a241a26d7d494d374b8c4e8ec196 Mon Sep 17 00:00:00 2001 From: jorgee Date: Thu, 24 Apr 2025 14:56:16 +0200 Subject: [PATCH 1/7] add fromLineage channel factory Signed-off-by: jorgee --- .../src/main/groovy/nextflow/Channel.groovy | 22 +++ .../nextflow/extension/LinChannelEx.groovy | 9 ++ .../nextflow/lineage/LinChanneExImpl.groovy | 65 ++++++++ .../src/main/nextflow/lineage/LinUtils.groovy | 2 +- .../src/resources/META-INF/extensions.idx | 1 + .../lineage/LinChanneExImplTest.groovy | 144 ++++++++++++++++++ 6 files changed, 242 insertions(+), 1 deletion(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy create mode 100644 modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy create mode 100644 modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy index ee1bb55f43..529ee623bb 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy @@ -16,6 +16,9 @@ package nextflow +import nextflow.extension.LinChannelEx +import nextflow.plugin.Plugins + import static nextflow.util.CheckHelper.* import java.nio.file.FileSystem @@ -657,4 +660,23 @@ class Channel { fromPath0Future = future.exceptionally(Channel.&handlerException) } + static DataflowWriteChannel fromLineage(String uri) { + final result = CH.create() + if( NF.isDsl2() ) { + session.addIgniter { fromLineage0(result, uri) } + } + else { + fromLineage0(result, uri ) + } + return result + } + + private static void fromLineage0(DataflowWriteChannel channel, String uri) { + final operation = Plugins.getExtension(LinChannelEx) + if( !operation ) + throw new IllegalStateException("Unable to load lineage extensions.") + def future = CompletableFuture.runAsync( { operation.queryLineage(session, channel, new URI(uri)) } as Runnable) + future.exceptionally(this.&handlerException) + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy new file mode 100644 index 0000000000..2f96dfa969 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy @@ -0,0 +1,9 @@ +package nextflow.extension + +import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.Session + +interface LinChannelEx { + void queryLineage(Session session, DataflowWriteChannel channel, URI uri) + +} \ No newline at end of file diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy new file mode 100644 index 0000000000..0f3a6fb944 --- /dev/null +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy @@ -0,0 +1,65 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.lineage + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.Channel +import nextflow.Session +import nextflow.extension.LinChannelEx + +/** + * Lineage channel extensions + * + * @author Jorge Ejarque + */ +@CompileStatic +@Slf4j +class LinChanneExImpl implements LinChannelEx{ + + void queryLineage(Session session, DataflowWriteChannel channel, URI uri) { + final store = getStore(session) + emitResults(channel, LinUtils.query(store, uri)) + channel.bind(Channel.STOP) + } + + protected LinStore getStore(Session session){ + final store = LinStoreFactory.getOrCreate(session) + if( !store ) { + throw new Exception("Lineage store not found - Check Nextflow configuration") + } + return store + } + + private static void emitResults(DataflowWriteChannel channel, Collection results){ + if( !results ) { + return + } + // Remove nested collections of a single element + if( results.size() == 1 ) { + final entry = results[0] + if( entry instanceof Collection ) { + emitResults(channel, entry) + } else { + channel.bind(LinUtils.encodeSearchOutputs(entry)) + } + } else + results.forEach { channel.bind(LinUtils.encodeSearchOutputs(it)) } + } + +} diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy index dfb5a4e634..b62d67abaf 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy @@ -316,7 +316,7 @@ class LinUtils { * @param output Output to encode * @return Output encoded as a JSON string */ - static String encodeSearchOutputs(Object output, boolean prettyPrint) { + static String encodeSearchOutputs(Object output, boolean prettyPrint = false) { if (output instanceof LinSerializable) { return new LinEncoder().withPrettyPrint(prettyPrint).encode(output) } else { diff --git a/modules/nf-lineage/src/resources/META-INF/extensions.idx b/modules/nf-lineage/src/resources/META-INF/extensions.idx index 53c350a1be..d4809ad1ad 100644 --- a/modules/nf-lineage/src/resources/META-INF/extensions.idx +++ b/modules/nf-lineage/src/resources/META-INF/extensions.idx @@ -17,3 +17,4 @@ nextflow.lineage.DefaultLinStoreFactory nextflow.lineage.LinObserverFactory nextflow.lineage.cli.LinCommandImpl +nextflow.lineage.LinChanneExImpl \ No newline at end of file diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy new file mode 100644 index 0000000000..5a67a08ea5 --- /dev/null +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy @@ -0,0 +1,144 @@ +/* + * Copyright 2013-2025, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.lineage + +import nextflow.Channel +import nextflow.extension.CH +import nextflow.lineage.model.Annotation +import nextflow.lineage.model.FileOutput + +import java.nio.file.Path +import java.time.Instant +import java.time.OffsetDateTime + +import nextflow.Session +import nextflow.lineage.config.LineageConfig +import nextflow.lineage.model.Checksum +import nextflow.lineage.model.DataPath +import nextflow.lineage.model.Parameter +import nextflow.lineage.model.Workflow +import nextflow.lineage.model.WorkflowOutput +import nextflow.lineage.model.WorkflowRun + +import spock.lang.Specification +import spock.lang.TempDir + +import java.time.ZoneOffset + +/** + * Lineage channel extensions tests + * + * @author Jorge Ejarque + */ +class LinChanneExImplTest extends Specification { + + @TempDir + Path tempDir + + Path storeLocation + Map configMap + + def setup() { + storeLocation = tempDir.resolve("store") + configMap = [linage: [enabled: true, store: [location: storeLocation.toString()]]] + } + + def 'should get metadata fragment'() { + + given: + def uniqueId = UUID.randomUUID() + def mainScript = new DataPath("file://path/to/main.nf", new Checksum("78910", "nextflow", "standard")) + def workflow = new Workflow([mainScript], "https://nextflow.io/nf-test/", "123456") + def key = "testKey" + def params = [new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")] + def value1 = new WorkflowRun(workflow, uniqueId.toString(), "test_run", params) + def outputs = [new Parameter("String", "output", "name")] + def wfOutputs = new WorkflowOutput(OffsetDateTime.now(), "lid://testKey", outputs) + def lidStore = new DefaultLinStore() + def session = Mock(Session) { + getConfig() >> configMap + } + lidStore.open(LineageConfig.create(session)) + lidStore.save(key, value1) + lidStore.save("$key#output", wfOutputs) + def channelLinExt = Spy(new LinChanneExImpl()) + + when: + def results = CH.create() + channelLinExt.queryLineage(session, results, new URI('lid://testKey#params')) + then: + channelLinExt.getStore(session) >> lidStore + and: + results.val == LinUtils.encodeSearchOutputs(params[0]) + results.val == LinUtils.encodeSearchOutputs(params[1]) + results.val == Channel.STOP + + when: + results = CH.create() + channelLinExt.queryLineage(session, results, new URI('lid://testKey#output')) + then: + channelLinExt.getStore(session) >> lidStore + and: + results.val == LinUtils.encodeSearchOutputs(outputs[0]) + results.val == Channel.STOP + } + + def 'should return global query results' () { + given: + def uniqueId = UUID.randomUUID() + def time = OffsetDateTime.ofInstant(Instant.ofEpochMilli(1234567), ZoneOffset.UTC) + def mainScript = new DataPath("file://path/to/main.nf", new Checksum("78910", "nextflow", "standard")) + def workflow = new Workflow([mainScript],"https://nextflow.io/nf-test/", "123456" ) + def key = "testKey" + def value1 = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [ new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")] ) + def key2 = "testKey2" + def value2 = new FileOutput("/path/tp/file1", new Checksum("78910", "nextflow", "standard"), "testkey", "testkey", null, 1234, time, time, [new Annotation("key1","value1"), new Annotation("key2","value2")]) + def key3 = "testKey3" + def value3 = new FileOutput("/path/tp/file2", new Checksum("78910", "nextflow", "standard"), "testkey", "testkey", null, 1234, time, time, [new Annotation("key2","value2"), new Annotation("key3","value3")]) + def key4 = "testKey4" + def value4 = new FileOutput("/path/tp/file", new Checksum("78910", "nextflow", "standard"), "testkey", "testkey", null, 1234, time, time, [new Annotation("key4","value4"), new Annotation("key3","value3")]) + def lidStore = new DefaultLinStore() + def session = Mock(Session) { + getConfig() >> configMap + } + lidStore.open(LineageConfig.create(session)) + lidStore.save(key, value1) + lidStore.save(key2, value2) + lidStore.save(key3, value3) + lidStore.save(key4, value4) + def channelLinExt = Spy(new LinChanneExImpl()) + when: + def results = CH.create() + channelLinExt.queryLineage(session, results, new URI("cid:///?type=FileOutput&annotations.key=key2&annotations.value=value2")) + then: + channelLinExt.getStore(session) >> lidStore + and: + results.val == LinUtils.encodeSearchOutputs(value2) + results.val == LinUtils.encodeSearchOutputs(value3) + results.val == Channel.STOP + + when: + results = CH.create() + channelLinExt.queryLineage(session, results, new URI("cid:///?type=FileOutput&annotations.key=key2&annotations.value=value2#path")) + then: + channelLinExt.getStore(session) >> lidStore + and: + results.val == '"/path/tp/file1"' + results.val == '"/path/tp/file2"' + results.val == Channel.STOP + } +} From fbdbeef23bde4ae2ed3c7bf628411cb4a2410087 Mon Sep 17 00:00:00 2001 From: jorgee Date: Thu, 24 Apr 2025 16:19:11 +0200 Subject: [PATCH 2/7] include query factory Signed-off-by: jorgee --- .../src/main/groovy/nextflow/Channel.groovy | 21 ++++++++++++++++++- .../nextflow/extension/LinChannelEx.groovy | 3 ++- .../nextflow/lineage/LinChanneExImpl.groovy | 18 +++++++++++++++- .../lineage/LinChanneExImplTest.groovy | 8 +++---- 4 files changed, 43 insertions(+), 7 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy index 529ee623bb..78e8c6e8f5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy @@ -675,7 +675,26 @@ class Channel { final operation = Plugins.getExtension(LinChannelEx) if( !operation ) throw new IllegalStateException("Unable to load lineage extensions.") - def future = CompletableFuture.runAsync( { operation.queryLineage(session, channel, new URI(uri)) } as Runnable) + def future = CompletableFuture.runAsync( { operation.viewLineage(session, channel, new URI(uri)) } as Runnable) + future.exceptionally(this.&handlerException) + } + + static DataflowWriteChannel fromLineageQuery(String queryString) { + final result = CH.create() + if( NF.isDsl2() ) { + session.addIgniter { fromLineageQuery0(result, queryString) } + } + else { + fromLineageQuery0(result, queryString ) + } + return result + } + + private static void fromLineageQuery0(DataflowWriteChannel channel, String query) { + final operation = Plugins.getExtension(LinChannelEx) + if( !operation ) + throw new IllegalStateException("Unable to load lineage extensions.") + def future = CompletableFuture.runAsync( { operation.queryLineage(session, channel, query) } as Runnable) future.exceptionally(this.&handlerException) } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy index 2f96dfa969..482ef26263 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy @@ -4,6 +4,7 @@ import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.Session interface LinChannelEx { - void queryLineage(Session session, DataflowWriteChannel channel, URI uri) + void viewLineage(Session session, DataflowWriteChannel channel, URI uri) + void queryLineage(Session session, DataflowWriteChannel channel, String query) } \ No newline at end of file diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy index 0f3a6fb944..7a4365fd71 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy @@ -22,6 +22,9 @@ import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.Channel import nextflow.Session import nextflow.extension.LinChannelEx +import nextflow.lineage.fs.LinPath +import nextflow.lineage.fs.LinPathFactory +import nextflow.lineage.serde.LinSerializable /** * Lineage channel extensions @@ -32,12 +35,19 @@ import nextflow.extension.LinChannelEx @Slf4j class LinChanneExImpl implements LinChannelEx{ - void queryLineage(Session session, DataflowWriteChannel channel, URI uri) { + void viewLineage(Session session, DataflowWriteChannel channel, URI uri) { final store = getStore(session) emitResults(channel, LinUtils.query(store, uri)) channel.bind(Channel.STOP) } + void queryLineage(Session session, DataflowWriteChannel channel, String query) { + final store = getStore(session) + emitSearchResults(channel, store.search(query)) + channel.bind(Channel.STOP) + } + + protected LinStore getStore(Session session){ final store = LinStoreFactory.getOrCreate(session) if( !store ) { @@ -62,4 +72,10 @@ class LinChanneExImpl implements LinChannelEx{ results.forEach { channel.bind(LinUtils.encodeSearchOutputs(it)) } } + private void emitSearchResults(DataflowWriteChannel channel, Map results) { + if( !results ) { + return + } + results.keySet().forEach { channel.bind(LinPathFactory.create(LinPath.LID_PROT + it)) } + } } diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy index 5a67a08ea5..75b527b4e1 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy @@ -79,7 +79,7 @@ class LinChanneExImplTest extends Specification { when: def results = CH.create() - channelLinExt.queryLineage(session, results, new URI('lid://testKey#params')) + channelLinExt.viewLineage(session, results, new URI('lid://testKey#params')) then: channelLinExt.getStore(session) >> lidStore and: @@ -89,7 +89,7 @@ class LinChanneExImplTest extends Specification { when: results = CH.create() - channelLinExt.queryLineage(session, results, new URI('lid://testKey#output')) + channelLinExt.viewLineage(session, results, new URI('lid://testKey#output')) then: channelLinExt.getStore(session) >> lidStore and: @@ -123,7 +123,7 @@ class LinChanneExImplTest extends Specification { def channelLinExt = Spy(new LinChanneExImpl()) when: def results = CH.create() - channelLinExt.queryLineage(session, results, new URI("cid:///?type=FileOutput&annotations.key=key2&annotations.value=value2")) + channelLinExt.viewLineage(session, results, new URI("cid:///?type=FileOutput&annotations.key=key2&annotations.value=value2")) then: channelLinExt.getStore(session) >> lidStore and: @@ -133,7 +133,7 @@ class LinChanneExImplTest extends Specification { when: results = CH.create() - channelLinExt.queryLineage(session, results, new URI("cid:///?type=FileOutput&annotations.key=key2&annotations.value=value2#path")) + channelLinExt.viewLineage(session, results, new URI("cid:///?type=FileOutput&annotations.key=key2&annotations.value=value2#path")) then: channelLinExt.getStore(session) >> lidStore and: From 3aa4f0f3830fcbd2fdb70d7452ea5cc1a1f87982 Mon Sep 17 00:00:00 2001 From: jorgee Date: Fri, 25 Apr 2025 21:11:52 +0200 Subject: [PATCH 3/7] add published files in output, support queries in fromPath Signed-off-by: jorgee --- .../src/main/groovy/nextflow/Session.groovy | 2 +- .../groovy/nextflow/file/PathVisitor.groovy | 23 ++- .../groovy/nextflow/file/QueryablePath.groovy | 13 ++ .../main/nextflow/lineage/LinObserver.groovy | 11 +- .../main/nextflow/lineage/fs/LinPath.groovy | 153 +++++++++++------- 5 files changed, 143 insertions(+), 59 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/file/QueryablePath.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index a486123362..acc1c4fb7d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -381,7 +381,7 @@ class Session implements ISession { this.dag = new DAG() // -- init output dir - this.outputDir = FileHelper.toCanonicalPath(config.outputDir ?: 'results') + this.outputDir = FileHelper.toCanonicalPath(config.outputDir ?: config.navigate('params.outdir') ?: 'results') // -- init work dir this.workDir = FileHelper.toCanonicalPath(config.workDir ?: 'work') diff --git a/modules/nextflow/src/main/groovy/nextflow/file/PathVisitor.groovy b/modules/nextflow/src/main/groovy/nextflow/file/PathVisitor.groovy index 0a484bff9f..68e27785f8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/file/PathVisitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/file/PathVisitor.groovy @@ -66,7 +66,7 @@ class PathVisitor { applyRegexPattern0(filePattern) else if( filePattern != null ) - applyGlobPattern0(filePattern as Path) + applyPathPattern0(filePattern as Path) else throw new IllegalArgumentException("Missing file pattern argument") @@ -103,6 +103,27 @@ class PathVisitor { target.bind(STOP) } + private void applyPathPattern0(Path filePattern) { + if( isQuery(filePattern) ) + applyQueryablePath0(filePattern as QueryablePath) + else + applyGlobPattern0(filePattern) + } + + private static boolean isQuery(Path filePattern) { + log.debug("Checking if query: $filePattern.class ") + return filePattern instanceof QueryablePath && (filePattern as QueryablePath).hasQuery() + } + + private boolean applyQueryablePath0(QueryablePath path) { + final paths = path.resolveQuery() + if( !paths ) + throw new FileNotFoundException("No files found for ${path}") + + paths.forEach { emit0(it) } + close0() + } + private void applyGlobPattern0(Path filePattern) { final glob = opts?.containsKey('glob') ? opts.glob as boolean : true diff --git a/modules/nextflow/src/main/groovy/nextflow/file/QueryablePath.groovy b/modules/nextflow/src/main/groovy/nextflow/file/QueryablePath.groovy new file mode 100644 index 0000000000..084c43d3b5 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/file/QueryablePath.groovy @@ -0,0 +1,13 @@ +package nextflow.file + +import java.nio.file.Path + +/** + * Interface to indicate a Path could contain a query that is resolved to several real paths. + * + * @author Jorge Ejarque + */ +interface QueryablePath { + boolean hasQuery(); + List resolveQuery(); +} \ No newline at end of file diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy index 081a5bba7a..53ee984b2a 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy @@ -87,6 +87,7 @@ class LinObserver implements TraceObserver { private Session session private WorkflowOutput workflowOutput private Map outputsStoreDirLid = new HashMap(10) + private Set publishedFiles = new HashSet() private PathNormalizer normalizer LinObserver(Session session, LinStore store){ @@ -124,6 +125,10 @@ class LinObserver implements TraceObserver { @Override void onFlowComplete(){ if (this.workflowOutput){ + //Add publishedFiles + for (String path: publishedFiles){ + workflowOutput.output.add(new Parameter(Path.simpleName, null, path)) + } workflowOutput.createdAt = OffsetDateTime.now() final key = executionHash + '#output' this.store.save(key, workflowOutput) @@ -360,6 +365,7 @@ class LinObserver implements TraceObserver { LinUtils.toDate(attrs?.lastModifiedTime()), convertAnnotations(annotations)) store.save(key, value) + publishedFiles.add(asUriString(key)) } catch (Throwable e) { log.warn("Unexpected error storing published file '${destination.toUriString()}' for workflow '${executionHash}'", e) } @@ -411,8 +417,9 @@ class LinObserver implements TraceObserver { private Object convertPathsToLidReferences(Object value){ if( value instanceof Path ) { try { - final key = getWorkflowOutputKey(value) - return asUriString(key) + final key = asUriString(getWorkflowOutputKey(value)) + publishedFiles.remove(key) + return key } catch (Throwable e){ //Workflow output key not found return value diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy index 8d0559f55a..c94607f0eb 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy @@ -20,6 +20,7 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.file.FileHelper import nextflow.file.LogicalDataPath +import nextflow.file.QueryablePath import nextflow.lineage.model.Checksum import nextflow.lineage.model.FileOutput import nextflow.lineage.serde.LinSerializable @@ -45,13 +46,13 @@ import java.time.OffsetDateTime */ @Slf4j @CompileStatic -class LinPath implements Path, LogicalDataPath { +class LinPath implements Path, LogicalDataPath, QueryablePath { - static public final List SUPPORTED_CHECKSUM_ALGORITHMS=["nextflow"] + static public final List SUPPORTED_CHECKSUM_ALGORITHMS = ["nextflow"] static public final String SEPARATOR = '/' public static final String LID_PROT = "${SCHEME}://" - static private final String[] EMPTY = new String[] {} + static private final String[] EMPTY = new String[]{} private LinFileSystem fileSystem @@ -72,12 +73,19 @@ class LinPath implements Path, LogicalDataPath { throw new IllegalArgumentException("Invalid LID URI - scheme is different for $SCHEME") } this.fileSystem = fs + setFieldsFormURI(uri) + //Check if query and fragment are with filePath + if (query == null && fragment == null){ + setFieldsFormURI(new URI(toUriString())) + } + } + private void setFieldsFormURI(URI uri){ this.query = uri.query this.fragment = uri.fragment - this.filePath = resolve0( fs, norm0("${uri.authority?:''}${uri.path}") ) + this.filePath = resolve0(fileSystem, norm0("${uri.authority?:''}${uri.path}") ) } - protected LinPath(String query, String fragment, String filepath, LinFileSystem fs){ + protected LinPath(String query, String fragment, String filepath, LinFileSystem fs) { this.fileSystem = fs this.query = query this.fragment = fragment @@ -100,9 +108,9 @@ class LinPath implements Path, LogicalDataPath { return path && path.startsWith(LID_PROT) } - private static String buildPath(String first, String[] more){ + private static String buildPath(String first, String[] more) { first = norm0(first) - if (more){ + if( more ) { final morePath = norm0(more).join(SEPARATOR) return first.isEmpty() ? morePath : first + SEPARATOR + morePath } @@ -117,25 +125,25 @@ class LinPath implements Path, LogicalDataPath { } protected static void validateChecksum(Checksum checksum, Path hashedPath) { - if( !checksum) + if( !checksum ) return - if( ! isAlgorithmSupported(checksum.algorithm) ) { + if( !isAlgorithmSupported(checksum.algorithm) ) { log.warn("Checksum of '$hashedPath' can't be validated. Algorithm '${checksum.algorithm}' is not supported") return } final hash = checksum.mode - ? CacheHelper.hasher(hashedPath, CacheHelper.HashMode.of(checksum.mode.toString().toLowerCase())).hash().toString() - : CacheHelper.hasher(hashedPath).hash().toString() - if (hash != checksum.value) + ? CacheHelper.hasher(hashedPath, CacheHelper.HashMode.of(checksum.mode.toString().toLowerCase())).hash().toString() + : CacheHelper.hasher(hashedPath).hash().toString() + if( hash != checksum.value ) log.warn("Checksum of '$hashedPath' does not match with the one stored in the metadata") } - protected static isAlgorithmSupported( String algorithm ){ + protected static isAlgorithmSupported(String algorithm) { return algorithm && algorithm in SUPPORTED_CHECKSUM_ALGORITHMS } @TestOnly - protected String getFilePath(){ this.filePath } + protected String getFilePath() { this.filePath } /** * Finds the target path of a LinPath. @@ -149,7 +157,7 @@ class LinPath implements Path, LogicalDataPath { * IllegalArgumentException if the filepath, filesystem or its LinStore are null. * FileNotFoundException if the filePath or children are not found in the LinStore. */ - protected static Path findTarget(LinFileSystem fs, String filePath, boolean resultsAsPath, String[] children=[]) throws Exception { + protected static Path findTarget(LinFileSystem fs, String filePath, boolean resultsAsPath, String[] children = []) throws Exception { if( !fs ) throw new IllegalArgumentException("Cannot get target path for a relative lineage path") if( filePath.isEmpty() || filePath == SEPARATOR ) @@ -158,11 +166,11 @@ class LinPath implements Path, LogicalDataPath { if( !store ) throw new Exception("Lineage store not found - Check Nextflow configuration") final object = store.load(filePath) - if ( object ){ + if( object ) { if( object instanceof FileOutput ) { return getTargetPathFromOutput(object, children) } - if( resultsAsPath ){ + if( resultsAsPath ) { return getMetadataAsTargetPath(object, fs, filePath, children) } } else { @@ -180,11 +188,11 @@ class LinPath implements Path, LogicalDataPath { throw new FileNotFoundException("Target path '$filePath' does not exist") } - protected static Path getMetadataAsTargetPath(LinSerializable results, LinFileSystem fs, String filePath, String[] children){ + protected static Path getMetadataAsTargetPath(LinSerializable results, LinFileSystem fs, String filePath, String[] children) { if( !results ) { throw new FileNotFoundException("Target path '$filePath' does not exist") } - if (children && children.size() > 0) { + if( children && children.size() > 0 ) { return getSubObjectAsPath(fs, filePath, results, children) } else { return generateLinMetadataPath(fs, filePath, results, children) @@ -209,13 +217,12 @@ class LinPath implements Path, LogicalDataPath { throw new FileNotFoundException("Target path '$key#output' does not exist") } return generateLinMetadataPath(fs, key, outputs, children) - } - else { + } else { return generateLinMetadataPath(fs, key, object, children) } } - private static LinMetadataPath generateLinMetadataPath(LinFileSystem fs, String key, Object object, String[] children){ + private static LinMetadataPath generateLinMetadataPath(LinFileSystem fs, String key, Object object, String[] children) { def creationTime = toFileTime(navigate(object, 'createdAt') as OffsetDateTime ?: OffsetDateTime.now()) final output = children ? navigate(object, children.join('.')) : object if( !output ) { @@ -229,19 +236,19 @@ class LinPath implements Path, LogicalDataPath { // return the real path stored in the metadata validateDataOutput(lidObject) def realPath = FileHelper.toCanonicalPath(lidObject.path as String) - if (children && children.size() > 0) + if( children && children.size() > 0 ) realPath = realPath.resolve(children.join(SEPARATOR)) - if (!realPath.exists()) + if( !realPath.exists() ) throw new FileNotFoundException("Target path '$realPath' does not exist") return realPath } - private static boolean isEmptyBase(LinFileSystem fs, String base){ + private static boolean isEmptyBase(LinFileSystem fs, String base) { return !base || base == SEPARATOR || (fs && base == "..") } private static String resolve0(LinFileSystem fs, String base, String[] more) { - if( isEmptyBase(fs,base) ) { + if( isEmptyBase(fs, base) ) { return resolveEmptyPathCase(fs, more as List) } if( base.contains(SEPARATOR) ) { @@ -253,8 +260,8 @@ class LinPath implements Path, LogicalDataPath { return more ? result.resolve(more.join(SEPARATOR)).toString() : result.toString() } - private static String resolveEmptyPathCase(LinFileSystem fs, List more ){ - switch(more.size()) { + private static String resolveEmptyPathCase(LinFileSystem fs, List more) { + switch( more.size() ) { case 0: return "/" case 1: @@ -265,7 +272,7 @@ class LinPath implements Path, LogicalDataPath { } static private String norm0(String path) { - if( !path || path==SEPARATOR) + if( !path || path == SEPARATOR ) return "" //Remove repeated elements path = Path.of(path.trim()).normalize().toString() @@ -273,12 +280,12 @@ class LinPath implements Path, LogicalDataPath { if( path.startsWith(SEPARATOR) ) path = path.substring(1) if( path.endsWith(SEPARATOR) ) - path = path.substring(0,path.size()-1) + path = path.substring(0, path.size() - 1) return path } - + static private String[] norm0(String... path) { - for( int i=0; i1 ) - return subpath(0,c-1) - if( c==1 ) - return new LinPath(fileSystem,SEPARATOR) + if( c > 1 ) + return subpath(0, c - 1) + if( c == 1 ) + return new LinPath(fileSystem, SEPARATOR) return null } @@ -322,21 +329,21 @@ class LinPath implements Path, LogicalDataPath { @Override Path getName(int index) { - if( index<0 ) + if( index < 0 ) throw new IllegalArgumentException("Path name index cannot be less than zero - offending value: $index") final path = Path.of(filePath) - if (index == path.nameCount - 1){ - return new LinPath( fragment, query, path.getName(index).toString(), null) + if( index == path.nameCount - 1 ) { + return new LinPath(fragment, query, path.getName(index).toString(), null) } - return new LinPath(index==0 ? fileSystem : null, path.getName(index).toString()) + return new LinPath(index == 0 ? fileSystem : null, path.getName(index).toString()) } @Override Path subpath(int beginIndex, int endIndex) { - if( beginIndex<0 ) + if( beginIndex < 0 ) throw new IllegalArgumentException("subpath begin index cannot be less than zero - offending value: $beginIndex") final path = Path.of(filePath) - return new LinPath(beginIndex==0 ? fileSystem : null, path.subpath(beginIndex, endIndex).toString()) + return new LinPath(beginIndex == 0 ? fileSystem : null, path.subpath(beginIndex, endIndex).toString()) } @Override @@ -369,7 +376,7 @@ class LinPath implements Path, LogicalDataPath { if( LinPath.class != other.class ) throw new ProviderMismatchException() - final that = (LinPath)other + final that = (LinPath) other if( that.fileSystem && this.fileSystem != that.fileSystem ) return other @@ -388,7 +395,7 @@ class LinPath implements Path, LogicalDataPath { final scheme = FileHelper.getUrlProtocol(path) if( !scheme ) { // consider the path as a lid relative path - return resolve(new LinPath(null,path)) + return resolve(new LinPath(null, path)) } if( scheme != SCHEME ) { throw new ProviderMismatchException() @@ -413,12 +420,12 @@ class LinPath implements Path, LogicalDataPath { // Compare 'filePath' as relative paths path = Path.of(filePath).relativize(Path.of(lidOther.filePath)) } - return new LinPath(lidOther.query, lidOther.fragment, path.getNameCount()>0 ? path.toString() : SEPARATOR, null) + return new LinPath(lidOther.query, lidOther.fragment, path.getNameCount() > 0 ? path.toString() : SEPARATOR, null) } @Override URI toUri() { - return asUri("${SCHEME}://${filePath}${query ? '?' + query: ''}${fragment ? '#'+ fragment : ''}") + return asUri("${SCHEME}://${filePath}${query ? '?' + query : ''}${fragment ? '#' + fragment : ''}") } String toUriString() { @@ -455,7 +462,7 @@ class LinPath implements Path, LogicalDataPath { * @return Path associated to a DataOutput or LinMetadataFile with the metadata object for other types. * @throws FileNotFoundException if the metadata associated to the LinPath does not exist */ - protected Path getTargetOrMetadataPath(){ + protected Path getTargetOrMetadataPath() { return findTarget(fileSystem, filePath, true, parseChildrenFromFragment(fragment)) } @@ -479,7 +486,7 @@ class LinPath implements Path, LogicalDataPath { if( LinPath.class != other.class ) { return false } - final that = (LinPath)other + final that = (LinPath) other return this.fileSystem == that.fileSystem && this.filePath.equals(that.filePath) } @@ -488,24 +495,60 @@ class LinPath implements Path, LogicalDataPath { */ @Override int hashCode() { - return Objects.hash(fileSystem,filePath) + return Objects.hash(fileSystem, filePath) } static URI asUri(String path) { - if (!path) + if( !path ) throw new IllegalArgumentException("Missing 'path' argument") - if (!path.startsWith(LID_PROT)) + if( !path.startsWith(LID_PROT) ) throw new IllegalArgumentException("Invalid LID file system path URI - it must start with '${LID_PROT}' prefix - offendinf value: $path") - if (path.startsWith(LID_PROT + SEPARATOR) && path.length() > 7) + if( path.startsWith(LID_PROT + SEPARATOR) && path.length() > 7 ) throw new IllegalArgumentException("Invalid LID file system path URI - make sure the schema prefix does not container more than two slash characters - offending value: $path") - if (path == LID_PROT) //Empty path case + if( path == LID_PROT ) //Empty path case return new URI("lid:///") return new URI(path) } @Override String toString() { - return "$filePath${query ? '?' + query: ''}${fragment ? '#'+ fragment : ''}".toString() + return "$filePath${query ? '?' + query : ''}${fragment ? '#' + fragment : ''}".toString() + } + + @Override + boolean hasQuery() { + //Lin path is a query when is root (no filepath or /) and has the query field + return (filePath.isEmpty() || filePath == SEPARATOR) && query && fileSystem + } + + @Override + List resolveQuery() { + final store = fileSystem.getStore() + if( !store ) + throw new Exception("Lineage store not found - Check Nextflow configuration") + final results = store.search(query) + return parseResults(results) + } + + private List parseResults(Map results) { + if( !results ) + throw new FileNotFoundException("No files found for ${this.toUriString()}") + final List parsedResults = [] + for( def res : results ) { + parsedResults << parseResult(res.key, res.value) + } + return parsedResults + } + + private Path parseResult(String key, LinSerializable object) { + if( fragment ) + return getSubObjectAsPath(fileSystem, key, object, parseChildrenFromFragment(fragment)) + + if( object instanceof FileOutput ) { + return new LinPath(fileSystem, key) + } else { + return generateLinMetadataPath(fileSystem, key, object, null) + } } } From cdd9e89cd831c0cbc229e40aebaf4abd937098c5 Mon Sep 17 00:00:00 2001 From: jorgee Date: Fri, 25 Apr 2025 21:24:11 +0200 Subject: [PATCH 4/7] rename fromLinageQuery to queryLineage Signed-off-by: jorgee --- .../nextflow/src/main/groovy/nextflow/Channel.groovy | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy index 78e8c6e8f5..e1436ac0ef 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy @@ -679,18 +679,18 @@ class Channel { future.exceptionally(this.&handlerException) } - static DataflowWriteChannel fromLineageQuery(String queryString) { + static DataflowWriteChannel queryLineage(String queryString) { final result = CH.create() if( NF.isDsl2() ) { - session.addIgniter { fromLineageQuery0(result, queryString) } + session.addIgniter { queryLineage0(result, queryString) } } else { - fromLineageQuery0(result, queryString ) + queryLineage0(result, queryString ) } return result } - private static void fromLineageQuery0(DataflowWriteChannel channel, String query) { + private static void queryLineage0(DataflowWriteChannel channel, String query) { final operation = Plugins.getExtension(LinChannelEx) if( !operation ) throw new IllegalStateException("Unable to load lineage extensions.") @@ -698,4 +698,4 @@ class Channel { future.exceptionally(this.&handlerException) } -} +} \ No newline at end of file From 62a2cfadfa02fedfffbe57b41e8c23c60b9255fb Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 29 Apr 2025 11:46:40 +0200 Subject: [PATCH 5/7] lineage API refactor and remove other implementations Signed-off-by: jorgee --- .../src/main/groovy/nextflow/Channel.groovy | 29 +++--------- .../src/main/groovy/nextflow/Session.groovy | 2 +- .../nextflow/extension/LinChannelEx.groovy | 22 +++++++++- .../nextflow/extension/OperatorImpl.groovy | 18 ++++++++ .../groovy/nextflow/file/PathVisitor.groovy | 23 +--------- .../groovy/nextflow/file/QueryablePath.groovy | 13 ------ .../nextflow/lineage/DefaultLinStore.groovy | 10 +---- ...eExImpl.groovy => LinChannelExImpl.groovy} | 36 +++++---------- .../main/nextflow/lineage/LinObserver.groovy | 11 +---- .../src/main/nextflow/lineage/LinStore.groovy | 6 +-- .../src/main/nextflow/lineage/LinUtils.groovy | 2 +- .../lineage/cli/LinCommandImpl.groovy | 6 ++- .../main/nextflow/lineage/fs/LinPath.groovy | 39 +--------------- .../src/resources/META-INF/extensions.idx | 2 +- .../lineage/DefaultLinStoreTest.groovy | 2 +- ...est.groovy => LinChannelExImplTest.groovy} | 44 ++++++++----------- .../lineage/cli/LinCommandImplTest.groovy | 10 ++++- 17 files changed, 98 insertions(+), 177 deletions(-) delete mode 100644 modules/nextflow/src/main/groovy/nextflow/file/QueryablePath.groovy rename modules/nf-lineage/src/main/nextflow/lineage/{LinChanneExImpl.groovy => LinChannelExImpl.groovy} (63%) rename modules/nf-lineage/src/test/nextflow/lineage/{LinChanneExImplTest.groovy => LinChannelExImplTest.groovy} (79%) diff --git a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy index e1436ac0ef..c930a9694a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy @@ -660,41 +660,22 @@ class Channel { fromPath0Future = future.exceptionally(Channel.&handlerException) } - static DataflowWriteChannel fromLineage(String uri) { + static DataflowWriteChannel queryLineage(Map params) { final result = CH.create() if( NF.isDsl2() ) { - session.addIgniter { fromLineage0(result, uri) } + session.addIgniter { queryLineage0(result, params) } } else { - fromLineage0(result, uri ) + queryLineage0(result, params ) } return result } - private static void fromLineage0(DataflowWriteChannel channel, String uri) { + private static void queryLineage0(DataflowWriteChannel channel, Map params) { final operation = Plugins.getExtension(LinChannelEx) if( !operation ) throw new IllegalStateException("Unable to load lineage extensions.") - def future = CompletableFuture.runAsync( { operation.viewLineage(session, channel, new URI(uri)) } as Runnable) - future.exceptionally(this.&handlerException) - } - - static DataflowWriteChannel queryLineage(String queryString) { - final result = CH.create() - if( NF.isDsl2() ) { - session.addIgniter { queryLineage0(result, queryString) } - } - else { - queryLineage0(result, queryString ) - } - return result - } - - private static void queryLineage0(DataflowWriteChannel channel, String query) { - final operation = Plugins.getExtension(LinChannelEx) - if( !operation ) - throw new IllegalStateException("Unable to load lineage extensions.") - def future = CompletableFuture.runAsync( { operation.queryLineage(session, channel, query) } as Runnable) + def future = CompletableFuture.runAsync( { operation.queryLineage(session, channel, params) } as Runnable) future.exceptionally(this.&handlerException) } diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index acc1c4fb7d..a486123362 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -381,7 +381,7 @@ class Session implements ISession { this.dag = new DAG() // -- init output dir - this.outputDir = FileHelper.toCanonicalPath(config.outputDir ?: config.navigate('params.outdir') ?: 'results') + this.outputDir = FileHelper.toCanonicalPath(config.outputDir ?: 'results') // -- init work dir this.workDir = FileHelper.toCanonicalPath(config.workDir ?: 'work') diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy index 482ef26263..7b0978ec8e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy @@ -3,8 +3,26 @@ package nextflow.extension import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.Session +/** + * Interface to implement the Lineage channel factories and functions. + * @author Jorge Ejarque params) } \ No newline at end of file diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index 3614de19db..7ab23ecd34 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -36,6 +36,7 @@ import nextflow.Channel import nextflow.Global import nextflow.NF import nextflow.Session +import nextflow.plugin.Plugins import nextflow.script.ChannelOut import nextflow.script.TokenBranchDef import nextflow.script.TokenMultiMapDef @@ -1246,4 +1247,21 @@ class OperatorImpl { .getOutput() } + /** + * Transform the items emitted by a channel by applying a function to each of them + * + * @param channel + * @return + */ + DataflowWriteChannel lineage(final DataflowReadChannel source) { + assert source != null + final operation = Plugins.getExtension(LinChannelEx) + if( !operation ) + throw new IllegalStateException("Unable to load lineage extensions.") + final closure = { operation.viewLineage(session, it) } + return new MapOp(source, closure).apply() + } + + + } diff --git a/modules/nextflow/src/main/groovy/nextflow/file/PathVisitor.groovy b/modules/nextflow/src/main/groovy/nextflow/file/PathVisitor.groovy index 68e27785f8..0a484bff9f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/file/PathVisitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/file/PathVisitor.groovy @@ -66,7 +66,7 @@ class PathVisitor { applyRegexPattern0(filePattern) else if( filePattern != null ) - applyPathPattern0(filePattern as Path) + applyGlobPattern0(filePattern as Path) else throw new IllegalArgumentException("Missing file pattern argument") @@ -103,27 +103,6 @@ class PathVisitor { target.bind(STOP) } - private void applyPathPattern0(Path filePattern) { - if( isQuery(filePattern) ) - applyQueryablePath0(filePattern as QueryablePath) - else - applyGlobPattern0(filePattern) - } - - private static boolean isQuery(Path filePattern) { - log.debug("Checking if query: $filePattern.class ") - return filePattern instanceof QueryablePath && (filePattern as QueryablePath).hasQuery() - } - - private boolean applyQueryablePath0(QueryablePath path) { - final paths = path.resolveQuery() - if( !paths ) - throw new FileNotFoundException("No files found for ${path}") - - paths.forEach { emit0(it) } - close0() - } - private void applyGlobPattern0(Path filePattern) { final glob = opts?.containsKey('glob') ? opts.glob as boolean : true diff --git a/modules/nextflow/src/main/groovy/nextflow/file/QueryablePath.groovy b/modules/nextflow/src/main/groovy/nextflow/file/QueryablePath.groovy deleted file mode 100644 index 084c43d3b5..0000000000 --- a/modules/nextflow/src/main/groovy/nextflow/file/QueryablePath.groovy +++ /dev/null @@ -1,13 +0,0 @@ -package nextflow.file - -import java.nio.file.Path - -/** - * Interface to indicate a Path could contain a query that is resolved to several real paths. - * - * @author Jorge Ejarque - */ -interface QueryablePath { - boolean hasQuery(); - List resolveQuery(); -} \ No newline at end of file diff --git a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy index fb64bbe2c7..92802061c2 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinStore.groovy @@ -103,15 +103,7 @@ class DefaultLinStore implements LinStore { void close() throws IOException { } @Override - Map search(String queryString) { - def params = null - if (queryString) { - params = LinUtils.parseQuery(queryString) - } - return searchAllFiles(params) - } - - private Map searchAllFiles(Map params) { + Map search(Map params) { final results = new HashMap() Files.walkFileTree(metaLocation, new FileVisitor() { diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinChannelExImpl.groovy similarity index 63% rename from modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy rename to modules/nf-lineage/src/main/nextflow/lineage/LinChannelExImpl.groovy index 7a4365fd71..508ef6ce88 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinChanneExImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinChannelExImpl.groovy @@ -33,22 +33,24 @@ import nextflow.lineage.serde.LinSerializable */ @CompileStatic @Slf4j -class LinChanneExImpl implements LinChannelEx{ +class LinChannelExImpl implements LinChannelEx { - void viewLineage(Session session, DataflowWriteChannel channel, URI uri) { + Object viewLineage(Session session, String lid) { final store = getStore(session) - emitResults(channel, LinUtils.query(store, uri)) - channel.bind(Channel.STOP) + final results = LinUtils.query(store, new URI(lid)) + if( !results ) { + throw new FileNotFoundException("No entry found for $lid") + } + return LinUtils.encodeSearchOutputs(results.size() == 1 ? results[0] : results) } - void queryLineage(Session session, DataflowWriteChannel channel, String query) { + void queryLineage(Session session, DataflowWriteChannel channel, Map params) { final store = getStore(session) - emitSearchResults(channel, store.search(query)) + emitSearchResults(channel, store.search(params)) channel.bind(Channel.STOP) } - - protected LinStore getStore(Session session){ + protected LinStore getStore(Session session) { final store = LinStoreFactory.getOrCreate(session) if( !store ) { throw new Exception("Lineage store not found - Check Nextflow configuration") @@ -56,26 +58,10 @@ class LinChanneExImpl implements LinChannelEx{ return store } - private static void emitResults(DataflowWriteChannel channel, Collection results){ - if( !results ) { - return - } - // Remove nested collections of a single element - if( results.size() == 1 ) { - final entry = results[0] - if( entry instanceof Collection ) { - emitResults(channel, entry) - } else { - channel.bind(LinUtils.encodeSearchOutputs(entry)) - } - } else - results.forEach { channel.bind(LinUtils.encodeSearchOutputs(it)) } - } - private void emitSearchResults(DataflowWriteChannel channel, Map results) { if( !results ) { return } - results.keySet().forEach { channel.bind(LinPathFactory.create(LinPath.LID_PROT + it)) } + results.keySet().forEach { channel.bind(LinPath.LID_PROT + it) } } } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy index 53ee984b2a..081a5bba7a 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy @@ -87,7 +87,6 @@ class LinObserver implements TraceObserver { private Session session private WorkflowOutput workflowOutput private Map outputsStoreDirLid = new HashMap(10) - private Set publishedFiles = new HashSet() private PathNormalizer normalizer LinObserver(Session session, LinStore store){ @@ -125,10 +124,6 @@ class LinObserver implements TraceObserver { @Override void onFlowComplete(){ if (this.workflowOutput){ - //Add publishedFiles - for (String path: publishedFiles){ - workflowOutput.output.add(new Parameter(Path.simpleName, null, path)) - } workflowOutput.createdAt = OffsetDateTime.now() final key = executionHash + '#output' this.store.save(key, workflowOutput) @@ -365,7 +360,6 @@ class LinObserver implements TraceObserver { LinUtils.toDate(attrs?.lastModifiedTime()), convertAnnotations(annotations)) store.save(key, value) - publishedFiles.add(asUriString(key)) } catch (Throwable e) { log.warn("Unexpected error storing published file '${destination.toUriString()}' for workflow '${executionHash}'", e) } @@ -417,9 +411,8 @@ class LinObserver implements TraceObserver { private Object convertPathsToLidReferences(Object value){ if( value instanceof Path ) { try { - final key = asUriString(getWorkflowOutputKey(value)) - publishedFiles.remove(key) - return key + final key = getWorkflowOutputKey(value) + return asUriString(key) } catch (Throwable e){ //Workflow output key not found return value diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy index 3f826b7a0a..ba31b12df9 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy @@ -55,9 +55,9 @@ interface LinStore extends Closeable { /** * Search for lineage entries. - * @queryString Json-path like query string. (Only simple and nested field operators are supported(No array, wildcards,etc.) - * @return Key-lineage entry pairs fulfilling the queryString + * @params Map of query params + * @return Key-lineage entry pairs fulfilling the query params */ - Map search(String queryString) + Map search(Map params) } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy index b62d67abaf..e426a67436 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy @@ -62,7 +62,7 @@ class LinUtils { } private static Collection globalSearch(LinStore store, URI uri) { - final results = store.search(uri.query).values() + final results = store.search(parseQuery(uri.query)).values() if (results && uri.fragment) { // If fragment is defined get the property of the object indicated by the fragment return filterResults(results, uri.fragment) diff --git a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy index 43290e46ff..ad4fa7477e 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy @@ -17,6 +17,7 @@ package nextflow.lineage.cli import static nextflow.lineage.fs.LinPath.* +import static nextflow.lineage.LinUtils.* import java.nio.charset.StandardCharsets import java.nio.file.Path @@ -319,7 +320,10 @@ class LinCommandImpl implements CmdLineage.LinCommand { return } try { - println LinUtils.encodeSearchOutputs(store.search(args[0]).keySet().collect {asUriString(it)}, true) + final params = args.collectEntries { + it.split('=').collect { URLDecoder.decode(it, 'UTF-8') } + } as Map + println LinUtils.encodeSearchOutputs( store.search(params).keySet().collect { asUriString(it) }, true ) } catch (Throwable e){ println "Error searching for ${args[0]}. ${e.message}" } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy index c94607f0eb..fccaf88ab8 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy @@ -20,7 +20,6 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.file.FileHelper import nextflow.file.LogicalDataPath -import nextflow.file.QueryablePath import nextflow.lineage.model.Checksum import nextflow.lineage.model.FileOutput import nextflow.lineage.serde.LinSerializable @@ -46,7 +45,7 @@ import java.time.OffsetDateTime */ @Slf4j @CompileStatic -class LinPath implements Path, LogicalDataPath, QueryablePath { +class LinPath implements Path, LogicalDataPath { static public final List SUPPORTED_CHECKSUM_ALGORITHMS = ["nextflow"] static public final String SEPARATOR = '/' @@ -515,41 +514,5 @@ class LinPath implements Path, LogicalDataPath, QueryablePath { return "$filePath${query ? '?' + query : ''}${fragment ? '#' + fragment : ''}".toString() } - @Override - boolean hasQuery() { - //Lin path is a query when is root (no filepath or /) and has the query field - return (filePath.isEmpty() || filePath == SEPARATOR) && query && fileSystem - } - - @Override - List resolveQuery() { - final store = fileSystem.getStore() - if( !store ) - throw new Exception("Lineage store not found - Check Nextflow configuration") - final results = store.search(query) - return parseResults(results) - } - - private List parseResults(Map results) { - if( !results ) - throw new FileNotFoundException("No files found for ${this.toUriString()}") - final List parsedResults = [] - for( def res : results ) { - parsedResults << parseResult(res.key, res.value) - } - return parsedResults - } - - private Path parseResult(String key, LinSerializable object) { - if( fragment ) - return getSubObjectAsPath(fileSystem, key, object, parseChildrenFromFragment(fragment)) - - if( object instanceof FileOutput ) { - return new LinPath(fileSystem, key) - } else { - return generateLinMetadataPath(fileSystem, key, object, null) - } - } - } diff --git a/modules/nf-lineage/src/resources/META-INF/extensions.idx b/modules/nf-lineage/src/resources/META-INF/extensions.idx index d4809ad1ad..85b08c4461 100644 --- a/modules/nf-lineage/src/resources/META-INF/extensions.idx +++ b/modules/nf-lineage/src/resources/META-INF/extensions.idx @@ -17,4 +17,4 @@ nextflow.lineage.DefaultLinStoreFactory nextflow.lineage.LinObserverFactory nextflow.lineage.cli.LinCommandImpl -nextflow.lineage.LinChanneExImpl \ No newline at end of file +nextflow.lineage.LinChannelExImpl \ No newline at end of file diff --git a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy index db135923d4..16c7f02606 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy @@ -127,7 +127,7 @@ class DefaultLinStoreTest extends Specification { lidStore.save(key4, value4) when: - def results = lidStore.search("type=FileOutput&annotations.key=key2&annotations.value=value2") + def results = lidStore.search("type":"FileOutput", "annotations.key":"key2", "annotations.value":"value2") then: results.size() == 2 results.keySet().containsAll([key2,key3]) diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinChannelExImplTest.groovy similarity index 79% rename from modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy rename to modules/nf-lineage/src/test/nextflow/lineage/LinChannelExImplTest.groovy index 75b527b4e1..29a01007c6 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinChanneExImplTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinChannelExImplTest.groovy @@ -39,12 +39,14 @@ import spock.lang.TempDir import java.time.ZoneOffset +import static nextflow.lineage.fs.LinPath.* + /** * Lineage channel extensions tests * * @author Jorge Ejarque */ -class LinChanneExImplTest extends Specification { +class LinChannelExImplTest extends Specification { @TempDir Path tempDir @@ -75,26 +77,28 @@ class LinChanneExImplTest extends Specification { lidStore.open(LineageConfig.create(session)) lidStore.save(key, value1) lidStore.save("$key#output", wfOutputs) - def channelLinExt = Spy(new LinChanneExImpl()) + def channelLinExt = Spy(new LinChannelExImpl()) when: - def results = CH.create() - channelLinExt.viewLineage(session, results, new URI('lid://testKey#params')) + def results = channelLinExt.viewLineage(session, 'lid://testKey') then: channelLinExt.getStore(session) >> lidStore and: - results.val == LinUtils.encodeSearchOutputs(params[0]) - results.val == LinUtils.encodeSearchOutputs(params[1]) - results.val == Channel.STOP + results == LinUtils.encodeSearchOutputs(value1) when: - results = CH.create() - channelLinExt.viewLineage(session, results, new URI('lid://testKey#output')) + results = channelLinExt.viewLineage(session, 'lid://testKey#params') then: channelLinExt.getStore(session) >> lidStore and: - results.val == LinUtils.encodeSearchOutputs(outputs[0]) - results.val == Channel.STOP + results == LinUtils.encodeSearchOutputs(params) + + when: + results = channelLinExt.viewLineage(session, 'lid://testKey#output') + then: + channelLinExt.getStore(session) >> lidStore + and: + results == LinUtils.encodeSearchOutputs(outputs) } def 'should return global query results' () { @@ -120,25 +124,15 @@ class LinChanneExImplTest extends Specification { lidStore.save(key2, value2) lidStore.save(key3, value3) lidStore.save(key4, value4) - def channelLinExt = Spy(new LinChanneExImpl()) + def channelLinExt = Spy(new LinChannelExImpl()) when: def results = CH.create() - channelLinExt.viewLineage(session, results, new URI("cid:///?type=FileOutput&annotations.key=key2&annotations.value=value2")) - then: - channelLinExt.getStore(session) >> lidStore - and: - results.val == LinUtils.encodeSearchOutputs(value2) - results.val == LinUtils.encodeSearchOutputs(value3) - results.val == Channel.STOP - - when: - results = CH.create() - channelLinExt.viewLineage(session, results, new URI("cid:///?type=FileOutput&annotations.key=key2&annotations.value=value2#path")) + channelLinExt.queryLineage(session, results, [ "type":"FileOutput", "annotations.key":"key2", "annotations.value":"value2" ]) then: channelLinExt.getStore(session) >> lidStore and: - results.val == '"/path/tp/file1"' - results.val == '"/path/tp/file2"' + results.val == asUriString(key2) + results.val == asUriString(key3) results.val == Channel.STOP } } diff --git a/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy index ce17b89177..42c5276ff1 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy @@ -22,6 +22,7 @@ import nextflow.dag.MermaidHtmlRenderer import nextflow.lineage.LinHistoryRecord import nextflow.lineage.LinStoreFactory import nextflow.lineage.DefaultLinHistoryLog +import nextflow.lineage.model.Annotation import nextflow.lineage.model.Checksum import nextflow.lineage.model.FileOutput import nextflow.lineage.model.DataPath @@ -439,18 +440,23 @@ class LinCommandImplTest extends Specification{ Files.createDirectories(lidFile.parent) def lidFile2 = storeLocation.resolve(".meta/123987/file2.bam/.data.json") Files.createDirectories(lidFile2.parent) + def lidFile3 = storeLocation.resolve(".meta/123987/file3.bam/.data.json") + Files.createDirectories(lidFile3.parent) def encoder = new LinEncoder().withPrettyPrint(true) def time = OffsetDateTime.ofInstant(Instant.ofEpochMilli(123456789), ZoneOffset.UTC) def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"), - "lid://123987/file.bam", "lid://123987/", null, 1234, time, time, null) + "lid://123987/file.bam", "lid://123987/", null, 1234, time, time, [new Annotation("experiment", "test")]) def entry2 = new FileOutput("path/to/file2",new Checksum("42472qet","nextflow","standard"), + "lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, [new Annotation("experiment", "test")]) + def entry3 = new FileOutput("path/to/file3",new Checksum("42472qet","nextflow","standard"), "lid://123987/file2.bam", "lid://123987/", null, 1235, time, time, null) def expectedOutput1 = '[\n "lid://123987/file.bam",\n "lid://123987/file2.bam"\n]' def expectedOutput2 = '[\n "lid://123987/file2.bam",\n "lid://123987/file.bam"\n]' lidFile.text = encoder.encode(entry) lidFile2.text = encoder.encode(entry2) + lidFile3.text = encoder.encode(entry3) when: - new LinCommandImpl().find(configMap, ["type=FileOutput"]) + new LinCommandImpl().find(configMap, ["type=FileOutput", "annotations.value=test"]) def stdout = capture .toString() .readLines()// remove the log part From 3e377283748279c618df84a92979a80309a17bfd Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 29 Apr 2025 11:54:45 +0200 Subject: [PATCH 6/7] Correct lineage function comment Signed-off-by: jorgee --- .../src/main/groovy/nextflow/extension/OperatorImpl.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index 7ab23ecd34..bd95568e18 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -1248,9 +1248,9 @@ class OperatorImpl { } /** - * Transform the items emitted by a channel by applying a function to each of them + * Transform the Lineage ID items emitted in the source channel by its lineage metadata description * - * @param channel + * @param channel Source channel with emitted lineage IDs * @return */ DataflowWriteChannel lineage(final DataflowReadChannel source) { From 6abae3c98e2e46c9feb11ae4441f705d70d3dc5b Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 29 Apr 2025 16:12:06 +0200 Subject: [PATCH 7/7] Convert lineage from operator to function and add documentation Signed-off-by: jorgee --- docs/reference/channel.md | 34 +++++++++++++++++++ docs/reference/cli.md | 4 +-- docs/reference/stdlib.md | 5 +++ .../src/main/groovy/nextflow/Nextflow.groovy | 9 +++++ .../nextflow/extension/LinChannelEx.groovy | 2 +- .../nextflow/extension/OperatorImpl.groovy | 18 ---------- .../nextflow/lineage/LinChannelExImpl.groovy | 6 +--- .../lineage/LinChannelExImplTest.groovy | 12 ++----- 8 files changed, 55 insertions(+), 35 deletions(-) diff --git a/docs/reference/channel.md b/docs/reference/channel.md index 5d3b54c95f..d76ff7df1e 100644 --- a/docs/reference/channel.md +++ b/docs/reference/channel.md @@ -405,6 +405,40 @@ Y See also: [channel.fromList](#fromlist) factory method. +(channel-query-lineage)= + +## queryLineage + +:::{versionadded} 25.04.0 +::: + +:::{warning} *Experimental: may change in a future release.* +::: + +The `channel.queryLineage` method allows you to create a channel that emits the IDs of the lineage metadata objects matching with a set of key-value parameters passed as arguments of the method. + +The following snippet shows how to create a channel (`ch`) using this method. It searches for `FileOutputs` annotated with the value 'test'. +The result is a set of Lineage IDs (lid) that can be consumed by processes as `path` or inspected with the `lineage` function. + +```nextflow + process foo { + input: + path('output_file') + + // ... + } + + workflow { + ch = channel + .queryLineage('type': 'FileOutput', 'annotations.value': 'test') + + foo(ch) + + ch.map { lid -> lineage(lid) } + } + +``` + (channel-topic)= ## topic diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 67cc43d104..0f8dd965fa 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -732,10 +732,10 @@ View a metadata description fragment. A fragment can be a property of a metadata $ nextflow lineage view ``` -Find a specific metadata description that matches a URL-like query string. The query string consists of `key=value` statements separated by `&`, where keys are defined similarly to the `fragments` used in the `view` command. +Find a specific metadata description that matches to a set of key-value parameters. Keys are defined similarly to the `fragments` used in the `view` command. ```console -$ nextflow lineage find "" +$ nextflow lineage find = = ... ``` Display a git-style diff between two metadata descriptions. diff --git a/docs/reference/stdlib.md b/docs/reference/stdlib.md index d00e1091d2..99b7b19c3d 100644 --- a/docs/reference/stdlib.md +++ b/docs/reference/stdlib.md @@ -235,6 +235,11 @@ The following functions are available in Nextflow scripts: `groupKey( key, size: int ) -> GroupKey` : Create a grouping key to use with the {ref}`operator-grouptuple` operator. +`lineage( lid ) -> LinSerializable` +: :::{versionadded} 25.04.0 +: ::: +: Get the Lineage metadata object + `multiMapCriteria( criteria: Closure ) -> Closure` : Create a multi-map criteria to use with the {ref}`operator-multiMap` operator. diff --git a/modules/nextflow/src/main/groovy/nextflow/Nextflow.groovy b/modules/nextflow/src/main/groovy/nextflow/Nextflow.groovy index aee6368755..52dd250ba8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Nextflow.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Nextflow.groovy @@ -29,10 +29,12 @@ import nextflow.ast.OpXformImpl import nextflow.exception.StopSplitIterationException import nextflow.exception.WorkflowScriptErrorException import nextflow.extension.GroupKey +import nextflow.extension.LinChannelEx import nextflow.extension.OperatorImpl import nextflow.file.FileHelper import nextflow.file.FilePatternSplitter import nextflow.mail.Mailer +import nextflow.plugin.Plugins import nextflow.script.TokenBranchDef import nextflow.script.TokenMultiMapDef import nextflow.splitter.FastaSplitter @@ -422,4 +424,11 @@ class Nextflow { */ static Closure multiMapCriteria(Closure closure) { closure } + static Object lineage( String lid ) { + final operation = Plugins.getExtension(LinChannelEx) + if( !operation ) + throw new IllegalStateException("Unable to load lineage extensions.") + return operation.viewLineage(session, lid) + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy index 7b0978ec8e..195ac870c4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/LinChannelEx.groovy @@ -13,7 +13,7 @@ interface LinChannelEx { * * @param session Nextflow Session * @param lid Lineage Id to view - * @return Lineage metadata content + * @return Lineage metadata object */ Object viewLineage(Session session, String lid) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index bd95568e18..e13b79d017 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -1246,22 +1246,4 @@ class OperatorImpl { .apply() .getOutput() } - - /** - * Transform the Lineage ID items emitted in the source channel by its lineage metadata description - * - * @param channel Source channel with emitted lineage IDs - * @return - */ - DataflowWriteChannel lineage(final DataflowReadChannel source) { - assert source != null - final operation = Plugins.getExtension(LinChannelEx) - if( !operation ) - throw new IllegalStateException("Unable to load lineage extensions.") - final closure = { operation.viewLineage(session, it) } - return new MapOp(source, closure).apply() - } - - - } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinChannelExImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinChannelExImpl.groovy index 508ef6ce88..ae41c5d979 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinChannelExImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinChannelExImpl.groovy @@ -37,11 +37,7 @@ class LinChannelExImpl implements LinChannelEx { Object viewLineage(Session session, String lid) { final store = getStore(session) - final results = LinUtils.query(store, new URI(lid)) - if( !results ) { - throw new FileNotFoundException("No entry found for $lid") - } - return LinUtils.encodeSearchOutputs(results.size() == 1 ? results[0] : results) + return store.load(LinPathFactory.create(lid).toString()) } void queryLineage(Session session, DataflowWriteChannel channel, Map params) { diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinChannelExImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinChannelExImplTest.groovy index 29a01007c6..acaef23fbf 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinChannelExImplTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinChannelExImplTest.groovy @@ -59,7 +59,7 @@ class LinChannelExImplTest extends Specification { configMap = [linage: [enabled: true, store: [location: storeLocation.toString()]]] } - def 'should get metadata fragment'() { + def 'should get metadata'() { given: def uniqueId = UUID.randomUUID() @@ -84,21 +84,15 @@ class LinChannelExImplTest extends Specification { then: channelLinExt.getStore(session) >> lidStore and: - results == LinUtils.encodeSearchOutputs(value1) + results == value1 - when: - results = channelLinExt.viewLineage(session, 'lid://testKey#params') - then: - channelLinExt.getStore(session) >> lidStore - and: - results == LinUtils.encodeSearchOutputs(params) when: results = channelLinExt.viewLineage(session, 'lid://testKey#output') then: channelLinExt.getStore(session) >> lidStore and: - results == LinUtils.encodeSearchOutputs(outputs) + results == wfOutputs } def 'should return global query results' () {