-
Notifications
You must be signed in to change notification settings - Fork 648
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(nf-tower): Add
TowerFusionEnv
provider to set required env vars
Signed-off-by: Alberto Miranda <[email protected]>
- Loading branch information
1 parent
40670f7
commit e7b1082
Showing
8 changed files
with
748 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
266 changes: 266 additions & 0 deletions
266
plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,266 @@ | ||
package io.seqera.tower.plugin | ||
|
||
import com.google.gson.Gson | ||
import dev.failsafe.Failsafe | ||
import dev.failsafe.RetryPolicy | ||
import dev.failsafe.event.EventListener | ||
import dev.failsafe.event.ExecutionAttemptedEvent | ||
import dev.failsafe.function.CheckedSupplier | ||
import groovy.transform.CompileStatic | ||
import groovy.util.logging.Slf4j | ||
import io.seqera.tower.plugin.exception.BadResponseException | ||
import io.seqera.tower.plugin.exception.UnauthorizedException | ||
import io.seqera.tower.plugin.exchange.LicenseTokenRequest | ||
import io.seqera.tower.plugin.exchange.LicenseTokenResponse | ||
import nextflow.Global | ||
import nextflow.Session | ||
import nextflow.SysEnv | ||
import nextflow.exception.AbortOperationException | ||
import nextflow.fusion.FusionConfig | ||
import nextflow.fusion.FusionEnv | ||
import nextflow.util.Threads | ||
import org.pf4j.Extension | ||
|
||
import java.net.http.HttpClient | ||
import java.net.http.HttpRequest | ||
import java.net.http.HttpResponse | ||
import java.time.Duration | ||
import java.time.temporal.ChronoUnit | ||
import java.util.concurrent.Executors | ||
import java.util.function.Predicate | ||
|
||
/** | ||
* Environment provider for Platform-specific environment variables. | ||
* | ||
* @author Alberto Miranda <[email protected]> | ||
*/ | ||
@Slf4j | ||
@Extension | ||
@CompileStatic | ||
class TowerFusionEnv implements FusionEnv { | ||
|
||
// The endpoint where license-scoped JWT tokens are obtained | ||
private static final String LICENSE_TOKEN_ENDPOINT = 'license/token/' | ||
|
||
// Server errors that should trigger a retry | ||
private static final List<Integer> SERVER_ERRORS = [429, 500, 502, 503, 504] | ||
|
||
// Default connection timeout for HTTP requests | ||
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS) | ||
|
||
// Default retry policy settings for HTTP requests: delay, max delay, attempts, and jitter | ||
private static final Duration DEFAULT_RETRY_POLICY_DELAY = Duration.of(450, ChronoUnit.MILLIS) | ||
private static final Duration DEFAULT_RETRY_POLICY_MAX_DELAY = Duration.of(90, ChronoUnit.SECONDS) | ||
private static final int DEFAULT_RETRY_POLICY_MAX_ATTEMPTS = 10 | ||
private static final double DEFAULT_RETRY_POLICY_JITTER = 0.5 | ||
|
||
// The HttpClient instance used to send requests | ||
private final HttpClient httpClient = newDefaultHttpClient() | ||
|
||
// The RetryPolicy instance used to retry requests | ||
private final RetryPolicy retryPolicy = newDefaultRetryPolicy(SERVER_ERRORS) | ||
|
||
// Nextflow session | ||
private final Session session | ||
|
||
// Platform endpoint to use for requests | ||
private final String endpoint | ||
|
||
// Platform access token to use for requests | ||
private final String accessToken | ||
|
||
/** | ||
* Constructor for the class. It initializes the session, endpoint, and access token. | ||
*/ | ||
TowerFusionEnv() { | ||
this.session = Global.session as Session | ||
final towerConfig = session.config.navigate('tower') as Map ?: [:] | ||
final env = SysEnv.get() | ||
this.endpoint = endpoint0(towerConfig, env) | ||
this.accessToken = accessToken0(towerConfig, env) | ||
} | ||
|
||
/** | ||
* Return any environment variables relevant to Fusion execution. This method is called | ||
* by {@link nextflow.fusion.FusionEnvProvider#getEnvironment} to determine which | ||
* environment variables are needed for the current run. | ||
* | ||
* @param scheme The scheme for which the environment variables are needed (currently unused) | ||
* @param config The Fusion configuration object | ||
* @return A map of environment variables | ||
*/ | ||
@Override | ||
Map<String, String> getEnvironment(String scheme, FusionConfig config) { | ||
|
||
// TODO(amiranda): Hardcoded for now. We need to find out how to obtain | ||
// the concrete product SKU and version. Candidate: FusionConfig? | ||
final product = 'fusion' | ||
final version = '2.4' | ||
|
||
try { | ||
final token = getLicenseToken(product, version) | ||
return [ | ||
'FUSION_LICENSE_TOKEN': token, | ||
] | ||
} catch (Exception e) { | ||
log.warn("Error retrieving Fusion license information: ${e.message}") | ||
return [:] | ||
} | ||
} | ||
|
||
/** | ||
* Send a request to Platform to obtain a license-scoped JWT for Fusion. The request is authenticated using the | ||
* Platform access token provided in the configuration of the current session. | ||
* | ||
* @throws AbortOperationException if a Platform access token cannot be found | ||
* | ||
* @return The signed JWT token | ||
*/ | ||
protected String getLicenseToken(product, version) { | ||
// FIXME(amiranda): Find out how to obtain the product and version | ||
// Candidate: FusionConfig? | ||
|
||
if (accessToken == null) { | ||
throw new AbortOperationException("Missing personal access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment") | ||
} | ||
|
||
final req = HttpRequest.newBuilder() | ||
.uri(URI.create("${endpoint}/${LICENSE_TOKEN_ENDPOINT}").normalize()) | ||
.header('Content-Type', 'application/json') | ||
.header('Authorization', "Bearer ${accessToken}") | ||
.POST( | ||
HttpRequest.BodyPublishers.ofString( | ||
new Gson().toJson( | ||
new LicenseTokenRequest( | ||
product: product, | ||
version: version | ||
), | ||
LicenseTokenRequest.class | ||
), | ||
) | ||
) | ||
.build() | ||
|
||
try { | ||
final resp = safeHttpSend(req, retryPolicy) | ||
|
||
if (resp.statusCode() == 200) { | ||
return new Gson().fromJson(resp.body(), LicenseTokenResponse.class).signedToken | ||
} | ||
|
||
if (resp.statusCode() == 401) { | ||
throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a valid access token") | ||
} | ||
|
||
throw new BadResponseException("Invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}") | ||
|
||
} catch (IOException e) { | ||
throw new IllegalStateException("Unable to send request to '${req.uri()}' : ${e.message}") | ||
} | ||
} | ||
|
||
/************************************************************************** | ||
* Helper methods | ||
*************************************************************************/ | ||
|
||
/** | ||
* Get the configured Platform API endpoint: if the endpoint is not provided in the configuration, we fallback to the | ||
* environment variable `TOWER_API_ENDPOINT`. If neither is provided, we fallback to the default endpoint. | ||
* | ||
* @param opts the configuration options for Platform | ||
* @param env the applicable environment variables | ||
* @return the Platform API endpoint | ||
*/ | ||
protected static String endpoint0(Map opts, Map<String, String> env) { | ||
def result = opts.endpoint as String | ||
if (!result || result == '-') { | ||
result = env.get('TOWER_API_ENDPOINT') ?: TowerClient.DEF_ENDPOINT_URL | ||
} | ||
return result.stripEnd('/') | ||
} | ||
|
||
/** | ||
* Get the configured Platform access token: if `TOWER_WORKFLOW_ID` is provided in the environment, we are running | ||
* in a Platform-made run and we should ONLY retrieve the token from the environment. Otherwise, check | ||
* the configuration file or fallback to the environment. If no token is found, returns null. | ||
* | ||
* @param opts the configuration options for Platform | ||
* @param env the applicable environment variables | ||
* @return the Platform access token | ||
*/ | ||
protected static String accessToken0(Map opts, Map<String, String> env) { | ||
def token = env.get('TOWER_WORKFLOW_ID') | ||
? env.get('TOWER_ACCESS_TOKEN') | ||
: opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN') | ||
return token | ||
} | ||
|
||
/** | ||
* Create a new HttpClient instance with default settings | ||
* @return The new HttpClient instance | ||
*/ | ||
private static HttpClient newDefaultHttpClient() { | ||
final builder = HttpClient.newBuilder() | ||
.version(HttpClient.Version.HTTP_1_1) | ||
.followRedirects(HttpClient.Redirect.NEVER) | ||
.cookieHandler(new CookieManager()) | ||
.connectTimeout(DEFAULT_CONNECTION_TIMEOUT) | ||
// use virtual threads executor if enabled | ||
if ( Threads.useVirtual() ) { | ||
builder.executor(Executors.newVirtualThreadPerTaskExecutor()) | ||
} | ||
// build and return the new client | ||
return builder.build() | ||
} | ||
|
||
/** | ||
* Create a new RetryPolicy instance with default settings and the given list of retryable errors. With this policy, | ||
* a request is retried on IOExceptions and any server errors defined in errorsToRetry. The number of retries, delay, | ||
* max delay, and jitter are controlled by the corresponding values defined at class level. | ||
* | ||
* @return The new RetryPolicy instance | ||
*/ | ||
private static <T> RetryPolicy<HttpResponse<T>> newDefaultRetryPolicy(List<Integer> errorsToRetry) { | ||
|
||
final retryOnException = (e -> e instanceof IOException) as Predicate<? extends Throwable> | ||
final retryOnStatusCode = ((HttpResponse<T> resp) -> resp.statusCode() in errorsToRetry) as Predicate<HttpResponse<T>> | ||
|
||
final listener = new EventListener<ExecutionAttemptedEvent<HttpResponse<T>>>() { | ||
@Override | ||
void accept(ExecutionAttemptedEvent event) throws Throwable { | ||
def msg = "connection failure - attempt: ${event.attemptCount}" | ||
if (event.lastResult != null) | ||
msg += "; response: ${event.lastResult}" | ||
if (event.lastFailure != null) | ||
msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}" | ||
log.debug(msg) | ||
} | ||
} | ||
return RetryPolicy.<HttpResponse<T>> builder() | ||
.handleIf(retryOnException) | ||
.handleResultIf(retryOnStatusCode) | ||
.withBackoff(DEFAULT_RETRY_POLICY_DELAY.toMillis(), DEFAULT_RETRY_POLICY_MAX_DELAY.toMillis(), ChronoUnit.MILLIS) | ||
.withMaxAttempts(DEFAULT_RETRY_POLICY_MAX_ATTEMPTS) | ||
.withJitter(DEFAULT_RETRY_POLICY_JITTER) | ||
.onRetry(listener) | ||
.build() | ||
} | ||
|
||
/** | ||
* Send an HTTP request and return the response. This method automatically retries the request according to the | ||
* given RetryPolicy. | ||
* | ||
* @param req The HttpRequest to send | ||
* @return The HttpResponse received | ||
*/ | ||
private <T> HttpResponse<String> safeHttpSend(HttpRequest req, RetryPolicy<T> policy) { | ||
return Failsafe.with(policy).get( | ||
() -> { | ||
log.debug "Request: method:=${req.method()}; uri:=${req.uri()}; request:=${req}" | ||
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString()) | ||
log.debug "Response: statusCode:=${resp.statusCode()}; body:=${resp.body()}" | ||
return resp | ||
} as CheckedSupplier | ||
) as HttpResponse<String> | ||
} | ||
} |
7 changes: 7 additions & 0 deletions
7
plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/BadResponseException.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package io.seqera.tower.plugin.exception | ||
|
||
import groovy.transform.InheritConstructors | ||
|
||
@InheritConstructors | ||
class BadResponseException extends RuntimeException{ | ||
} |
7 changes: 7 additions & 0 deletions
7
plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/UnauthorizedException.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package io.seqera.tower.plugin.exception | ||
|
||
import groovy.transform.InheritConstructors | ||
|
||
@InheritConstructors | ||
class UnauthorizedException extends RuntimeException { | ||
} |
22 changes: 22 additions & 0 deletions
22
plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenRequest.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package io.seqera.tower.plugin.exchange | ||
|
||
import groovy.transform.CompileStatic | ||
import groovy.transform.EqualsAndHashCode | ||
import groovy.transform.ToString | ||
|
||
/** | ||
* Models a REST request to obtain a license-scoped JWT token from Platform | ||
* | ||
* @author Alberto Miranda <[email protected]> | ||
*/ | ||
@EqualsAndHashCode | ||
@ToString(includeNames = true, includePackage = false) | ||
@CompileStatic | ||
class LicenseTokenRequest { | ||
|
||
/** The product code */ | ||
String product | ||
|
||
/** The product version */ | ||
String version | ||
} |
23 changes: 23 additions & 0 deletions
23
plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenResponse.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package io.seqera.tower.plugin.exchange | ||
|
||
import groovy.transform.CompileStatic | ||
import groovy.transform.ToString | ||
|
||
/** | ||
* Models a REST response containing a license-scoped JWT token from Platform | ||
* | ||
* @author Alberto Miranda <[email protected]> | ||
*/ | ||
@CompileStatic | ||
@ToString(includeNames = true, includePackage = false) | ||
class LicenseTokenResponse { | ||
/** | ||
* The signed JWT token | ||
*/ | ||
String signedToken | ||
|
||
/** | ||
* The expiration date of the token | ||
*/ | ||
Date expirationDate | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,3 +10,4 @@ | |
# | ||
|
||
io.seqera.tower.plugin.TowerFactory | ||
io.seqera.tower.plugin.TowerFusionEnv |
Oops, something went wrong.