Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support OAuth Token for Kubernetes Authentication via Credential Service #1038

Merged
merged 12 commits into from
Oct 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -810,11 +810,11 @@ private void configureHttpApi(ServerBuilder sb,
apiV1ServiceBuilder
.annotatedService(new AdministrativeService(executor, statusManager))
.annotatedService(new ProjectServiceV1(projectApiManager, executor))
.annotatedService(new RepositoryServiceV1(executor, mds));
.annotatedService(new RepositoryServiceV1(executor, mds))
.annotatedService(new CredentialServiceV1(projectApiManager, executor));

if (GIT_MIRROR_ENABLED) {
apiV1ServiceBuilder.annotatedService(new MirroringServiceV1(projectApiManager, executor))
.annotatedService(new CredentialServiceV1(projectApiManager, executor));
apiV1ServiceBuilder.annotatedService(new MirroringServiceV1(projectApiManager, executor));
}

apiV1ServiceBuilder.annotatedService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public int hashCode() {
public final String toString() {
final ToStringHelper helper = MoreObjects.toStringHelper(this);
helper.add("id", id);
helper.add("type", type);
helper.add("enabled", enabled);
addProperties(helper);
return helper.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ public final class ControlPlaneService extends XdsResourceWatchingService {

void start(PluginInitContext pluginInitContext) {
init();
cache.setSnapshot(DEFAULT_GROUP, centralDogmaXdsResources.snapshot());
final CommandExecutor commandExecutor = pluginInitContext.commandExecutor();
final V3DiscoveryServer server = new V3DiscoveryServer(new LoggingDiscoveryServerCallbacks(),
cache);
final V3DiscoveryServer server = new V3DiscoveryServer(new LoggingDiscoveryServerCallbacks(), cache);
final GrpcService grpcService = GrpcService.builder()
.addService(server.getClusterDiscoveryServiceImpl())
.addService(server.getEndpointDiscoveryServiceImpl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public XdsResourceManager(Project xdsProject, CommandExecutor commandExecutor) {
this.commandExecutor = requireNonNull(commandExecutor, "commandExecutor");
}

public Project xdsProject() {
return xdsProject;
}

public void checkGroup(String group) {
checkGroupId(group);
// TODO(minwoox): check the write permission.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -125,7 +126,8 @@ protected void handleXdsResource(String path, String contentAsText, String group
final ServiceEndpointWatcher endpointWatcher = watcherBuilder.build();
final String watcherName = endpointWatcher.getName();
logger.info("Creating a service endpoint watcher: {}", watcherName);
final KubernetesEndpointGroup kubernetesEndpointGroup = createKubernetesEndpointGroup(endpointWatcher);
final CompletableFuture<KubernetesEndpointGroup> future =
createKubernetesEndpointGroup(endpointWatcher, xdsProject().metaRepo(), executorService);
final Map<String, KubernetesEndpointsUpdater> updaters =
kubernetesEndpointsUpdaters.computeIfAbsent(groupName, unused -> new HashMap<>());

Expand All @@ -134,15 +136,24 @@ protected void handleXdsResource(String path, String contentAsText, String group
oldUpdater.close();
}
final KubernetesEndpointsUpdater updater =
new KubernetesEndpointsUpdater(commandExecutor, kubernetesEndpointGroup, executorService,
new KubernetesEndpointsUpdater(commandExecutor, future, executorService,
groupName, watcherName, endpointWatcher.getClusterName());
updaters.put(watcherName, updater);
kubernetesEndpointGroup.addListener(endpoints -> {
if (endpoints.isEmpty()) {
return;
future.handle((kubernetesEndpointGroup, cause) -> {
if (cause != null) {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
// Do not remove the updater from updaters because it can remove the updater that is created
// by the next commit. The updater will be removed only when the file or group is removed.
updater.close();
return null;
}
executorService.execute(updater::maybeSchedule);
}, true);
kubernetesEndpointGroup.addListener(endpoints -> {
if (endpoints.isEmpty()) {
return;
}
executorService.execute(updater::maybeSchedule);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about passing kubernetesEndpointGroup as an argument to maybeSchedule to avoid calling kubernetesEndpointGroupFuture.join()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good suggestion. 👍

}, true);
return null;
});
}

@Override
Expand Down Expand Up @@ -200,7 +211,7 @@ protected boolean isStopped() {
private static class KubernetesEndpointsUpdater {

private final CommandExecutor commandExecutor;
private final KubernetesEndpointGroup kubernetesEndpointGroup;
private final CompletableFuture<KubernetesEndpointGroup> kubernetesEndpointGroupFuture;
private final ScheduledExecutorService executorService;
private final String groupName;
private final String watcherName;
Expand All @@ -209,11 +220,11 @@ private static class KubernetesEndpointsUpdater {
private ScheduledFuture<?> scheduledFuture;

KubernetesEndpointsUpdater(CommandExecutor commandExecutor,
KubernetesEndpointGroup kubernetesEndpointGroup,
CompletableFuture<KubernetesEndpointGroup> kubernetesEndpointGroupFuture,
ScheduledExecutorService executorService, String groupName,
String watcherName, String clusterName) {
this.commandExecutor = commandExecutor;
this.kubernetesEndpointGroup = kubernetesEndpointGroup;
this.kubernetesEndpointGroupFuture = kubernetesEndpointGroupFuture;
this.executorService = executorService;
this.groupName = groupName;
this.watcherName = watcherName;
Expand All @@ -228,14 +239,16 @@ void maybeSchedule() {
// instead of pushing one by one.
scheduledFuture = executorService.schedule(() -> {
scheduledFuture = null;
// maybeSchedule() is called after the future is completed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be removed?

final KubernetesEndpointGroup kubernetesEndpointGroup = kubernetesEndpointGroupFuture.join();
if (kubernetesEndpointGroup.isClosing()) {
return;
}
pushK8sEndpoints();
pushK8sEndpoints(kubernetesEndpointGroup);
}, 1, TimeUnit.SECONDS);
}

private void pushK8sEndpoints() {
private void pushK8sEndpoints(KubernetesEndpointGroup kubernetesEndpointGroup) {
final List<com.linecorp.armeria.client.Endpoint> endpoints =
kubernetesEndpointGroup.endpoints();
if (endpoints.isEmpty()) {
Expand Down Expand Up @@ -298,7 +311,7 @@ void close() {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
kubernetesEndpointGroup.closeAsync();
kubernetesEndpointGroupFuture.thenAccept(KubernetesEndpointGroup::closeAsync);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
Expand All @@ -36,17 +38,21 @@
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroup;
import com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroupBuilder;
import com.linecorp.armeria.common.ContextAwareBlockingTaskExecutor;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.centraldogma.common.Author;
import com.linecorp.centraldogma.common.EntryNotFoundException;
import com.linecorp.centraldogma.server.internal.credential.AccessTokenCredential;
import com.linecorp.centraldogma.server.storage.repository.MetaRepository;
import com.linecorp.centraldogma.xds.internal.XdsResourceManager;
import com.linecorp.centraldogma.xds.k8s.v1.XdsKubernetesServiceGrpc.XdsKubernetesServiceImplBase;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.netty.util.concurrent.ScheduledFuture;

/**
* A gRPC service that handles Kubernetes resources.
Expand Down Expand Up @@ -96,43 +102,57 @@ public void createServiceEndpointWatcher(CreateServiceEndpointWatcherRequest req
"Create watcher: " + watcherName, watcher, author, true));
}

private static void validateWatcherAndPush(
private void validateWatcherAndPush(
StreamObserver<ServiceEndpointWatcher> responseObserver,
ServiceEndpointWatcher watcher, Runnable onSuccess) {
// Create a KubernetesEndpointGroup to check if the watcher is valid.
// We use KubernetesEndpointGroup for simplicity, but we will implement a custom implementation
// for better debugging and error handling in the future.
final KubernetesEndpointGroup kubernetesEndpointGroup = createKubernetesEndpointGroup(watcher);

final AtomicBoolean completed = new AtomicBoolean();
final CompletableFuture<List<Endpoint>> whenReady = kubernetesEndpointGroup.whenReady();
final ServiceRequestContext ctx = ServiceRequestContext.current();

// Use a schedule to time out the watcher creation until we implement a custom implementation.
final ScheduledFuture<?> scheduledFuture = ctx.eventLoop().schedule(() -> {
if (!completed.compareAndSet(false, true)) {
return;
}
kubernetesEndpointGroup.closeAsync();
responseObserver.onError(
Status.INTERNAL.withDescription(
"Failed to retrieve k8s endpoints within 5 seconds. watcherName: " +
watcher.getName()).asRuntimeException());
}, 5, TimeUnit.SECONDS);

whenReady.handle((endpoints, cause) -> {
if (!completed.compareAndSet(false, true)) {
return null;
}
scheduledFuture.cancel(false);
kubernetesEndpointGroup.closeAsync();
final ContextAwareBlockingTaskExecutor taskExecutor =
ServiceRequestContext.current().blockingTaskExecutor();
final CompletableFuture<KubernetesEndpointGroup> future =
createKubernetesEndpointGroup(watcher, xdsResourceManager.xdsProject().metaRepo(),
taskExecutor);
future.handle((kubernetesEndpointGroup, cause) -> {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
if (cause != null) {
// Specific types.
responseObserver.onError(Status.INTERNAL.withCause(cause).asRuntimeException());
cause = Exceptions.peel(cause);
if (cause instanceof IllegalArgumentException || cause instanceof EntryNotFoundException) {
responseObserver.onError(Status.INVALID_ARGUMENT.withCause(cause).asRuntimeException());
} else {
responseObserver.onError(Status.INTERNAL.withCause(cause).asRuntimeException());
}
return null;
}
logger.debug("Successfully retrieved k8s endpoints: {}", endpoints);
onSuccess.run();
final AtomicBoolean completed = new AtomicBoolean();
final CompletableFuture<List<Endpoint>> whenReady = kubernetesEndpointGroup.whenReady();

// Use a schedule to time out the watcher creation until we implement a custom implementation.
final ScheduledFuture<?> scheduledFuture = taskExecutor.schedule(() -> {
if (!completed.compareAndSet(false, true)) {
return;
}
kubernetesEndpointGroup.closeAsync();
responseObserver.onError(
Status.INTERNAL.withDescription(
"Failed to retrieve k8s endpoints within 5 seconds. watcherName: " +
watcher.getName()).asRuntimeException());
}, 5, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q) 5 seconds seems short compared to request timeouts. Why did you set 5 seconds as the deadline?

Copy link
Member Author

@minwoox minwoox Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose a value of less than 10 seconds, which is the request timeout.
This API has two phases: calling k8s and committing.
I estimated that the API would need at least 5 seconds for the committing phase so I allocated 5 seconds for the k8s.
I could potentially increase the default timeout and assign the larger value but I felt it wasn't necessary at this point.
This timeout will also be removed when I implement an API to fetch the information from k8s instead of using k8sEndpointGroup.
Do you have any recommendations?


whenReady.handle((endpoints, cause1) -> {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
if (!completed.compareAndSet(false, true)) {
return null;
}
scheduledFuture.cancel(false);
kubernetesEndpointGroup.closeAsync();
if (cause1 != null) {
// Specific types.
responseObserver.onError(Status.INTERNAL.withCause(cause1).asRuntimeException());
return null;
}
logger.debug("Successfully retrieved k8s endpoints: {}", endpoints);
onSuccess.run();
return null;
});
return null;
});
}
Expand All @@ -142,32 +162,51 @@ private static void validateWatcherAndPush(
* This method must be executed in a blocking thread because
* {@link KubernetesEndpointGroupBuilder#build()} blocks the execution thread.
*/
public static KubernetesEndpointGroup createKubernetesEndpointGroup(ServiceEndpointWatcher watcher) {
public static CompletableFuture<KubernetesEndpointGroup> createKubernetesEndpointGroup(
ServiceEndpointWatcher watcher, MetaRepository metaRepository, Executor executor) {
final KubernetesConfig kubernetesConfig = watcher.getKubernetesConfig();
final String serviceName = watcher.getServiceName();

final KubernetesEndpointGroupBuilder kubernetesEndpointGroupBuilder =
KubernetesEndpointGroup.builder(toConfig(kubernetesConfig)).serviceName(serviceName);
if (!isNullOrEmpty(kubernetesConfig.getNamespace())) {
kubernetesEndpointGroupBuilder.namespace(kubernetesConfig.getNamespace());
}
if (!isNullOrEmpty(watcher.getPortName())) {
kubernetesEndpointGroupBuilder.portName(watcher.getPortName());
}

return kubernetesEndpointGroupBuilder.build();
return toConfig(kubernetesConfig, metaRepository).thenApplyAsync(config -> {
final KubernetesEndpointGroupBuilder kubernetesEndpointGroupBuilder =
KubernetesEndpointGroup.builder(config).serviceName(serviceName);
if (!isNullOrEmpty(kubernetesConfig.getNamespace())) {
kubernetesEndpointGroupBuilder.namespace(kubernetesConfig.getNamespace());
}
if (!isNullOrEmpty(watcher.getPortName())) {
kubernetesEndpointGroupBuilder.portName(watcher.getPortName());
}
// This callback can be executed by an event loop from CachingRepository, so we should use the
// specified executor to avoid blocking the event loop below.
return kubernetesEndpointGroupBuilder.build();
}, executor);
}

private static Config toConfig(KubernetesConfig kubernetesConfig) {
private static CompletableFuture<Config> toConfig(KubernetesConfig kubernetesConfig,
MetaRepository metaRepository) {
final ConfigBuilder configBuilder = new ConfigBuilder()
.withMasterUrl(kubernetesConfig.getControlPlaneUrl())
.withTrustCerts(kubernetesConfig.getTrustCerts());

if (!isNullOrEmpty(kubernetesConfig.getOauthToken())) {
configBuilder.withOauthToken(kubernetesConfig.getOauthToken());
final String oauthToken = kubernetesConfig.getOauthToken();
if (isNullOrEmpty(oauthToken)) {
return CompletableFuture.completedFuture(configBuilder.build());
}

return configBuilder.build();
if (!oauthToken.startsWith("credential:")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question) Would it make more sense to enforce users to use credentials?

Since we're still free to make breaking changes, we could just rename oauthToken to credentialId or oauthCredentialId while we're at it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good idea. 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. 😉

return CompletableFuture.completedFuture(configBuilder.withOauthToken(oauthToken).build());
}

return metaRepository.credential(oauthToken.substring("credential:".length()))
.thenApply(credential -> {
if (!(credential instanceof AccessTokenCredential)) {
throw new IllegalArgumentException(
"credential must be an access token: " + credential);
}

return configBuilder.withOauthToken(
((AccessTokenCredential) credential).accessToken()).build();
});
}

@Blocking
Expand Down
Loading
Loading