Skip to content

Add sqlExecute #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
a299522
build: Add version extraction and local installation target to Makefile
edmundmiller Apr 10, 2025
c2250bc
fix: Add executeUpdate option to QueryHandler for DDL and UPDATE stat…
edmundmiller Apr 10, 2025
7bff473
refactor: Remove executeUpdate option from QueryHandler and simplify …
edmundmiller Apr 30, 2025
f11b644
feat: Add SQL execution methods to ChannelSqlExtension
edmundmiller Apr 30, 2025
e95bef4
docs: Enhance README and add examples for SQL execution functions
edmundmiller Apr 30, 2025
79f45d8
test: Add comprehensive SQL execution tests for ChannelSqlExtension
edmundmiller Apr 30, 2025
1e5991b
refactor: Update ChannelSqlExtension methods to instance methods
edmundmiller Apr 30, 2025
64c41e0
refactor: Update SQL execution example to use file-based H2 database
edmundmiller Apr 30, 2025
52a0e04
fix: Enhance error handling for database connection and commit operat…
edmundmiller May 1, 2025
a5f1073
feat: Add integration testing framework for database backends
edmundmiller May 2, 2025
ac469c6
refactor: Rename execute function to sqlExecute and update documentation
edmundmiller May 2, 2025
543f9ee
refactor: Rewrite execute functions into on one function
edmundmiller May 3, 2025
527199b
fix: Update sqlExecute method to return structured result maps
edmundmiller May 3, 2025
1d8072c
chore: Restructure test directory names
edmundmiller May 3, 2025
36418f2
docs: Update sqlExecute documentation on outputs
edmundmiller May 7, 2025
f4937c8
refactor: Clean up integration test structure
edmundmiller May 7, 2025
1e5658e
fix: Improve error handling and logging for unsupported database oper…
edmundmiller May 27, 2025
5365357
refactor: Streamline error handling for database operations in Insert…
edmundmiller May 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
config ?= compileClasspath
version ?= $(shell grep 'Plugin-Version' plugins/nf-sqldb/src/resources/META-INF/MANIFEST.MF | awk '{ print $$2 }')

ifdef module
mm = :${module}:
Expand Down Expand Up @@ -69,3 +70,9 @@ upload-plugins:

publish-index:
./gradlew plugins:publishIndex

# Install the plugin into local nextflow plugins dir
install:
./gradlew copyPluginZip
rm -rf ${HOME}/.nextflow/plugins/nf-sqldb-${version}
cp -r build/plugins/nf-sqldb-${version} ${HOME}/.nextflow/plugins/nf-sqldb-${version}
108 changes: 93 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ This plugin provides support for interacting with SQL databases in Nextflow scri

The following databases are currently supported:

