Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reduce conflicts when update configmap in k8s #89 #93

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Apollo Java 2.4.0
* [Fix monitor arg cause npe](https://github.com/apolloconfig/apollo-java/pull/86)
* [Fix the concurrent issue in SpringValueRegistry.scanAndClean](https://github.com/apolloconfig/apollo-java/pull/95)
* [Feature support incremental configuration synchronization client](https://github.com/apolloconfig/apollo-java/pull/90)
* [Feature reduce conflicts when update configmap in k8](https://github.com/apolloconfig/apollo-java/pull/93)

------------------
All issues and pull requests are [here](https://github.com/apolloconfig/apollo-java/milestone/4?closed=1)
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,67 @@
*/
package com.ctrip.framework.apollo.kubernetes;

import com.ctrip.framework.apollo.build.ApolloInjector;
import com.ctrip.framework.apollo.core.utils.StringUtils;
import com.ctrip.framework.apollo.exceptions.ApolloConfigException;
import com.ctrip.framework.apollo.util.ConfigUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Manages Kubernetes ConfigMap operations.
* Required Kubernetes permissions:
* - pods: [get, list] - For pod selection and write eligibility
* - configmaps: [get, create, update] - For ConfigMap operations
*/
@Service
public class KubernetesManager {
private static final Logger logger = LoggerFactory.getLogger(KubernetesManager.class);

private static final String RUNNING_POD_FIELD_SELECTOR = "status.phase=Running";

private static final int MAX_SEARCH_NUM = 1000;

private ApiClient client;
private CoreV1Api coreV1Api;
private int propertyKubernetesMaxWritePods;
private String localPodName = System.getenv("HOSTNAME");

public KubernetesManager() {
try {
client = Config.defaultClient();
coreV1Api = new CoreV1Api(client);
ConfigUtil configUtil = ApolloInjector.getInstance(ConfigUtil.class);
propertyKubernetesMaxWritePods = configUtil.getPropertyKubernetesMaxWritePods();
} catch (Exception e) {
String errorMessage = "Failed to initialize Kubernetes client: " + e.getMessage();
logger.error(errorMessage, e);
throw new RuntimeException(errorMessage, e);
}
}

public KubernetesManager(CoreV1Api coreV1Api) {
@VisibleForTesting
public KubernetesManager(CoreV1Api coreV1Api, String localPodName, int propertyKubernetesMaxWritePods) {
this.coreV1Api = coreV1Api;
this.localPodName = localPodName;
this.propertyKubernetesMaxWritePods = propertyKubernetesMaxWritePods;
}

private V1ConfigMap buildConfigMap(String name, String namespace, Map<String, String> data) {
Expand Down Expand Up @@ -132,6 +156,10 @@ public boolean updateConfigMap(String k8sNamespace, String name, Map<String, Str
return false;
}

if (!isWritePod(k8sNamespace)) {
return true;
}
Comment on lines +159 to +161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Reconsider return value for non-write pods

Returning true when isWritePod returns false might mask issues by indicating success when no update was attempted. Consider returning a different status or throwing an exception to make it clear that the pod is not eligible to write.


int maxRetries = 5;
int retryCount = 0;
long waitTime = 100;
Expand Down Expand Up @@ -205,4 +233,43 @@ public boolean checkConfigMapExist(String k8sNamespace, String configMapName) {
return false;
}
}

/**
* check pod whether pod can write configmap
*
* @param k8sNamespace config map namespace
* @return true if this pod can write configmap, false otherwise
*/
private boolean isWritePod(String k8sNamespace) {
try {
if (Strings.isNullOrEmpty(localPodName)) {
return true;
}
24kpure marked this conversation as resolved.
Show resolved Hide resolved
V1Pod localPod = coreV1Api.readNamespacedPod(localPodName, k8sNamespace, null);
24kpure marked this conversation as resolved.
Show resolved Hide resolved
V1ObjectMeta localMetadata = localPod.getMetadata();
if (localMetadata == null || localMetadata.getLabels() == null) {
return true;
}
String appName = localMetadata.getLabels().get("app");
String labelSelector = "app=" + appName;

V1PodList v1PodList = coreV1Api.listNamespacedPod(k8sNamespace, null, null,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the number of pod instances is large, the interface return value may be particularly large, and the limit parameter needs to be used

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what is a large number of instances, so I set it to 1000 . How about your advice?

null, RUNNING_POD_FIELD_SELECTOR, labelSelector,
MAX_SEARCH_NUM, null, null
, null, null);

return v1PodList.getItems().stream()
.map(V1Pod::getMetadata)
.filter(Objects::nonNull)
24kpure marked this conversation as resolved.
Show resolved Hide resolved
//Make each node selects the same write nodes by sorting
.filter(metadata -> metadata.getCreationTimestamp() != null)
.sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp))
.map(V1ObjectMeta::getName)
.limit(propertyKubernetesMaxWritePods)
.anyMatch(localPodName::equals);
Comment on lines +261 to +269
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance pod selection criteria

The current pod selection logic only considers the creation timestamp and "Running" status. Consider additional criteria:

  1. Pod readiness status (Ready condition)
  2. Pod container statuses
  3. Pod deletion timestamp (ignore pods being terminated)

Example enhancement:

     return v1PodList.getItems().stream()
         .map(V1Pod::getMetadata)
         .filter(Objects::nonNull)
+        .filter(metadata -> metadata.getDeletionTimestamp() == null)
         .filter(metadata -> metadata.getCreationTimestamp() != null)
         .sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp))
         .map(V1ObjectMeta::getName)
         .limit(propertyKubernetesMaxWritePods)
         .anyMatch(localPodName::equals);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return v1PodList.getItems().stream()
.map(V1Pod::getMetadata)
.filter(Objects::nonNull)
//Make each node selects the same write nodes by sorting
.filter(metadata -> metadata.getCreationTimestamp() != null)
.sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp))
.map(V1ObjectMeta::getName)
.limit(propertyKubernetesMaxWritePods)
.anyMatch(localPodName::equals);
return v1PodList.getItems().stream()
.map(V1Pod::getMetadata)
.filter(Objects::nonNull)
.filter(metadata -> metadata.getDeletionTimestamp() == null)
//Make each node selects the same write nodes by sorting
.filter(metadata -> metadata.getCreationTimestamp() != null)
.sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp))
.map(V1ObjectMeta::getName)
.limit(propertyKubernetesMaxWritePods)
.anyMatch(localPodName::equals);

} catch (Exception e) {
logger.info("Error determining write pod eligibility:{}", e.getMessage(), e);
return true;
}
24kpure marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ public class ConfigUtil {
private boolean propertyFileCacheEnabled = true;
private boolean overrideSystemProperties = true;
private boolean propertyKubernetesCacheEnabled = false;
private int propertyKubernetesMaxWritePods = 3;
private boolean clientMonitorEnabled = false;
private boolean clientMonitorJmxEnabled = false;
private String monitorExternalType = "";
private long monitorExternalExportPeriod = 10;
private int monitorExceptionQueueSize = 25;


public ConfigUtil() {
warnLogRateLimiter = RateLimiter.create(0.017); // 1 warning log output per minute
initRefreshInterval();
Expand All @@ -93,6 +95,7 @@ public ConfigUtil() {
initPropertyFileCacheEnabled();
initOverrideSystemProperties();
initPropertyKubernetesCacheEnabled();
initPropertyKubernetesMaxWritePods();
initClientMonitorEnabled();
initClientMonitorJmxEnabled();
initClientMonitorExternalType();
Expand Down Expand Up @@ -390,31 +393,44 @@ private String getDeprecatedCustomizedCacheRoot() {
}

public String getK8sNamespace() {
String k8sNamespace = getCacheKubernetesNamespace();
return getK8sConfigProperties(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE,
ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE_ENVIRONMENT_VARIABLES,
ConfigConsts.KUBERNETES_CACHE_CONFIG_MAP_NAMESPACE_DEFAULT);
}

if (!Strings.isNullOrEmpty(k8sNamespace)) {
return k8sNamespace;
private void initPropertyKubernetesMaxWritePods() {
String propertyKubernetesMaxWritePodsStr = getK8sConfigProperties(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_MAX_WRITE_PODS,
ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_MAX_WRITE_PODS_ENVIRONMENT_VARIABLES,
String.valueOf(propertyKubernetesMaxWritePods));
if (!Strings.isNullOrEmpty(propertyKubernetesMaxWritePodsStr)) {
try {
propertyKubernetesMaxWritePods = Integer.parseInt(propertyKubernetesMaxWritePodsStr);
} catch (Throwable ex) {
logger.error("Config for {} is invalid: {}",
ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE, propertyKubernetesMaxWritePodsStr);
}
24kpure marked this conversation as resolved.
Show resolved Hide resolved
}

return ConfigConsts.KUBERNETES_CACHE_CONFIG_MAP_NAMESPACE_DEFAULT;
}

private String getCacheKubernetesNamespace() {
private String getK8sConfigProperties(String key, String environmentKey, String defaultValue) {
// 1. Get from System Property
String k8sNamespace = System.getProperty(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE);
String k8sNamespace = System.getProperty(key);
if (Strings.isNullOrEmpty(k8sNamespace)) {
// 2. Get from OS environment variable
k8sNamespace = System.getenv(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE_ENVIRONMENT_VARIABLES);
k8sNamespace = System.getenv(environmentKey);
}
if (Strings.isNullOrEmpty(k8sNamespace)) {
// 3. Get from server.properties
k8sNamespace = Foundation.server().getProperty(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE, null);
k8sNamespace = Foundation.server().getProperty(key, null);
}
if (Strings.isNullOrEmpty(k8sNamespace)) {
// 4. Get from app.properties
k8sNamespace = Foundation.app().getProperty(ApolloClientSystemConsts.APOLLO_CACHE_KUBERNETES_NAMESPACE, null);
k8sNamespace = Foundation.app().getProperty(key, null);
}
if (!Strings.isNullOrEmpty(k8sNamespace)) {
return k8sNamespace;
}
return k8sNamespace;
return defaultValue;
}

public boolean isInLocalMode() {
Expand Down Expand Up @@ -524,6 +540,10 @@ public boolean isPropertyKubernetesCacheEnabled() {
return propertyKubernetesCacheEnabled;
}

public int getPropertyKubernetesMaxWritePods() {
return propertyKubernetesMaxWritePods;
}

public boolean isOverrideSystemProperties() {
return overrideSystemProperties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@
"description": "kubernetes configmap namespace.",
"defaultValue": "default"
},
{
"name": "apollo.cache.kubernetes.max-write-pods",
"type": "java.lang.String",
"sourceType": "com.ctrip.framework.apollo.util.ConfigUtil",
"description": "max number of pods that can write the configmap cache in Kubernetes.",
"defaultValue": "3"
},
{
"name": "apollo.property.order.enable",
"type": "java.lang.Boolean",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,30 @@
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class KubernetesManagerTest {

Expand All @@ -38,7 +54,7 @@ public class KubernetesManagerTest {
@Before
public void setUp() {
coreV1Api = mock(CoreV1Api.class);
kubernetesManager = new KubernetesManager(coreV1Api);
kubernetesManager = new KubernetesManager(coreV1Api, "localPodName", 3);

MockInjector.setInstance(KubernetesManager.class, kubernetesManager);
MockInjector.setInstance(CoreV1Api.class, coreV1Api);
Expand All @@ -58,13 +74,13 @@ public void testCreateConfigMapSuccess() throws Exception {
.metadata(new V1ObjectMeta().name(name).namespace(namespace))
.data(data);

when(coreV1Api.createNamespacedConfigMap(eq(namespace), eq(configMap), isNull(), isNull(), isNull(),isNull())).thenReturn(configMap);
when(coreV1Api.createNamespacedConfigMap(eq(namespace), eq(configMap), isNull(), isNull(), isNull(), isNull())).thenReturn(configMap);

// act
String result = kubernetesManager.createConfigMap(namespace, name, data);

// assert
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(),isNull());
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(), isNull());
assert name.equals(result);
}

Expand All @@ -82,7 +98,7 @@ public void testCreateConfigMapNullData() throws Exception {
String result = kubernetesManager.createConfigMap(namespace, name, data);

// assert
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(),isNull());
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(), isNull());
assert name.equals(result);
}

Expand Down Expand Up @@ -135,20 +151,40 @@ public void testUpdateConfigMapSuccess() throws Exception {
// arrange
String namespace = "default";
String name = "testConfigMap";
Map<String, String> data = new HashMap<>();
data.put("key", "value");

V1Pod pod = new V1Pod()
.metadata(
new V1ObjectMeta()
.name("localPodName")
.creationTimestamp(OffsetDateTime.now())
.labels(Collections.singletonMap("app", "app")));
V1PodList v1PodList = new V1PodList().addItemsItem(new V1Pod().metadata(pod.getMetadata()));
Comment on lines +155 to +161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance test coverage for pod scenarios

Add test cases for different pod scenarios:

  1. Pod without required labels
  2. Pod with deletion timestamp
  3. Multiple pods with different creation timestamps
  4. Pods in non-running states

Would you like me to generate these additional test cases?


Map<String, String> existData = new HashMap<>();
existData.put("key", "value");
V1ConfigMap configMap = new V1ConfigMap();
configMap.metadata(new V1ObjectMeta().name(name).namespace(namespace));
configMap.data(data);
configMap.data(existData);

when(coreV1Api.readNamespacedPod("localPodName", namespace, null)).thenReturn(pod);
when(coreV1Api.listNamespacedPod(namespace, null, null,
null, null, "app=app",
null, null, null
, null, null)).thenReturn(v1PodList);
when(coreV1Api.readNamespacedConfigMap(name, namespace, null)).thenReturn(configMap);
when(coreV1Api.replaceNamespacedConfigMap(name, namespace, configMap, null, null, null, null)).thenReturn(configMap);

// act
Boolean success = kubernetesManager.updateConfigMap(namespace, name, data);
HashMap<String, String> updateData = new HashMap<>(existData);
updateData.put("newKey","newValue");
boolean success = kubernetesManager.updateConfigMap(namespace, name, updateData);

// assert
assertTrue(success);
Mockito.verify(coreV1Api, Mockito.times(1)).listNamespacedPod(namespace, null, null,
null, null, "app=app",
null, null, null
, null, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ public class ApolloClientSystemConsts {
*/
public static final String APOLLO_CACHE_KUBERNETES_NAMESPACE_ENVIRONMENT_VARIABLES = "APOLLO_CACHE_KUBERNETES_NAMESPACE";

/**
* max number of pods that can write the configmap cache in Kubernetes
*/
public static final String APOLLO_CACHE_KUBERNETES_MAX_WRITE_PODS = "apollo.cache.kubernetes.max-write-pods";

/**
* max number of pods that can write the configmap cache in Kubernetes environment variables
*/
public static final String APOLLO_CACHE_KUBERNETES_MAX_WRITE_PODS_ENVIRONMENT_VARIABLES = "APOLLO_CACHE_KUBERNETES_MAX_WRITE_PODS";

/**
* apollo client access key
*/
Expand Down
Loading