-
-
Notifications
You must be signed in to change notification settings - Fork 276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract inbound tcp and websocket connection flow to separate classes, unify retrying #773
Open
Vlatombe
wants to merge
19
commits into
jenkinsci:master
Choose a base branch
from
Vlatombe:endpoint-connector
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+1,137
−946
Open
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
fb1338a
Extract EndpointConnectors with 2 implementions
Vlatombe a255435
Spotless
Vlatombe 9f25128
Simplify
Vlatombe b4c9850
Extract JNLPEndpointResolver out of InboundTCPConnector
Vlatombe 02f8a57
Merge branch 'master' into endpoint-connector
Vlatombe 6a1ce9d
Move to JDK17
Vlatombe 70f312a
Add restrictions and @since as needed.
Vlatombe 1ecdd9e
Javadoc
Vlatombe 9ba77f7
Simplify noReconnectAfter handling
Vlatombe 7bc0d04
Attempt to give slighly more memory to tests
Vlatombe 55ef806
Spotbugs
Vlatombe 887773e
Fix delay
Vlatombe 21150c6
Eat stacktrace here as it is too noisy
Vlatombe 9b1ab2a
Extract next delay logic to a method
Vlatombe 0d7cb97
Simplify loop
Vlatombe 23276a1
RetryUtils is not public
Vlatombe c3a646c
Update pom.xml
timja 325f332
Update pom.xml
timja 04af69d
Update jvm.config
timja File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
41 changes: 41 additions & 0 deletions
41
src/main/java/org/jenkinsci/remoting/engine/EndpointConnector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package org.jenkinsci.remoting.engine; | ||
|
||
import edu.umd.cs.findbugs.annotations.CheckForNull; | ||
import edu.umd.cs.findbugs.annotations.Nullable; | ||
import hudson.remoting.Channel; | ||
import java.io.Closeable; | ||
import java.net.URL; | ||
import java.util.concurrent.Future; | ||
|
||
/** | ||
* Represents a connection to a remote endpoint to open a {@link Channel}. | ||
* @since TODO | ||
*/ | ||
public interface EndpointConnector extends Closeable { | ||
|
||
/** | ||
* @return a future to the channel to be established. Returns null if the connection cannot be established at all. | ||
* @throws Exception | ||
*/ | ||
@CheckForNull | ||
Future<Channel> connect() throws Exception; | ||
|
||
/** | ||
* Waits until the connection can be established. | ||
* @return true if the connection is ready, null if the connection never got ready | ||
* @throws InterruptedException if the thread is interrupted | ||
*/ | ||
@CheckForNull | ||
Boolean waitUntilReady() throws InterruptedException; | ||
|
||
/** | ||
* @return The name of the protocol used by this connection. | ||
*/ | ||
String getProtocol(); | ||
|
||
/** | ||
* @return the URL of the endpoint, if {@link #waitUntilReady()} returned {@code true}. | ||
*/ | ||
@Nullable | ||
URL getUrl(); | ||
} |
32 changes: 32 additions & 0 deletions
32
src/main/java/org/jenkinsci/remoting/engine/EndpointConnectorData.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package org.jenkinsci.remoting.engine; | ||
|
||
import hudson.remoting.EngineListenerSplitter; | ||
import hudson.remoting.JarCache; | ||
import java.security.cert.X509Certificate; | ||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.concurrent.ExecutorService; | ||
|
||
/** | ||
* Captures the data needed to connect to any endpoint. | ||
* @param agentName the agent name | ||
* @param secretKey the secret key | ||
* @param executor the thread pool to use for handling TCP connections | ||
* @param events the listener to log to | ||
* @param noReconnectAfter Specifies the duration after which the connection should not be re-established. | ||
* @param candidateCertificates the list of certificates to be used for the connection | ||
* @param disableHttpsCertValidation whether to disable HTTPS certificate validation | ||
* @param jarCache Where to store the jar cache | ||
* @param proxyCredentials Credentials to use for proxy authentication, if any. | ||
* @since TODO | ||
*/ | ||
public record EndpointConnectorData( | ||
String agentName, | ||
String secretKey, | ||
ExecutorService executor, | ||
EngineListenerSplitter events, | ||
Duration noReconnectAfter, | ||
List<X509Certificate> candidateCertificates, | ||
boolean disableHttpsCertValidation, | ||
JarCache jarCache, | ||
String proxyCredentials) {} |
256 changes: 256 additions & 0 deletions
256
src/main/java/org/jenkinsci/remoting/engine/InboundTCPConnector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,256 @@ | ||
package org.jenkinsci.remoting.engine; | ||
|
||
import edu.umd.cs.findbugs.annotations.CheckForNull; | ||
import edu.umd.cs.findbugs.annotations.NonNull; | ||
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; | ||
import hudson.remoting.Channel; | ||
import hudson.remoting.ChannelBuilder; | ||
import hudson.remoting.Engine; | ||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.net.Socket; | ||
import java.net.URL; | ||
import java.security.cert.X509Certificate; | ||
import java.security.interfaces.RSAPublicKey; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.Future; | ||
import java.util.logging.Logger; | ||
import java.util.stream.Collectors; | ||
import org.jenkinsci.remoting.protocol.IOHub; | ||
import org.jenkinsci.remoting.protocol.cert.DelegatingX509ExtendedTrustManager; | ||
import org.jenkinsci.remoting.protocol.cert.PublicKeyMatchingX509ExtendedTrustManager; | ||
import org.jenkinsci.remoting.protocol.impl.ConnectionRefusalException; | ||
import org.jenkinsci.remoting.util.KeyUtils; | ||
import org.jenkinsci.remoting.util.SSLUtils; | ||
import org.kohsuke.accmod.Restricted; | ||
import org.kohsuke.accmod.restrictions.NoExternalUse; | ||
|
||
/** | ||
* Connects to a controller using inbound TCP. | ||
*/ | ||
@Restricted(NoExternalUse.class) | ||
public class InboundTCPConnector implements EndpointConnector { | ||
private static final Logger LOGGER = Logger.getLogger(InboundTCPConnector.class.getName()); | ||
|
||
private final JnlpEndpointResolver jnlpEndpointResolver; | ||
private final List<URL> candidateUrls; | ||
private final DelegatingX509ExtendedTrustManager agentTrustManager; | ||
private final boolean keepAlive; | ||
private final EndpointConnectorData data; | ||
|
||
private URL url; | ||
/** | ||
* Name of the protocol that was used to successfully connect to the controller. | ||
*/ | ||
private String protocolName; | ||
|
||
/** | ||
* Tracks {@link Closeable} resources that need to be closed when this connector is closed. | ||
*/ | ||
@NonNull | ||
private final List<Closeable> closeables = new ArrayList<>(); | ||
|
||
@Override | ||
public URL getUrl() { | ||
return url; | ||
} | ||
|
||
@SuppressFBWarnings(value = "HARD_CODE_PASSWORD", justification = "Password doesn't need to be protected.") | ||
public InboundTCPConnector( | ||
EndpointConnectorData data, | ||
@NonNull List<URL> candidateUrls, | ||
@CheckForNull DelegatingX509ExtendedTrustManager agentTrustManager, | ||
boolean keepAlive, | ||
@NonNull JnlpEndpointResolver jnlpEndpointResolver) { | ||
this.data = data; | ||
this.candidateUrls = new ArrayList<>(candidateUrls); | ||
this.agentTrustManager = agentTrustManager; | ||
this.keepAlive = keepAlive; | ||
this.jnlpEndpointResolver = jnlpEndpointResolver; | ||
} | ||
|
||
private class EngineJnlpConnectionStateListener extends JnlpConnectionStateListener { | ||
|
||
private final RSAPublicKey publicKey; | ||
private final Map<String, String> headers; | ||
|
||
public EngineJnlpConnectionStateListener(RSAPublicKey publicKey, Map<String, String> headers) { | ||
this.publicKey = publicKey; | ||
this.headers = headers; | ||
} | ||
|
||
@Override | ||
public void beforeProperties(@NonNull JnlpConnectionState event) { | ||
if (event instanceof Jnlp4ConnectionState) { | ||
X509Certificate certificate = ((Jnlp4ConnectionState) event).getCertificate(); | ||
if (certificate != null) { | ||
String fingerprint = KeyUtils.fingerprint(certificate.getPublicKey()); | ||
if (!KeyUtils.equals(publicKey, certificate.getPublicKey())) { | ||
event.reject(new ConnectionRefusalException("Expecting identity " + fingerprint)); | ||
} | ||
data.events().status("Remote identity confirmed: " + fingerprint); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void afterProperties(@NonNull JnlpConnectionState event) { | ||
event.approve(); | ||
} | ||
|
||
@Override | ||
public void beforeChannel(@NonNull JnlpConnectionState event) { | ||
ChannelBuilder bldr = event.getChannelBuilder().withMode(Channel.Mode.BINARY); | ||
if (data.jarCache() != null) { | ||
bldr.withJarCache(data.jarCache()); | ||
} | ||
} | ||
|
||
@Override | ||
public void afterChannel(@NonNull JnlpConnectionState event) { | ||
// store the new cookie for next connection attempt | ||
String cookie = event.getProperty(JnlpConnectionState.COOKIE_KEY); | ||
if (cookie == null) { | ||
headers.remove(JnlpConnectionState.COOKIE_KEY); | ||
} else { | ||
headers.put(JnlpConnectionState.COOKIE_KEY, cookie); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public Future<Channel> connect() throws Exception { | ||
var hub = IOHub.create(data.executor()); | ||
closeables.add(hub); | ||
var context = SSLUtils.createSSLContext(agentTrustManager); | ||
|
||
final JnlpAgentEndpoint endpoint = RetryUtils.succeedsWithRetries( | ||
jnlpEndpointResolver::resolve, | ||
data.noReconnectAfter(), | ||
data.events(), | ||
x -> "Could not locate server among " + candidateUrls + ": " + x.getMessage()); | ||
if (endpoint == null) { | ||
data.events().status("Could not resolve server among " + this.candidateUrls); | ||
return null; | ||
} | ||
url = endpoint.getServiceUrl(); | ||
|
||
data.events() | ||
.status(String.format( | ||
"Agent discovery successful%n" | ||
+ " Agent address: %s%n" | ||
+ " Agent port: %d%n" | ||
+ " Identity: %s", | ||
endpoint.getHost(), endpoint.getPort(), KeyUtils.fingerprint(endpoint.getPublicKey()))); | ||
PublicKeyMatchingX509ExtendedTrustManager delegate = new PublicKeyMatchingX509ExtendedTrustManager(); | ||
RSAPublicKey publicKey = endpoint.getPublicKey(); | ||
if (publicKey != null) { | ||
// This is so that JNLP4-connect will only connect if the public key matches | ||
// if the public key is not published then JNLP4-connect will refuse to connect | ||
delegate.add(publicKey); | ||
} | ||
this.agentTrustManager.setDelegate(delegate); | ||
|
||
data.events().status("Handshaking"); | ||
// must be read-write | ||
final Map<String, String> headers = new HashMap<>(); | ||
headers.put(JnlpConnectionState.CLIENT_NAME_KEY, data.agentName()); | ||
headers.put(JnlpConnectionState.SECRET_KEY, data.secretKey()); | ||
// Create the protocols that will be attempted to connect to the controller. | ||
var clientProtocols = new JnlpProtocolHandlerFactory(data.executor()) | ||
.withIOHub(hub) | ||
.withSSLContext(context) | ||
.withPreferNonBlockingIO(false) // we only have one connection, prefer blocking I/O | ||
.handlers(); | ||
var negotiatedProtocols = clientProtocols.stream() | ||
.filter(JnlpProtocolHandler::isEnabled) | ||
.filter(p -> endpoint.isProtocolSupported(p.getName())) | ||
.collect(Collectors.toSet()); | ||
var serverProtocols = endpoint.getProtocols() == null ? "?" : String.join(",", endpoint.getProtocols()); | ||
LOGGER.info(buildDebugProtocolsMessage(serverProtocols, clientProtocols, negotiatedProtocols)); | ||
Comment on lines
+162
to
+172
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gives clearer output to users on which protocols are supported on either side of the connection. |
||
for (var protocol : negotiatedProtocols) { | ||
var jnlpSocket = RetryUtils.succeedsWithRetries( | ||
() -> { | ||
data.events().status("Connecting to " + endpoint.describe() + " using " + protocol.getName()); | ||
// default is 30 mins. See PingThread for the ping interval | ||
final Socket s = endpoint.open(Engine.SOCKET_TIMEOUT); | ||
s.setKeepAlive(keepAlive); | ||
return s; | ||
}, | ||
data.noReconnectAfter(), | ||
data.events()); | ||
if (jnlpSocket == null) { | ||
return null; | ||
} | ||
closeables.add(jnlpSocket); | ||
try { | ||
protocolName = protocol.getName(); | ||
return protocol.connect( | ||
jnlpSocket, headers, new EngineJnlpConnectionStateListener(endpoint.getPublicKey(), headers)); | ||
} catch (IOException ioe) { | ||
data.events().status("Protocol " + protocol.getName() + " failed to establish channel", ioe); | ||
protocolName = null; | ||
} catch (RuntimeException e) { | ||
data.events().status("Protocol " + protocol.getName() + " encountered a runtime error", e); | ||
protocolName = null; | ||
} | ||
// On failure form a new connection. | ||
jnlpSocket.close(); | ||
closeables.remove(jnlpSocket); | ||
} | ||
if (negotiatedProtocols.isEmpty()) { | ||
data.events() | ||
.status( | ||
"reconnect rejected", | ||
new Exception("The server rejected the connection: None of the protocols were accepted")); | ||
} else { | ||
data.events() | ||
.status( | ||
"reconnect rejected", | ||
new Exception("The server rejected the connection: None of the protocols are enabled")); | ||
} | ||
return null; | ||
} | ||
|
||
@NonNull | ||
private static String buildDebugProtocolsMessage( | ||
String serverProtocols, | ||
List<JnlpProtocolHandler<? extends JnlpConnectionState>> clientProtocols, | ||
Set<JnlpProtocolHandler<? extends JnlpConnectionState>> negotiatedProtocols) { | ||
return "Protocols support: Server " + "[" + serverProtocols + "]" | ||
+ ", Client " + "[" | ||
+ clientProtocols.stream() | ||
.map(p -> p.getName() + (!p.isEnabled() ? " (disabled)" : "")) | ||
.collect(Collectors.joining(",")) | ||
+ "]" | ||
+ ", Negociated: " + "[" | ||
+ negotiatedProtocols.stream().map(JnlpProtocolHandler::getName).collect(Collectors.joining(",")) | ||
+ "]"; | ||
} | ||
|
||
@Override | ||
public Boolean waitUntilReady() throws InterruptedException { | ||
jnlpEndpointResolver.waitForReady(); | ||
return true; | ||
} | ||
|
||
@Override | ||
public String getProtocol() { | ||
return protocolName; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
closeables.forEach(c -> { | ||
try { | ||
c.close(); | ||
} catch (IOException e) { | ||
data.events().status("Failed to close resource " + c, e); | ||
} | ||
}); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be a
record
. (see #749)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Too bad
JnlpConnectionStateListener
is an abstract class (for no good reason)