Skip to content

Commit

Permalink
fix(lambda): Lambda is leaking threads on agent refreshes. remove the…
Browse files Browse the repository at this point in the history
… custom threadpool (#6048) (#6049)

(cherry picked from commit 32b0df9)

Co-authored-by: Jason <[email protected]>
  • Loading branch information
mergify[bot] and jasonmcintosh authored Sep 20, 2023
1 parent 3cde031 commit e1de704
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ class AsgConfigHelperSpec extends Specification {
when:
def actualName = AsgConfigHelper.createName(baseName, suffix)

//ignore the end precision for tests.
then:
actualName.contains(expectedName)
actualName.contains(expectedName.substring(0, expectedName.length() - 3))

where:
baseName | suffix || expectedName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,7 @@ public class LambdaCachingAgent implements CachingAgent, AccountAware, OnDemandA
this,
AmazonCloudProvider.ID + ":" + AmazonCloudProvider.ID + ":" + OnDemandType.Function);
this.lambdaService =
new LambdaService(
amazonClientProvider,
account,
region,
objectMapper,
lambdaServiceConfig,
serviceLimitConfiguration);
new LambdaService(amazonClientProvider, account, region, objectMapper, lambdaServiceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.model.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.clouddriver.aws.AmazonCloudProvider;
import com.netflix.spinnaker.clouddriver.aws.data.ArnUtils;
import com.netflix.spinnaker.clouddriver.aws.security.AmazonClientProvider;
import com.netflix.spinnaker.clouddriver.aws.security.NetflixAmazonCredentials;
import com.netflix.spinnaker.clouddriver.core.limits.ServiceLimitConfiguration;
import com.netflix.spinnaker.clouddriver.lambda.service.config.LambdaServiceConfig;
import com.netflix.spinnaker.kork.exceptions.SpinnakerException;
import groovy.util.logging.Slf4j;
Expand All @@ -47,47 +45,37 @@ public class LambdaService {
private final int RETRIES;
private final Clock clock = Clock.systemDefaultZone();
private final ObjectMapper mapper;
private final ExecutorService executorService;

public LambdaService(
AmazonClientProvider amazonClientProvider,
NetflixAmazonCredentials account,
String region,
ObjectMapper mapper,
LambdaServiceConfig lambdaServiceConfig,
ServiceLimitConfiguration serviceLimitConfiguration) {
LambdaServiceConfig lambdaServiceConfig) {

this.amazonClientProvider = amazonClientProvider;
this.account = account;
this.region = region;
this.mapper = mapper;
this.TIMEOUT_MINUTES = lambdaServiceConfig.getRetry().getTimeout();
this.RETRIES = lambdaServiceConfig.getRetry().getRetries();
this.executorService =
Executors.newFixedThreadPool(
computeThreads(serviceLimitConfiguration, lambdaServiceConfig));
}

public List<Map<String, Object>> getAllFunctions() throws InterruptedException {
public List<Map<String, Object>> getAllFunctions() {
List<FunctionConfiguration> functions = listAllFunctionConfigurations();
List<Callable<Void>> functionTasks = Collections.synchronizedList(new ArrayList<>());
List<Map<String, Object>> hydratedFunctionList =
Collections.synchronizedList(new ArrayList<>());
functions.stream()
.forEach(
f -> {
Map<String, Object> functionAttributes = new ConcurrentHashMap<>();
functionTasks.add(() -> addBaseAttributes(functionAttributes, f.getFunctionName()));
functionTasks.add(
() -> addRevisionsAttributes(functionAttributes, f.getFunctionName()));
functionTasks.add(
() ->
addAliasAndEventSourceMappingConfigurationAttributes(
functionAttributes, f.getFunctionName()));
functionTasks.add(
() -> addTargetGroupAttributes(functionAttributes, f.getFunctionName()));
addBaseAttributes(functionAttributes, f.getFunctionName());
addRevisionsAttributes(functionAttributes, f.getFunctionName());
addAliasAndEventSourceMappingConfigurationAttributes(
functionAttributes, f.getFunctionName());
addTargetGroupAttributes(functionAttributes, f.getFunctionName());
hydratedFunctionList.add(functionAttributes);
});
executorService.invokeAll(functionTasks);

// if addBaseAttributes returned null, the name won't be included. There is a chance other
// resources still have
Expand All @@ -105,12 +93,9 @@ public Map<String, Object> getFunctionByName(String functionName) throws Interru
// return quick so we don't make extra api calls for a delete lambda
return null;
}
functionTasks.add(() -> addRevisionsAttributes(functionAttributes, functionName));
functionTasks.add(
() ->
addAliasAndEventSourceMappingConfigurationAttributes(functionAttributes, functionName));
functionTasks.add(() -> addTargetGroupAttributes(functionAttributes, functionName));
executorService.invokeAll(functionTasks);
addRevisionsAttributes(functionAttributes, functionName);
addAliasAndEventSourceMappingConfigurationAttributes(functionAttributes, functionName);
addTargetGroupAttributes(functionAttributes, functionName);
return functionAttributes;
}

Expand Down Expand Up @@ -361,19 +346,4 @@ private <T> T retry(String requestName, Supplier<T> fn, int maxRetries, int time
}
}
}

private int computeThreads(
ServiceLimitConfiguration serviceLimitConfiguration,
LambdaServiceConfig lambdaServiceConfig) {
int serviceLimit =
serviceLimitConfiguration
.getLimit(
ServiceLimitConfiguration.API_RATE_LIMIT,
AWSLambda.class.getSimpleName(),
account.getName(),
AmazonCloudProvider.ID,
5.0d)
.intValue();
return Math.min(serviceLimit * 2, lambdaServiceConfig.getConcurrency().getThreads());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,10 @@
public class LambdaServiceConfig {

private Retry retry = new Retry();
private Concurrency concurrency = new Concurrency();

@Data
public static class Retry {
private int timeout = 15;
private int retries = 5;
}

@Data
public static class Concurrency {
private int threads = 10;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class LambdaCachingAgentTest {
@Before
public void setup() {
when(config.getRetry()).thenReturn(new LambdaServiceConfig.Retry());
when(config.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency());
when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0);
lambdaCachingAgent =
new LambdaCachingAgent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,13 @@ class LambdaServiceTest {
@Test
void getAllFunctionsWhenFunctionsResultIsNullExpectEmpty() throws InterruptedException {
when(lambdaServiceConfig.getRetry()).thenReturn(new LambdaServiceConfig.Retry());
when(lambdaServiceConfig.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency());
when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0);
AWSLambda lambda = mock(AWSLambda.class); // returns null by default
when(clientProvider.getAmazonLambda(any(), any())).thenReturn(lambda);

LambdaService lambdaService =
new LambdaService(
clientProvider,
netflixAmazonCredentials,
REGION,
objectMapper,
lambdaServiceConfig,
serviceLimitConfiguration);
clientProvider, netflixAmazonCredentials, REGION, objectMapper, lambdaServiceConfig);

List<Map<String, Object>> allFunctions = lambdaService.getAllFunctions();

Expand All @@ -54,7 +48,6 @@ void getAllFunctionsWhenFunctionsResultIsNullExpectEmpty() throws InterruptedExc
@Test
void getAllFunctionsWhenFunctionsResultIsEmptyExpectEmpty() throws InterruptedException {
when(lambdaServiceConfig.getRetry()).thenReturn(new LambdaServiceConfig.Retry());
when(lambdaServiceConfig.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency());
when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0);

ListFunctionsResult functionsResult = mock(ListFunctionsResult.class);
Expand All @@ -66,12 +59,7 @@ void getAllFunctionsWhenFunctionsResultIsEmptyExpectEmpty() throws InterruptedEx

LambdaService lambdaService =
new LambdaService(
clientProvider,
netflixAmazonCredentials,
REGION,
objectMapper,
lambdaServiceConfig,
serviceLimitConfiguration);
clientProvider, netflixAmazonCredentials, REGION, objectMapper, lambdaServiceConfig);

List<Map<String, Object>> allFunctions = lambdaService.getAllFunctions();

Expand All @@ -81,7 +69,6 @@ void getAllFunctionsWhenFunctionsResultIsEmptyExpectEmpty() throws InterruptedEx
@Test
void getAllFunctionsWhenFunctionNameIsEmptyExpectEmpty() throws InterruptedException {
when(lambdaServiceConfig.getRetry()).thenReturn(new LambdaServiceConfig.Retry());
when(lambdaServiceConfig.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency());
when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0);

ListFunctionsResult functionsResult = mock(ListFunctionsResult.class);
Expand All @@ -93,12 +80,7 @@ void getAllFunctionsWhenFunctionNameIsEmptyExpectEmpty() throws InterruptedExcep

LambdaService lambdaService =
new LambdaService(
clientProvider,
netflixAmazonCredentials,
REGION,
objectMapper,
lambdaServiceConfig,
serviceLimitConfiguration);
clientProvider, netflixAmazonCredentials, REGION, objectMapper, lambdaServiceConfig);

List<Map<String, Object>> allFunctions = lambdaService.getAllFunctions();

Expand All @@ -108,7 +90,6 @@ void getAllFunctionsWhenFunctionNameIsEmptyExpectEmpty() throws InterruptedExcep
@Test
void getAllFunctionsWhenFunctionNameIsNotEmptyExpectNotEmpty() throws InterruptedException {
when(lambdaServiceConfig.getRetry()).thenReturn(new LambdaServiceConfig.Retry());
when(lambdaServiceConfig.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency());
when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0);

ListFunctionsResult functionsResult = mock(ListFunctionsResult.class);
Expand Down Expand Up @@ -160,12 +141,7 @@ void getAllFunctionsWhenFunctionNameIsNotEmptyExpectNotEmpty() throws Interrupte

LambdaService lambdaService =
new LambdaService(
clientProvider,
netflixAmazonCredentials,
REGION,
objectMapper,
lambdaServiceConfig,
serviceLimitConfiguration);
clientProvider, netflixAmazonCredentials, REGION, objectMapper, lambdaServiceConfig);

List<Map<String, Object>> allFunctions = lambdaService.getAllFunctions();

Expand Down

0 comments on commit e1de704

Please sign in to comment.