Data lineage programmatic API #6003

Draft
wants to merge 8 commits into
base: master
34 changes: 34 additions & 0 deletions docs/reference/channel.md
@@ -405,6 +405,40 @@ Y

See also: [channel.fromList](#fromlist) factory method.

(channel-query-lineage)=

## queryLineage

:::{versionadded} 25.04.0
:::

:::{warning} *Experimental: may change in a future release.*
:::

The `channel.queryLineage` method creates a channel that emits the IDs of the lineage metadata objects that match a set of key-value parameters passed as arguments.

The following snippet shows how to create a channel (`ch`) using this method. It searches for `FileOutput` objects annotated with the value `test`.
The result is a set of lineage IDs (LIDs) that can be consumed by processes as `path` inputs or inspected with the `lineage` function.

```nextflow
process foo {
    input:
    path('output_file')

    // ...
}

workflow {
    ch = channel
        .queryLineage('type': 'FileOutput', 'annotations.value': 'test')

    foo(ch)

    ch.map { lid -> lineage(lid) }
}
```

(channel-topic)=

## topic
4 changes: 2 additions & 2 deletions docs/reference/cli.md
@@ -732,10 +732,10 @@ View a metadata description fragment. A fragment can be a property of a metadata
$ nextflow lineage view <lid#fragment>
```

Find a specific metadata description that matches a URL-like query string. The query string consists of `key=value` statements separated by `&`, where keys are defined similarly to the `fragments` used in the `view` command.
Find a specific metadata description that matches a set of key-value parameters. Keys are defined similarly to the `fragments` used in the `view` command.

```console
$ nextflow lineage find "<query-string>"
$ nextflow lineage find <key-1>=<value-1> <key-2>=<value-2> ...
```

Display a git-style diff between two metadata descriptions.
5 changes: 5 additions & 0 deletions docs/reference/stdlib.md
@@ -235,6 +235,11 @@ The following functions are available in Nextflow scripts:
`groupKey( key, size: int ) -> GroupKey`
: Create a grouping key to use with the {ref}`operator-grouptuple` operator.

`lineage( lid ) -> LinSerializable`
: :::{versionadded} 25.04.0
: :::
: Get the lineage metadata object for the given lineage ID (`lid`).

`multiMapCriteria( criteria: Closure ) -> Closure`
: Create a multi-map criteria to use with the {ref}`operator-multiMap` operator.

24 changes: 23 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Channel.groovy
@@ -16,6 +16,9 @@

package nextflow

import nextflow.extension.LinChannelEx
import nextflow.plugin.Plugins

import static nextflow.util.CheckHelper.*

import java.nio.file.FileSystem
@@ -657,4 +660,23 @@ class Channel {
fromPath0Future = future.exceptionally(Channel.&handlerException)
}

}
    static DataflowWriteChannel queryLineage(Map<String,String> params) {
        final result = CH.create()
        if( NF.isDsl2() ) {
            // defer the query until the session starts
            session.addIgniter { queryLineage0(result, params) }
        }
        else {
            queryLineage0(result, params)
        }
        return result
    }

    private static void queryLineage0(DataflowWriteChannel channel, Map<String,String> params) {
        final operation = Plugins.getExtension(LinChannelEx)
        if( !operation )
            throw new IllegalStateException("Unable to load lineage extensions.")
        // run the query asynchronously, same error handling as the other factories
        final future = CompletableFuture.runAsync( { operation.queryLineage(session, channel, params) } as Runnable )
        future.exceptionally(Channel.&handlerException)
    }

}
9 changes: 9 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Nextflow.groovy
@@ -29,10 +29,12 @@ import nextflow.ast.OpXformImpl
import nextflow.exception.StopSplitIterationException
import nextflow.exception.WorkflowScriptErrorException
import nextflow.extension.GroupKey
import nextflow.extension.LinChannelEx
import nextflow.extension.OperatorImpl
import nextflow.file.FileHelper
import nextflow.file.FilePatternSplitter
import nextflow.mail.Mailer
import nextflow.plugin.Plugins
import nextflow.script.TokenBranchDef
import nextflow.script.TokenMultiMapDef
import nextflow.splitter.FastaSplitter
@@ -422,4 +424,11 @@ class Nextflow {
*/
static Closure<TokenMultiMapDef> multiMapCriteria(Closure<TokenBranchDef> closure) { closure }

    static Object lineage( String lid ) {
        final operation = Plugins.getExtension(LinChannelEx)
        if( !operation )
            throw new IllegalStateException("Unable to load lineage extensions.")
        return operation.viewLineage(session, lid)
    }

}
@@ -0,0 +1,28 @@
package nextflow.extension

import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Session

/**
 * Interface to implement the Lineage channel factories and functions.
 * @author Jorge Ejarque <[email protected]>
 */
interface LinChannelEx {
    /**
     * Lineage metadata view.
     *
     * @param session Nextflow Session
     * @param lid Lineage Id to view
     * @return Lineage metadata object
     */
    Object viewLineage(Session session, String lid)

    /**
     * Query Lineage metadata.
     *
     * @param session Nextflow Session
     * @param channel Channel to publish the Lineage Ids matching the query params
     * @param params Query parameters
     */
    void queryLineage(Session session, DataflowWriteChannel channel, Map<String,String> params)
}
@@ -36,6 +36,7 @@ import nextflow.Channel
import nextflow.Global
import nextflow.NF
import nextflow.Session
import nextflow.plugin.Plugins
import nextflow.script.ChannelOut
import nextflow.script.TokenBranchDef
import nextflow.script.TokenMultiMapDef
@@ -1245,5 +1246,4 @@ class OperatorImpl {
.apply()
.getOutput()
}

}
@@ -93,15 +93,7 @@ class DefaultLinStore implements LinStore {
void close() throws IOException { }

@Override
Map<String, LinSerializable> search(String queryString) {
def params = null
if (queryString) {
params = LinUtils.parseQuery(queryString)
}
return searchAllFiles(params)
}

private Map<String, LinSerializable> searchAllFiles(Map<String,String> params) {
Map<String, LinSerializable> search(Map<String,String> params) {
final results = new HashMap<String, LinSerializable>()

Files.walkFileTree(location, new FileVisitor<Path>() {
@@ -0,0 +1,63 @@
/*
* Copyright 2013-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.lineage

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.Session
import nextflow.extension.LinChannelEx
import nextflow.lineage.fs.LinPath
import nextflow.lineage.fs.LinPathFactory
import nextflow.lineage.serde.LinSerializable

/**
 * Lineage channel extensions
 *
 * @author Jorge Ejarque <[email protected]>
 */
@CompileStatic
@Slf4j
class LinChannelExImpl implements LinChannelEx {

    Object viewLineage(Session session, String lid) {
        final store = getStore(session)
        return store.load(LinPathFactory.create(lid).toString())
    }

    void queryLineage(Session session, DataflowWriteChannel channel, Map<String, String> params) {
        final store = getStore(session)
        emitSearchResults(channel, store.search(params))
        // signal that no more values will be emitted
        channel.bind(Channel.STOP)
    }

    protected LinStore getStore(Session session) {
        final store = LinStoreFactory.getOrCreate(session)
        if( !store ) {
            throw new Exception("Lineage store not found - Check Nextflow configuration")
        }
        return store
    }

    private void emitSearchResults(DataflowWriteChannel channel, Map<String, LinSerializable> results) {
        if( !results ) {
            return
        }
        // emit each matching key prefixed with the lineage protocol
        results.keySet().forEach { channel.bind(LinPath.LID_PROT + it) }
    }
}
6 changes: 3 additions & 3 deletions modules/nf-lineage/src/main/nextflow/lineage/LinStore.groovy
@@ -55,9 +55,9 @@ interface LinStore extends Closeable {

/**
* Search for lineage entries.
* @queryString Json-path like query string. (Only simple and nested field operators are supported(No array, wildcards,etc.)
* @return Key-lineage entry pairs fulfilling the queryString
* @param params Map of key-value query parameters
* @return Key-lineage entry pairs fulfilling the query params
*/
Map<String,LinSerializable> search(String queryString)
Map<String,LinSerializable> search(Map<String,String> params)

}
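
The matching rules behind `search(Map<String,String> params)` are not shown in this diff. As a rough illustration only, the sketch below assumes that a dotted key such as `annotations.value` addresses a nested property and that an entry matches when every parameter equals the resolved property. The `resolve` and `matches` helpers and the sample entry are hypothetical and are not part of the store implementation.

```groovy
// Hypothetical helper (assumption, not the store's code):
// resolve a dotted key such as 'annotations.value' against nested maps
Object resolve(Map object, String key) {
    Object current = object
    for( String part : key.split('\\.') ) {
        if( !(current instanceof Map) )
            return null
        current = ((Map) current).get(part)
    }
    return current
}

// Hypothetical helper: an entry matches when every query parameter
// equals the string form of the resolved property
boolean matches(Map object, Map<String,String> params) {
    return params.every { key, value -> resolve(object, key)?.toString() == value }
}

// Made-up sample entry, for illustration only
def entry = [type: 'FileOutput', annotations: [value: 'test']]
assert matches(entry, [type: 'FileOutput', 'annotations.value': 'test'])
assert !matches(entry, [type: 'Other'])
```

Under these assumptions an empty parameter map matches every entry, which may or may not be the behavior the store intends.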
4 changes: 2 additions & 2 deletions modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy
@@ -62,7 +62,7 @@ class LinUtils {
}

private static Collection<LinSerializable> globalSearch(LinStore store, URI uri) {
final results = store.search(uri.query).values()
final results = store.search(parseQuery(uri.query)).values()
if (results && uri.fragment) {
// If fragment is defined get the property of the object indicated by the fragment
return filterResults(results, uri.fragment)
@@ -316,7 +316,7 @@ class LinUtils {
* @param output Output to encode
* @return Output encoded as a JSON string
*/
static String encodeSearchOutputs(Object output, boolean prettyPrint) {
static String encodeSearchOutputs(Object output, boolean prettyPrint = false) {
if (output instanceof LinSerializable) {
return new LinEncoder().withPrettyPrint(prettyPrint).encode(output)
} else {
@@ -17,6 +17,7 @@
package nextflow.lineage.cli

import static nextflow.lineage.fs.LinPath.*
import static nextflow.lineage.LinUtils.*

import java.nio.charset.StandardCharsets
import java.nio.file.Path
@@ -319,7 +320,10 @@ class LinCommandImpl implements CmdLineage.LinCommand {
return
}
try {
println LinUtils.encodeSearchOutputs(store.search(args[0]).keySet().collect {asUriString(it)}, true)
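final params = args.collectEntries { String arg ->
    // split on the first '=' only, so values that contain '=' are kept whole
    arg.split('=', 2).collect { String token -> URLDecoder.decode(token, 'UTF-8') }
} as Map<String, String>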
println LinUtils.encodeSearchOutputs( store.search(params).keySet().collect { asUriString(it) }, true )
} catch (Throwable e){
println "Error searching for ${args.join(' ')}. ${e.message}"
}
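
A minimal sketch, not part of this PR, of one way to turn `<key>=<value>` arguments into the `Map<String,String>` that `search` expects. The `parseParams` helper is hypothetical. It assumes that an argument without a key or without `=` should be rejected instead of silently producing a `null` value, which is a design choice the PR does not state.

```groovy
// Hypothetical helper (assumption, not code from the PR):
// parse CLI arguments of the form <key>=<value> into a map
Map<String, String> parseParams(List<String> args) {
    final params = new LinkedHashMap<String, String>()
    for( String arg : args ) {
        // a limit of 2 keeps any '=' inside the value intact
        final parts = arg.split('=', 2)
        if( parts.length != 2 || !parts[0] )
            throw new IllegalArgumentException("Invalid parameter '${arg}': expected <key>=<value>")
        params[URLDecoder.decode(parts[0], 'UTF-8')] = URLDecoder.decode(parts[1], 'UTF-8')
    }
    return params
}

// Usage with the keys from the documentation example
assert parseParams(['type=FileOutput', 'annotations.value=test']) ==
    [type: 'FileOutput', 'annotations.value': 'test']
// A value that itself contains '=' is preserved
assert parseParams(['expr=a=b']) == [expr: 'a=b']
```

Rejecting malformed arguments early gives the user a clearer message than an empty search result would.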