-
-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: reduce conflicts when update configmap in k8s #89 #93
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -16,43 +16,67 @@ | |||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||
package com.ctrip.framework.apollo.kubernetes; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
import com.ctrip.framework.apollo.build.ApolloInjector; | ||||||||||||||||||||||||||||||||||||||||
import com.ctrip.framework.apollo.core.utils.StringUtils; | ||||||||||||||||||||||||||||||||||||||||
import com.ctrip.framework.apollo.exceptions.ApolloConfigException; | ||||||||||||||||||||||||||||||||||||||||
import com.ctrip.framework.apollo.util.ConfigUtil; | ||||||||||||||||||||||||||||||||||||||||
import com.google.common.annotations.VisibleForTesting; | ||||||||||||||||||||||||||||||||||||||||
import com.google.common.base.Strings; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.ApiClient; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.ApiException; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.apis.CoreV1Api; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.models.V1ConfigMap; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.models.V1ObjectMeta; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.models.V1Pod; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.openapi.models.V1PodList; | ||||||||||||||||||||||||||||||||||||||||
import io.kubernetes.client.util.Config; | ||||||||||||||||||||||||||||||||||||||||
import org.slf4j.Logger; | ||||||||||||||||||||||||||||||||||||||||
import org.slf4j.LoggerFactory; | ||||||||||||||||||||||||||||||||||||||||
import org.springframework.stereotype.Service; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
import java.util.Comparator; | ||||||||||||||||||||||||||||||||||||||||
import java.util.HashMap; | ||||||||||||||||||||||||||||||||||||||||
import java.util.Map; | ||||||||||||||||||||||||||||||||||||||||
import java.util.Objects; | ||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.TimeUnit; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||
* Manages Kubernetes ConfigMap operations. | ||||||||||||||||||||||||||||||||||||||||
* Required Kubernetes permissions: | ||||||||||||||||||||||||||||||||||||||||
* - pods: [get, list] - For pod selection and write eligibility | ||||||||||||||||||||||||||||||||||||||||
* - configmaps: [get, create, update] - For ConfigMap operations | ||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||
@Service | ||||||||||||||||||||||||||||||||||||||||
public class KubernetesManager { | ||||||||||||||||||||||||||||||||||||||||
private static final Logger logger = LoggerFactory.getLogger(KubernetesManager.class); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private static final String RUNNING_POD_FIELD_SELECTOR = "status.phase=Running"; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private static final int MAX_SEARCH_NUM = 1000; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private ApiClient client; | ||||||||||||||||||||||||||||||||||||||||
private CoreV1Api coreV1Api; | ||||||||||||||||||||||||||||||||||||||||
private int propertyKubernetesMaxWritePods; | ||||||||||||||||||||||||||||||||||||||||
private String localPodName = System.getenv("HOSTNAME"); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
public KubernetesManager() { | ||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||
client = Config.defaultClient(); | ||||||||||||||||||||||||||||||||||||||||
coreV1Api = new CoreV1Api(client); | ||||||||||||||||||||||||||||||||||||||||
ConfigUtil configUtil = ApolloInjector.getInstance(ConfigUtil.class); | ||||||||||||||||||||||||||||||||||||||||
propertyKubernetesMaxWritePods = configUtil.getPropertyKubernetesMaxWritePods(); | ||||||||||||||||||||||||||||||||||||||||
} catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||
String errorMessage = "Failed to initialize Kubernetes client: " + e.getMessage(); | ||||||||||||||||||||||||||||||||||||||||
logger.error(errorMessage, e); | ||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(errorMessage, e); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
public KubernetesManager(CoreV1Api coreV1Api) { | ||||||||||||||||||||||||||||||||||||||||
@VisibleForTesting | ||||||||||||||||||||||||||||||||||||||||
public KubernetesManager(CoreV1Api coreV1Api, String localPodName, int propertyKubernetesMaxWritePods) { | ||||||||||||||||||||||||||||||||||||||||
this.coreV1Api = coreV1Api; | ||||||||||||||||||||||||||||||||||||||||
this.localPodName = localPodName; | ||||||||||||||||||||||||||||||||||||||||
this.propertyKubernetesMaxWritePods = propertyKubernetesMaxWritePods; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private V1ConfigMap buildConfigMap(String name, String namespace, Map<String, String> data) { | ||||||||||||||||||||||||||||||||||||||||
|
@@ -132,6 +156,10 @@ public boolean updateConfigMap(String k8sNamespace, String name, Map<String, Str | |||||||||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
if (!isWritePod(k8sNamespace)) { | ||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
int maxRetries = 5; | ||||||||||||||||||||||||||||||||||||||||
int retryCount = 0; | ||||||||||||||||||||||||||||||||||||||||
long waitTime = 100; | ||||||||||||||||||||||||||||||||||||||||
|
@@ -205,4 +233,43 @@ public boolean checkConfigMapExist(String k8sNamespace, String configMapName) { | |||||||||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||
* check pod whether pod can write configmap | ||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||
* @param k8sNamespace config map namespace | ||||||||||||||||||||||||||||||||||||||||
* @return true if this pod can write configmap, false otherwise | ||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||
private boolean isWritePod(String k8sNamespace) { | ||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||
if (Strings.isNullOrEmpty(localPodName)) { | ||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
24kpure marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||
V1Pod localPod = coreV1Api.readNamespacedPod(localPodName, k8sNamespace, null); | ||||||||||||||||||||||||||||||||||||||||
24kpure marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||
V1ObjectMeta localMetadata = localPod.getMetadata(); | ||||||||||||||||||||||||||||||||||||||||
if (localMetadata == null || localMetadata.getLabels() == null) { | ||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
String appName = localMetadata.getLabels().get("app"); | ||||||||||||||||||||||||||||||||||||||||
String labelSelector = "app=" + appName; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
V1PodList v1PodList = coreV1Api.listNamespacedPod(k8sNamespace, null, null, | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the number of pod instances is large, the interface return value may be particularly large, and the limit parameter needs to be used There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know what is a large number of instances, so I set it to 1000 . How about your advice? |
||||||||||||||||||||||||||||||||||||||||
null, RUNNING_POD_FIELD_SELECTOR, labelSelector, | ||||||||||||||||||||||||||||||||||||||||
MAX_SEARCH_NUM, null, null | ||||||||||||||||||||||||||||||||||||||||
, null, null); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
return v1PodList.getItems().stream() | ||||||||||||||||||||||||||||||||||||||||
.map(V1Pod::getMetadata) | ||||||||||||||||||||||||||||||||||||||||
.filter(Objects::nonNull) | ||||||||||||||||||||||||||||||||||||||||
24kpure marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||
//Make each node selects the same write nodes by sorting | ||||||||||||||||||||||||||||||||||||||||
.filter(metadata -> metadata.getCreationTimestamp() != null) | ||||||||||||||||||||||||||||||||||||||||
.sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp)) | ||||||||||||||||||||||||||||||||||||||||
.map(V1ObjectMeta::getName) | ||||||||||||||||||||||||||||||||||||||||
.limit(propertyKubernetesMaxWritePods) | ||||||||||||||||||||||||||||||||||||||||
.anyMatch(localPodName::equals); | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+261
to
+269
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance pod selection criteria The current pod selection logic only considers the creation timestamp and "Running" status. Consider additional criteria:
Example enhancement: return v1PodList.getItems().stream()
.map(V1Pod::getMetadata)
.filter(Objects::nonNull)
+ .filter(metadata -> metadata.getDeletionTimestamp() == null)
.filter(metadata -> metadata.getCreationTimestamp() != null)
.sorted(Comparator.comparing(V1ObjectMeta::getCreationTimestamp))
.map(V1ObjectMeta::getName)
.limit(propertyKubernetesMaxWritePods)
.anyMatch(localPodName::equals); 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
} catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||
logger.info("Error determining write pod eligibility:{}", e.getMessage(), e); | ||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
24kpure marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,14 +21,30 @@ | |
import io.kubernetes.client.openapi.apis.CoreV1Api; | ||
import io.kubernetes.client.openapi.models.V1ConfigMap; | ||
import io.kubernetes.client.openapi.models.V1ObjectMeta; | ||
import io.kubernetes.client.openapi.models.V1Pod; | ||
import io.kubernetes.client.openapi.models.V1PodList; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.mockito.Mockito; | ||
|
||
import java.time.OffsetDateTime; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
import static org.mockito.Mockito.*; | ||
import static org.junit.Assert.*; | ||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertFalse; | ||
import static org.junit.Assert.assertNull; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.mockito.Mockito.any; | ||
import static org.mockito.Mockito.doReturn; | ||
import static org.mockito.Mockito.doThrow; | ||
import static org.mockito.Mockito.eq; | ||
import static org.mockito.Mockito.isNull; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
public class KubernetesManagerTest { | ||
|
||
|
@@ -38,7 +54,7 @@ public class KubernetesManagerTest { | |
@Before | ||
public void setUp() { | ||
coreV1Api = mock(CoreV1Api.class); | ||
kubernetesManager = new KubernetesManager(coreV1Api); | ||
kubernetesManager = new KubernetesManager(coreV1Api, "localPodName", 3); | ||
|
||
MockInjector.setInstance(KubernetesManager.class, kubernetesManager); | ||
MockInjector.setInstance(CoreV1Api.class, coreV1Api); | ||
|
@@ -58,13 +74,13 @@ public void testCreateConfigMapSuccess() throws Exception { | |
.metadata(new V1ObjectMeta().name(name).namespace(namespace)) | ||
.data(data); | ||
|
||
when(coreV1Api.createNamespacedConfigMap(eq(namespace), eq(configMap), isNull(), isNull(), isNull(),isNull())).thenReturn(configMap); | ||
when(coreV1Api.createNamespacedConfigMap(eq(namespace), eq(configMap), isNull(), isNull(), isNull(), isNull())).thenReturn(configMap); | ||
|
||
// act | ||
String result = kubernetesManager.createConfigMap(namespace, name, data); | ||
|
||
// assert | ||
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(),isNull()); | ||
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(), isNull()); | ||
assert name.equals(result); | ||
} | ||
|
||
|
@@ -82,7 +98,7 @@ public void testCreateConfigMapNullData() throws Exception { | |
String result = kubernetesManager.createConfigMap(namespace, name, data); | ||
|
||
// assert | ||
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(),isNull()); | ||
verify(coreV1Api, times(1)).createNamespacedConfigMap(eq(namespace), any(V1ConfigMap.class), isNull(), isNull(), isNull(), isNull()); | ||
assert name.equals(result); | ||
} | ||
|
||
|
@@ -135,20 +151,40 @@ public void testUpdateConfigMapSuccess() throws Exception { | |
// arrange | ||
String namespace = "default"; | ||
String name = "testConfigMap"; | ||
Map<String, String> data = new HashMap<>(); | ||
data.put("key", "value"); | ||
|
||
V1Pod pod = new V1Pod() | ||
.metadata( | ||
new V1ObjectMeta() | ||
.name("localPodName") | ||
.creationTimestamp(OffsetDateTime.now()) | ||
.labels(Collections.singletonMap("app", "app"))); | ||
V1PodList v1PodList = new V1PodList().addItemsItem(new V1Pod().metadata(pod.getMetadata())); | ||
Comment on lines
+155
to
+161
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance test coverage for pod scenarios Add test cases for different pod scenarios:
Would you like me to generate these additional test cases? |
||
|
||
Map<String, String> existData = new HashMap<>(); | ||
existData.put("key", "value"); | ||
V1ConfigMap configMap = new V1ConfigMap(); | ||
configMap.metadata(new V1ObjectMeta().name(name).namespace(namespace)); | ||
configMap.data(data); | ||
configMap.data(existData); | ||
|
||
when(coreV1Api.readNamespacedPod("localPodName", namespace, null)).thenReturn(pod); | ||
when(coreV1Api.listNamespacedPod(namespace, null, null, | ||
null, null, "app=app", | ||
null, null, null | ||
, null, null)).thenReturn(v1PodList); | ||
when(coreV1Api.readNamespacedConfigMap(name, namespace, null)).thenReturn(configMap); | ||
when(coreV1Api.replaceNamespacedConfigMap(name, namespace, configMap, null, null, null, null)).thenReturn(configMap); | ||
|
||
// act | ||
Boolean success = kubernetesManager.updateConfigMap(namespace, name, data); | ||
HashMap<String, String> updateData = new HashMap<>(existData); | ||
updateData.put("newKey","newValue"); | ||
boolean success = kubernetesManager.updateConfigMap(namespace, name, updateData); | ||
|
||
// assert | ||
assertTrue(success); | ||
Mockito.verify(coreV1Api, Mockito.times(1)).listNamespacedPod(namespace, null, null, | ||
null, null, "app=app", | ||
null, null, null | ||
, null, null); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Reconsider return value for non-write pods
Returning
true
whenisWritePod
returnsfalse
might mask issues by indicating success when no update was attempted. Consider returning a different status or throwing an exception to make it clear that the pod is not eligible to write.