Skip to content

Commit cf29db6

Browse files
committed
Minor improvements
Signed-off-by: Ben Sherman <[email protected]>
1 parent bc6e5ce commit cf29db6

File tree

4 files changed

+106
-102
lines changed

4 files changed

+106
-102
lines changed

README.md

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,9 @@ Nextflow plugin for tracking provenance of pipeline output files.
44

55
## Getting Started
66

7-
First, `nf-prov` requires Nextflow version `22.11.0-edge` (or later). You can achieve this by running the following snippet before your `nextflow run` command. In Nextflow Tower, you can include this in your pre-run script under "Advanced Options" on the "Launch Pipeline" page.
7+
The `nf-prov` plugin requires Nextflow version `22.11.0-edge` or later.
88

9-
```sh
10-
export NXF_VER=22.11.0-edge
11-
```
12-
13-
Second, to enable and configure `nf-prov`, you must include the following snippet to your Nextflow config and update as needed. Note that the use of `params.outdir` assumes that you are using an nf-core pipeline with that parameter.
9+
To enable and configure `nf-prov`, add the following snippet to your Nextflow config and update it as needed.
1410

1511
```groovy
1612
plugins {
@@ -26,6 +22,26 @@ prov {
2622

2723
Finally, run your Nextflow pipeline. You do not need to modify your pipeline script in order to use the `nf-prov` plugin. The plugin will automatically generate a JSON file with provenance information.
2824

25+
## Configuration
26+
27+
The following options are available:
28+
29+
`prov.enabled`
30+
31+
Create the provenance manifest (default: `true` if plugin is loaded).
32+
33+
`prov.file`
34+
35+
The path of the provenance manifest (default: `manifest.json`).
36+
37+
`prov.patterns`
38+
39+
List of file patterns to include in the provenance manifest, from the set of published files. By default, all published files are included.
40+
41+
`prov.overwrite`
42+
43+
Overwrite any existing provenance manifest with the same name (default: `false`).
44+
2945
## Development
3046

3147
Run the following commands to build and test the nf-prov Nextflow plugin. Refer to the [nf-hello](https://github.com/nextflow-io/nf-hello) README for additional instructions (_e.g._ for publishing the plugin).

plugins/nf-prov/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@ sourceSets {
5656

5757
dependencies {
5858
// This dependency is exported to consumers, that is to say found on their compile classpath.
59-
compileOnly 'io.nextflow:nextflow:22.04.0'
59+
compileOnly 'io.nextflow:nextflow:23.04.0'
6060
compileOnly 'org.slf4j:slf4j-api:1.7.10'
6161
compileOnly 'org.pf4j:pf4j:3.4.1'
6262
// add plugin dependencies here
6363

6464
// test configuration
6565
testImplementation "org.codehaus.groovy:groovy:3.0.8"
6666
testImplementation "org.codehaus.groovy:groovy-nio:3.0.8"
67-
testImplementation 'io.nextflow:nextflow:22.04.0'
67+
testImplementation 'io.nextflow:nextflow:23.04.0'
6868
testImplementation ("org.codehaus.groovy:groovy-test:3.0.8") { exclude group: 'org.codehaus.groovy' }
6969
testImplementation ("cglib:cglib-nodep:3.3.0")
7070
testImplementation ("org.objenesis:objenesis:3.1")

plugins/nf-prov/src/main/nextflow/prov/ProvObserver.groovy

Lines changed: 67 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package nextflow.prov
1818

1919
import java.nio.file.FileSystems
20-
import java.nio.file.Files
2120
import java.nio.file.Path
2221
import java.nio.file.PathMatcher
2322

@@ -36,98 +35,86 @@ import nextflow.exception.AbortOperationException
3635
* Plugin observer of workflow events
3736
*
3837
* @author Bruno Grande <[email protected]>
38+
* @author Ben Sherman <[email protected]>
3939
*/
4040
@Slf4j
4141
@CompileStatic
4242
class ProvObserver implements TraceObserver {
4343

4444
public static final String DEF_FILE_NAME = 'manifest.json'
4545

46-
private Session session
47-
4846
private Map config
4947

50-
private boolean enabled
51-
5248
private Path path
5349

50+
private Boolean overwrite
51+
5452
private List<PathMatcher> matchers
5553

56-
private List<Map> published
54+
private List<Map> published = []
55+
56+
private Map tasks = [:]
5757

58-
private Map tasks
58+
ProvObserver(Path path, Boolean overwrite, List patterns) {
59+
this.path = path
60+
this.overwrite = overwrite
61+
this.matchers = patterns.collect { pattern ->
62+
FileSystems.getDefault().getPathMatcher("glob:**/${pattern}")
63+
}
64+
}
5965

6066
@Override
6167
void onFlowCreate(Session session) {
62-
this.session = session
6368
this.config = session.config
64-
this.enabled = this.config.navigate('prov.enabled', true)
65-
this.config.overwrite = this.config.navigate('prov.overwrite', false)
66-
this.config.patterns = this.config.navigate('prov.patterns', [])
67-
this.config.file = this.config.navigate('prov.file', DEF_FILE_NAME)
68-
this.path = (this.config.file as Path).complete()
6969

7070
// check file existence
71-
final attrs = FileHelper.readAttributes(this.path)
72-
if( this.enabled && attrs ) {
73-
if( this.config.overwrite && (attrs.isDirectory() || !this.path.delete()) )
74-
throw new AbortOperationException("Unable to overwrite existing file manifest: ${this.path.toUriString()}")
75-
else if( !this.config.overwrite )
76-
throw new AbortOperationException("File manifest already exists: ${this.path.toUriString()}")
71+
final attrs = FileHelper.readAttributes(path)
72+
if( attrs ) {
73+
if( overwrite && (attrs.isDirectory() || !path.delete()) )
74+
throw new AbortOperationException("Unable to overwrite existing provenance manifest: ${path.toUriString()}")
75+
else if( !overwrite )
76+
throw new AbortOperationException("Provenance manifest already exists: ${path.toUriString()}")
7777
}
78-
79-
this.matchers = this.config.patterns.collect { pattern ->
80-
FileSystems.getDefault().getPathMatcher("glob:**/${pattern}")
81-
}
82-
83-
this.published = []
84-
this.tasks = [:]
8578
}
8679

8780
static def jsonify(root) {
88-
if ( root instanceof LinkedHashMap ) {
89-
root.eachWithIndex { key, value, index ->
90-
root[key] = jsonify(value)
91-
}
92-
} else if ( root instanceof Collection ) {
93-
root = new ArrayList(root);
94-
root.eachWithIndex { item, index ->
95-
root[index] = jsonify(item)
96-
}
97-
} else if ( root instanceof FileHolder ) {
98-
root = root.getStorePath()
99-
root = jsonify(root)
100-
} else if ( root instanceof Path ) {
101-
root = root.toUriString()
102-
} else if ( root instanceof Boolean ||
103-
root instanceof Number ) {
104-
return root
105-
} else {
106-
return root as String
107-
}
108-
return root
81+
if ( root instanceof Map )
82+
root.collectEntries( (k, v) -> [k, jsonify(v)] )
83+
84+
else if ( root instanceof Collection )
85+
root.collect( v -> jsonify(v) )
86+
87+
else if ( root instanceof FileHolder )
88+
jsonify(root.storePath)
89+
90+
else if ( root instanceof Path )
91+
root.toUriString()
92+
93+
else if ( root instanceof Boolean || root instanceof Number )
94+
root
95+
96+
else
97+
root.toString()
10998
}
11099

111-
void trackProcess(TaskHandler handler, TraceRecord trace){
112-
def taskRun = handler.getTask()
113-
def taskConfig = taskRun.config
114-
def taskId = taskRun.id as String
100+
void trackProcess(TaskHandler handler, TraceRecord trace) {
101+
def task = handler.task
115102

116103
// TODO: Figure out what the '$' input/output means
117104
// Omitting them from manifest for now
118105
def taskMap = [
119-
'id': taskId,
120-
'name': taskRun.getName(),
121-
'cached': taskRun.cached,
106+
'id': task.id as String,
107+
'name': task.name,
108+
'cached': task.cached,
122109
'process': trace.getProcessName(),
123-
'inputs': taskRun.inputs.findResults { inParam, object ->
110+
'inputs': task.inputs.findResults { inParam, object ->
124111
def inputMap = [
125112
'name': inParam.getName(),
126113
'value': jsonify(object)
127114
]
128115
inputMap['name'] == '$' ? null : inputMap
129116
},
130-
'outputs': taskRun.outputs.findResults { outParam, object ->
117+
'outputs': task.outputs.findResults { outParam, object ->
131118
def outputMap = [
132119
'name': outParam.getName(),
133120
'emit': outParam.getChannelEmitName(),
@@ -137,75 +124,64 @@ class ProvObserver implements TraceObserver {
137124
}
138125
]
139126

140-
this.tasks.put(taskId, taskMap)
127+
tasks.put(task.id, taskMap)
141128
}
142129

143130
@Override
144-
void onProcessComplete(TaskHandler handler, TraceRecord trace){
131+
void onProcessComplete(TaskHandler handler, TraceRecord trace) {
145132
trackProcess(handler, trace)
146133
}
147134

148135
@Override
149-
void onProcessCached(TaskHandler handler, TraceRecord trace){
136+
void onProcessCached(TaskHandler handler, TraceRecord trace) {
150137
trackProcess(handler, trace)
151138
}
152139

153140
@Override
154141
void onFilePublish(Path destination, Path source) {
155-
boolean match = this.matchers.isEmpty() || this.matchers.any { matcher ->
142+
boolean match = matchers.isEmpty() || matchers.any { matcher ->
156143
matcher.matches(destination)
157144
}
158145

159-
def pathMap = [
146+
if( !match )
147+
return
148+
149+
published.add( [
160150
'source': source.toUriString(),
161151
'target': destination.toUriString()
162-
]
163-
164-
if ( match ) {
165-
this.published.add(pathMap)
166-
}
152+
] )
167153
}
168154

169155
@Override
170156
void onFlowComplete() {
171-
// make sure there are files to publish
172-
if ( !this.enabled ) {
173-
return
174-
}
175-
176157
// generate temporary output-task map
177-
def outputTaskMap = [:]
178-
this.tasks.each { taskId, task ->
158+
def taskLookup = tasks.inject([:]) { accum, hash, task ->
179159
task['outputs'].each { output ->
180160
// Make sure to handle tuples of outputs
181161
def values = output['value']
182-
if ( values instanceof Collection ) {
183-
values.each { outputTaskMap.put(it, task['id']) }
184-
} else {
185-
outputTaskMap.put(values, task['id'])
186-
}
162+
if ( values instanceof Collection )
163+
values.each { accum.put(it, task['id']) }
164+
else
165+
accum.put(values, task['id'])
187166
}
167+
accum
188168
}
189169

190170
// add task information to published files
191-
this.published.each { path ->
192-
path['publishingTaskId'] = outputTaskMap[path.source]
171+
published.each { path ->
172+
path['publishingTaskId'] = taskLookup[path.source]
193173
}
194174

195-
// generate manifest map
175+
// save manifest to JSON file
196176
def manifest = [
197-
'pipeline': this.config.manifest,
198-
'published': this.published,
199-
'tasks': this.tasks
177+
'pipeline': config.manifest,
178+
'published': published,
179+
'tasks': tasks
200180
]
181+
def manifestJson = JsonOutput.toJson(manifest)
182+
def manifestJsonPretty = JsonOutput.prettyPrint(manifestJson)
201183

202-
// output manifest map as JSON
203-
def manifest_json = JsonOutput.toJson(manifest)
204-
def manifest_json_pretty = JsonOutput.prettyPrint(manifest_json)
205-
206-
// create JSON file manifest
207-
Path manifestFile = Files.createFile(this.path)
208-
manifestFile << "${manifest_json_pretty}\n"
184+
path.text = manifestJsonPretty
209185

210186
}
211187

plugins/nf-prov/src/main/nextflow/prov/ProvObserverFactory.groovy

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package nextflow.prov
1818

19+
import java.nio.file.Path
20+
1921
import groovy.transform.CompileStatic
2022
import nextflow.Session
2123
import nextflow.trace.TraceObserver
@@ -31,8 +33,18 @@ class ProvObserverFactory implements TraceObserverFactory {
3133

3234
@Override
3335
Collection<TraceObserver> create(Session session) {
34-
final result = new ArrayList()
35-
result.add( new ProvObserver() )
36-
return result
36+
[ createProvObserver(session.config) ]
37+
}
38+
39+
/**
 * Build the provenance observer from the `prov` config scope.
 *
 * Reads `prov.enabled`, `prov.file`, `prov.overwrite` and `prov.patterns`
 * from the given config map and passes them to the ProvObserver constructor.
 *
 * @param config the session config map
 * @return a configured ProvObserver, or null when `prov.enabled` is false;
 *         a caller that collects the result into a list must drop the null
 */
protected TraceObserver createProvObserver(Map config) {
    final enabled = config.navigate('prov.enabled', true) as Boolean
    if( !enabled )
        return null

    final file = config.navigate('prov.file', ProvObserver.DEF_FILE_NAME)
    final path = (file as Path).complete()
    // fall back to false so the flag is never null when the option is unset
    final overwrite = config.navigate('prov.overwrite', false) as Boolean
    final patterns = config.navigate('prov.patterns', []) as List
    // the ProvObserver constructor takes (path, overwrite, patterns); the
    // previously passed `format` argument was never defined in this method
    new ProvObserver(path, overwrite, patterns)
}
3850
}

0 commit comments

Comments
 (0)