From e1de704201a761271f79981c769fff37a34e8bfe Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 20 Sep 2023 19:42:06 +0000 Subject: [PATCH] fix(lambda): Lambda is leaking threads on agent refreshes. remove the custom threadpool (#6048) (#6049) (cherry picked from commit 32b0df9b784811009377571116855b549889718a) Co-authored-by: Jason --- .../aws/deploy/asg/AsgConfigHelperSpec.groovy | 3 +- .../provider/agent/LambdaCachingAgent.java | 8 +-- .../lambda/service/LambdaService.java | 52 ++++--------------- .../service/config/LambdaServiceConfig.java | 6 --- .../agent/LambdaCachingAgentTest.java | 1 - .../lambda/service/LambdaServiceTest.java | 32 ++---------- 6 files changed, 18 insertions(+), 84 deletions(-) diff --git a/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/deploy/asg/AsgConfigHelperSpec.groovy b/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/deploy/asg/AsgConfigHelperSpec.groovy index e75c5f21acd..7bb927a1d75 100644 --- a/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/deploy/asg/AsgConfigHelperSpec.groovy +++ b/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/deploy/asg/AsgConfigHelperSpec.groovy @@ -53,8 +53,9 @@ class AsgConfigHelperSpec extends Specification { when: def actualName = AsgConfigHelper.createName(baseName, suffix) + //ignore the end precision for tests. then: - actualName.contains(expectedName) + actualName.contains(expectedName.substring(0, expectedName.length() - 3)) where: baseName | suffix || expectedName diff --git a/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/provider/agent/LambdaCachingAgent.java b/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/provider/agent/LambdaCachingAgent.java index 23206a8aa84..65145e3cefa 100644 --- a/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/provider/agent/LambdaCachingAgent.java +++ b/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/provider/agent/LambdaCachingAgent.java @@ -91,13 +91,7 @@ public class LambdaCachingAgent implements CachingAgent, AccountAware, OnDemandA this, AmazonCloudProvider.ID + ":" + AmazonCloudProvider.ID + ":" + OnDemandType.Function); this.lambdaService = - new LambdaService( - amazonClientProvider, - account, - region, - objectMapper, - lambdaServiceConfig, - serviceLimitConfiguration); + new LambdaService(amazonClientProvider, account, region, objectMapper, lambdaServiceConfig); } @Override diff --git a/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/service/LambdaService.java b/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/service/LambdaService.java index e7a50be76fe..4ef3f2a2cf2 100644 --- a/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/service/LambdaService.java +++ b/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/service/LambdaService.java @@ -21,11 +21,9 @@ import com.amazonaws.services.lambda.AWSLambda; import com.amazonaws.services.lambda.model.*; import com.fasterxml.jackson.databind.ObjectMapper; -import com.netflix.spinnaker.clouddriver.aws.AmazonCloudProvider; import com.netflix.spinnaker.clouddriver.aws.data.ArnUtils; import com.netflix.spinnaker.clouddriver.aws.security.AmazonClientProvider; import com.netflix.spinnaker.clouddriver.aws.security.NetflixAmazonCredentials; -import com.netflix.spinnaker.clouddriver.core.limits.ServiceLimitConfiguration; import com.netflix.spinnaker.clouddriver.lambda.service.config.LambdaServiceConfig; import com.netflix.spinnaker.kork.exceptions.SpinnakerException; import groovy.util.logging.Slf4j; @@ -47,47 +45,37 @@ public class LambdaService { private final int RETRIES; private final Clock clock = Clock.systemDefaultZone(); private final ObjectMapper mapper; - private final ExecutorService executorService; public LambdaService( AmazonClientProvider amazonClientProvider, NetflixAmazonCredentials account, String region, ObjectMapper mapper, - LambdaServiceConfig lambdaServiceConfig, - ServiceLimitConfiguration serviceLimitConfiguration) { + LambdaServiceConfig lambdaServiceConfig) { + this.amazonClientProvider = amazonClientProvider; this.account = account; this.region = region; this.mapper = mapper; this.TIMEOUT_MINUTES = lambdaServiceConfig.getRetry().getTimeout(); this.RETRIES = lambdaServiceConfig.getRetry().getRetries(); - this.executorService = - Executors.newFixedThreadPool( - computeThreads(serviceLimitConfiguration, lambdaServiceConfig)); } - public List> getAllFunctions() throws InterruptedException { + public List> getAllFunctions() { List functions = listAllFunctionConfigurations(); - List> functionTasks = Collections.synchronizedList(new ArrayList<>()); List> hydratedFunctionList = Collections.synchronizedList(new ArrayList<>()); functions.stream() .forEach( f -> { Map functionAttributes = new ConcurrentHashMap<>(); - functionTasks.add(() -> addBaseAttributes(functionAttributes, f.getFunctionName())); - functionTasks.add( - () -> addRevisionsAttributes(functionAttributes, f.getFunctionName())); - functionTasks.add( - () -> - addAliasAndEventSourceMappingConfigurationAttributes( - functionAttributes, f.getFunctionName())); - functionTasks.add( - () -> addTargetGroupAttributes(functionAttributes, f.getFunctionName())); + addBaseAttributes(functionAttributes, f.getFunctionName()); + addRevisionsAttributes(functionAttributes, f.getFunctionName()); + addAliasAndEventSourceMappingConfigurationAttributes( + functionAttributes, f.getFunctionName()); + addTargetGroupAttributes(functionAttributes, f.getFunctionName()); hydratedFunctionList.add(functionAttributes); }); - executorService.invokeAll(functionTasks); // if addBaseAttributes returned null, the name won't be included. There is a chance other // resources still have @@ -105,12 +93,9 @@ public Map getFunctionByName(String functionName) throws Interru // return quick so we don't make extra api calls for a delete lambda return null; } - functionTasks.add(() -> addRevisionsAttributes(functionAttributes, functionName)); - functionTasks.add( - () -> - addAliasAndEventSourceMappingConfigurationAttributes(functionAttributes, functionName)); - functionTasks.add(() -> addTargetGroupAttributes(functionAttributes, functionName)); - executorService.invokeAll(functionTasks); + addRevisionsAttributes(functionAttributes, functionName); + addAliasAndEventSourceMappingConfigurationAttributes(functionAttributes, functionName); + addTargetGroupAttributes(functionAttributes, functionName); return functionAttributes; } @@ -361,19 +346,4 @@ private T retry(String requestName, Supplier fn, int maxRetries, int time } } } - - private int computeThreads( - ServiceLimitConfiguration serviceLimitConfiguration, - LambdaServiceConfig lambdaServiceConfig) { - int serviceLimit = - serviceLimitConfiguration - .getLimit( - ServiceLimitConfiguration.API_RATE_LIMIT, - AWSLambda.class.getSimpleName(), - account.getName(), - AmazonCloudProvider.ID, - 5.0d) - .intValue(); - return Math.min(serviceLimit * 2, lambdaServiceConfig.getConcurrency().getThreads()); - } } diff --git a/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/service/config/LambdaServiceConfig.java b/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/service/config/LambdaServiceConfig.java index 4cd3277e416..21eb13a1ee2 100644 --- a/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/service/config/LambdaServiceConfig.java +++ b/clouddriver-lambda/src/main/java/com/netflix/spinnaker/clouddriver/lambda/service/config/LambdaServiceConfig.java @@ -26,16 +26,10 @@ public class LambdaServiceConfig { private Retry retry = new Retry(); - private Concurrency concurrency = new Concurrency(); @Data public static class Retry { private int timeout = 15; private int retries = 5; } - - @Data - public static class Concurrency { - private int threads = 10; - } } diff --git a/clouddriver-lambda/src/test/java/com/netflix/spinnaker/clouddriver/lambda/provider/agent/LambdaCachingAgentTest.java b/clouddriver-lambda/src/test/java/com/netflix/spinnaker/clouddriver/lambda/provider/agent/LambdaCachingAgentTest.java index 59802e40206..168c8fe57d5 100644 --- a/clouddriver-lambda/src/test/java/com/netflix/spinnaker/clouddriver/lambda/provider/agent/LambdaCachingAgentTest.java +++ b/clouddriver-lambda/src/test/java/com/netflix/spinnaker/clouddriver/lambda/provider/agent/LambdaCachingAgentTest.java @@ -49,7 +49,6 @@ public class LambdaCachingAgentTest { @Before public void setup() { when(config.getRetry()).thenReturn(new LambdaServiceConfig.Retry()); - when(config.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency()); when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0); lambdaCachingAgent = new LambdaCachingAgent( diff --git a/clouddriver-lambda/src/test/java/com/netflix/spinnaker/clouddriver/lambda/service/LambdaServiceTest.java b/clouddriver-lambda/src/test/java/com/netflix/spinnaker/clouddriver/lambda/service/LambdaServiceTest.java index cc62caf1e09..1010f1138af 100644 --- a/clouddriver-lambda/src/test/java/com/netflix/spinnaker/clouddriver/lambda/service/LambdaServiceTest.java +++ b/clouddriver-lambda/src/test/java/com/netflix/spinnaker/clouddriver/lambda/service/LambdaServiceTest.java @@ -32,19 +32,13 @@ class LambdaServiceTest { @Test void getAllFunctionsWhenFunctionsResultIsNullExpectEmpty() throws InterruptedException { when(lambdaServiceConfig.getRetry()).thenReturn(new LambdaServiceConfig.Retry()); - when(lambdaServiceConfig.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency()); when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0); AWSLambda lambda = mock(AWSLambda.class); // returns null by default when(clientProvider.getAmazonLambda(any(), any())).thenReturn(lambda); LambdaService lambdaService = new LambdaService( - clientProvider, - netflixAmazonCredentials, - REGION, - objectMapper, - lambdaServiceConfig, - serviceLimitConfiguration); + clientProvider, netflixAmazonCredentials, REGION, objectMapper, lambdaServiceConfig); List> allFunctions = lambdaService.getAllFunctions(); @@ -54,7 +48,6 @@ void getAllFunctionsWhenFunctionsResultIsNullExpectEmpty() throws InterruptedExc @Test void getAllFunctionsWhenFunctionsResultIsEmptyExpectEmpty() throws InterruptedException { when(lambdaServiceConfig.getRetry()).thenReturn(new LambdaServiceConfig.Retry()); - when(lambdaServiceConfig.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency()); when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0); ListFunctionsResult functionsResult = mock(ListFunctionsResult.class); @@ -66,12 +59,7 @@ void getAllFunctionsWhenFunctionsResultIsEmptyExpectEmpty() throws InterruptedEx LambdaService lambdaService = new LambdaService( - clientProvider, - netflixAmazonCredentials, - REGION, - objectMapper, - lambdaServiceConfig, - serviceLimitConfiguration); + clientProvider, netflixAmazonCredentials, REGION, objectMapper, lambdaServiceConfig); List> allFunctions = lambdaService.getAllFunctions(); @@ -81,7 +69,6 @@ void getAllFunctionsWhenFunctionsResultIsEmptyExpectEmpty() throws InterruptedEx @Test void getAllFunctionsWhenFunctionNameIsEmptyExpectEmpty() throws InterruptedException { when(lambdaServiceConfig.getRetry()).thenReturn(new LambdaServiceConfig.Retry()); - when(lambdaServiceConfig.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency()); when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0); ListFunctionsResult functionsResult = mock(ListFunctionsResult.class); @@ -93,12 +80,7 @@ void getAllFunctionsWhenFunctionNameIsEmptyExpectEmpty() throws InterruptedExcep LambdaService lambdaService = new LambdaService( - clientProvider, - netflixAmazonCredentials, - REGION, - objectMapper, - lambdaServiceConfig, - serviceLimitConfiguration); + clientProvider, netflixAmazonCredentials, REGION, objectMapper, lambdaServiceConfig); List> allFunctions = lambdaService.getAllFunctions(); @@ -108,7 +90,6 @@ void getAllFunctionsWhenFunctionNameIsEmptyExpectEmpty() throws InterruptedExcep @Test void getAllFunctionsWhenFunctionNameIsNotEmptyExpectNotEmpty() throws InterruptedException { when(lambdaServiceConfig.getRetry()).thenReturn(new LambdaServiceConfig.Retry()); - when(lambdaServiceConfig.getConcurrency()).thenReturn(new LambdaServiceConfig.Concurrency()); when(serviceLimitConfiguration.getLimit(any(), any(), any(), any(), any())).thenReturn(1.0); ListFunctionsResult functionsResult = mock(ListFunctionsResult.class); @@ -160,12 +141,7 @@ void getAllFunctionsWhenFunctionNameIsNotEmptyExpectNotEmpty() throws Interrupte LambdaService lambdaService = new LambdaService( - clientProvider, - netflixAmazonCredentials, - REGION, - objectMapper, - lambdaServiceConfig, - serviceLimitConfiguration); + clientProvider, netflixAmazonCredentials, REGION, objectMapper, lambdaServiceConfig); List> allFunctions = lambdaService.getAllFunctions();