* [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md))
* [DuckDB](https://duckdb.org/)
* [H2](https://www.h2database.com)
* [MySQL](https://www.mysql.com/)
* [MariaDB](https://mariadb.org/)
* [PostgreSQL](https://www.postgresql.org/)
* [SQLite](https://www.sqlite.org/index.html)
- [AWS Athena](https://aws.amazon.com/athena/) (Setup guide [here](docs/aws-athena.md))
- [DuckDB](https://duckdb.org/)
- [H2](https://www.h2database.com)
- [MySQL](https://www.mysql.com/)
- [MariaDB](https://mariadb.org/)
- [PostgreSQL](https://www.postgresql.org/)
- [SQLite](https://www.sqlite.org/index.html)

NOTE: THIS IS A PREVIEW TECHNOLOGY, FEATURES AND CONFIGURATION SETTINGS CAN CHANGE IN FUTURE RELEASES.

Expand All @@ -24,7 +24,6 @@ plugins {
}
```


## Configuration

You can configure any number of databases under the `sql.db` configuration scope. For example:
Expand Down Expand Up @@ -79,7 +78,7 @@ The following options are available:

`batchSize`
: Query the data in batches of the given size. This option is recommended for queries that may return large a large result set, so that the entire result set is not loaded into memory at once.
: *NOTE:* this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`.
: _NOTE:_ this feature requires that the underlying SQL database supports `LIMIT` and `OFFSET`.

`emitColumns`
: When `true`, the column names in the `SELECT` statement are emitted as the first tuple in the resulting channel.
Expand All @@ -104,7 +103,7 @@ INSERT INTO SAMPLE (NAME, LEN) VALUES ('HELLO', 5);
INSERT INTO SAMPLE (NAME, LEN) VALUES ('WORLD!', 6);
```

*NOTE:* the target table (e.g. `SAMPLE` in the above example) must be created beforehand.
_NOTE:_ the target table (e.g. `SAMPLE` in the above example) must be created beforehand.

The following options are available:

Expand All @@ -125,7 +124,87 @@ The following options are available:

`setup`
: A SQL statement that is executed before inserting the data, e.g. to create the target table.
: *NOTE:* the underlying database should support the *create table if not exist* idiom, as the plugin will execute this statement every time the script is run.
: _NOTE:_ the underlying database should support the _create table if not exist_ idiom, as the plugin will execute this statement every time the script is run.

## SQL Execution Functions

This plugin provides the following function for executing SQL statements that don't return data, such as DDL (Data Definition Language) and DML (Data Manipulation Language) operations.

### sqlExecute

The `sqlExecute` function executes a SQL statement that doesn't return a result set, such as `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, or `DELETE` statements. For DML statements (`INSERT`, `UPDATE`, `DELETE`), it returns a Map with `success: true` and `result` set to the number of rows affected. For DDL statements (`CREATE`, `ALTER`, `DROP`), it returns a Map with `success: true` and `result: null`.

For example:

```nextflow
include { sqlExecute } from 'plugin/nf-sqldb'

// Create a table (returns Map with result: null for DDL operations)
def createResult = sqlExecute(
db: 'foo',
statement: '''
CREATE TABLE IF NOT EXISTS sample_table (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
value DOUBLE
)
'''
)
println "Create result: $createResult" // [success: true, result: null]

// Insert data (returns Map with result: 1 for number of rows affected)
def insertedRows = sqlExecute(
db: 'foo',
statement: "INSERT INTO sample_table (id, name, value) VALUES (1, 'alpha', 10.5)"
)
println "Inserted $insertedRows.row(s)" // [success: true, result: 1]

// Update data (returns Map with result: number of rows updated)
def updatedRows = sqlExecute(
db: 'foo',
statement: "UPDATE sample_table SET value = 30.5 WHERE name = 'alpha'"
)
println "Updated $updatedRows.row(s)" // [success: true, result: <number>]

// Delete data (returns Map with result: number of rows deleted)
def deletedRows = sqlExecute(
db: 'foo',
statement: "DELETE FROM sample_table WHERE value > 25"
)
println "Deleted $deletedRows.row(s)" // [success: true, result: <number>]
```

The following options are available:

`db`
: The database handle. It must be defined under `sql.db` in the Nextflow configuration.

`statement`
: The SQL statement to execute. This can be any DDL or DML statement that doesn't return a result set.

## Differences Between Dataflow Operators and Execution Function

The plugin provides two different ways to interact with databases:

1. **Dataflow Operators** (`fromQuery` and `sqlInsert`): These are designed to integrate with Nextflow's dataflow programming model, operating on channels.

- `fromQuery`: Queries data from a database and returns a channel that emits the results.
- `sqlInsert`: Takes data from a channel and inserts it into a database.

2. **Execution Function** (`sqlExecute`): This is designed for direct SQL statement execution that doesn't require channel integration.
- `sqlExecute`: Executes a SQL statement. For DML operations, it returns the count of affected rows. For DDL operations, it returns null.

Use **Dataflow Operators** when you need to:

- Query data that will flow into your pipeline processing
- Insert data from your pipeline processing into a database

Use **Execution Function** when you need to:

- Perform database setup (creating tables, schemas, etc.)
- Execute administrative commands
- Perform one-off operations (deleting all records, truncating a table, etc.)
- Execute statements where you don't need the results as part of your dataflow

## Querying CSV files

Expand Down Expand Up @@ -159,12 +238,11 @@ The `CSVREAD` function provided by the H2 database engine allows you to query an

Like all dataflow operators in Nextflow, the operators provided by this plugin are executed asynchronously.

In particular, data inserted using the `sqlInsert` operator is *not* guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.

In particular, data inserted using the `sqlInsert` operator is _not_ guaranteed to be available to any subsequent queries using the `fromQuery` operator, as it is not possible to make a channel factory operation dependent on some upstream operation.

## Developtment
## Development

#### Publish artifacts to Maven repo
### Publish artifacts to Maven repo

Use the following command:

Expand Down
76 changes: 76 additions & 0 deletions examples/sql-execution/main.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env nextflow

/*
* Example script demonstrating how to use the SQL sqlExecute function
*/

include { sqlExecute } from 'plugin/nf-sqldb'
include { fromQuery } from 'plugin/nf-sqldb'

// Define database configuration in nextflow.config file
// sql.db.demo = [url: 'jdbc:h2:mem:demo', driver: 'org.h2.Driver']

workflow {
log.info """
=========================================
SQL Execution Function Example
=========================================
"""

// Step 1: Create a table (DDL operation returns null)
log.info "Creating a sample table..."
def createResult = sqlExecute(
db: 'demo',
statement: '''
CREATE TABLE IF NOT EXISTS TEST_TABLE (
ID INTEGER PRIMARY KEY,
NAME VARCHAR(100),
VALUE DOUBLE
)
'''
)
log.info "Create table result: $createResult"

// Step 2: Insert some data (DML operation returns affected row count)
log.info "Inserting data..."
def insertCount = sqlExecute(
db: 'demo',
statement: '''
INSERT INTO TEST_TABLE (ID, NAME, VALUE) VALUES
(1, 'alpha', 10.5),
(2, 'beta', 20.7),
(3, 'gamma', 30.2),
(4, 'delta', 40.9);
'''
)
log.info "Inserted $insertCount rows"

// Step 3: Update some data (DML operation returns affected row count)
log.info "Updating data..."
def updateCount = sqlExecute(
db: 'demo',
statement: '''
UPDATE TEST_TABLE
SET VALUE = VALUE * 2
WHERE ID = 2;
'''
)
log.info "Updated $updateCount rows"

// Step 4: Delete some data (DML operation returns affected row count)
log.info "Deleting data..."
def deleteCount = sqlExecute(
db: 'demo',
statement: '''
DELETE FROM TEST_TABLE
WHERE ID = 4;
'''
)
log.info "Deleted $deleteCount rows"

// Step 5: Query results
log.info "Querying results..."
channel
.fromQuery("SELECT * FROM TEST_TABLE ORDER BY ID", db: 'demo')
.view { row -> "ID: ${row[0]}, Name: ${row[1]}, Value: ${row[2]}" }
}
22 changes: 22 additions & 0 deletions examples/sql-execution/nextflow.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Configuration file for the SQL execution example script
*/

// Enable the SQL DB plugin
plugins {
id '[email protected]'
}

// Define a file-based H2 database for the example
sql {
db {
demo {
url = 'jdbc:h2:./demo'
driver = 'org.h2.Driver'
}
}
}

// Silence unnecessary Nextflow logs
process.echo = false
dag.enabled = false
76 changes: 76 additions & 0 deletions plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,21 @@ import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.expression.DataflowExpression
import nextflow.Channel
import nextflow.Global
import nextflow.NF
import nextflow.Session
import nextflow.extension.CH
import nextflow.extension.DataflowHelper
import nextflow.plugin.extension.Factory
import nextflow.plugin.extension.Function
import nextflow.plugin.extension.Operator
import nextflow.plugin.extension.PluginExtensionPoint
import nextflow.sql.config.SqlConfig
import nextflow.sql.config.SqlDataSource
import nextflow.util.CheckHelper
import java.sql.Connection
import java.sql.Statement
import groovy.sql.Sql
/**
* Provide a channel factory extension that allows the execution of Sql queries
*
Expand Down Expand Up @@ -133,4 +138,75 @@ class ChannelSqlExtension extends PluginExtensionPoint {
return target
}

private static final Map EXECUTE_PARAMS = [
db: CharSequence,
statement: CharSequence
]

/**
* Execute a SQL statement that does not return a result set (DDL/DML statements)
* For DML statements (INSERT, UPDATE, DELETE), it returns a result map with success status and number of affected rows
* For DDL statements (CREATE, ALTER, DROP), it returns a result map with success status
*
* @param params A map containing 'db' (database alias) and 'statement' (SQL string to execute)
* @return A map containing 'success' (boolean), 'result' (rows affected or null) and optionally 'error' (message)
*/
@Function
Map sqlExecute(Map params) {
CheckHelper.checkParams('sqlExecute', params, EXECUTE_PARAMS)

final String dbName = params.db as String ?: 'default'
final String statement = params.statement as String

if (!statement)
return [success: false, error: "Missing required parameter 'statement'"]

final sqlConfig = new SqlConfig((Map) session.config.navigate('sql.db'))
final SqlDataSource dataSource = sqlConfig.getDataSource(dbName)

if (dataSource == null) {
def msg = "Unknown db name: $dbName"
def choices = sqlConfig.getDataSourceNames().closest(dbName) ?: sqlConfig.getDataSourceNames()
if (choices?.size() == 1)
msg += " - Did you mean: ${choices.get(0)}?"
else if (choices)
msg += " - Did you mean any of these?\n" + choices.collect { " $it" }.join('\n') + '\n'
return [success: false, error: msg]
}

try (Connection conn = groovy.sql.Sql.newInstance(dataSource.toMap()).getConnection()) {
try (Statement stm = conn.createStatement()) {
String normalizedStatement = normalizeStatement(statement)

boolean isDDL = normalizedStatement.trim().toLowerCase().matches("^(create|alter|drop|truncate).*")

if (isDDL) {
stm.execute(normalizedStatement)
return [success: true, result: null]
} else {
Integer rowsAffected = stm.executeUpdate(normalizedStatement)
return [success: true, result: rowsAffected]
}
}
}
catch (Exception e) {
log.error("Error executing SQL statement: ${e.message}", e)
return [success: false, error: e.message]
}
}

/**
* Normalizes a SQL statement by adding a semicolon if needed
*
* @param statement The SQL statement to normalize
* @return The normalized SQL statement
*/
private static String normalizeStatement(String statement) {
if (!statement)
return null
def result = statement.trim()
if (!result.endsWith(';'))
result += ';'
return result
}
}
Loading