From a9f42fd727e5ac875cda0360a1bf9bae576e2cd8 Mon Sep 17 00:00:00 2001 From: echrist Date: Mon, 6 Oct 2014 12:59:48 -0700 Subject: [PATCH 1/6] Expose SO Timeout --- .../com/basho/riak/client/core/RiakNode.java | 53 +++++++++++++++++++ .../basho/riak/client/core/RiakNodeTest.java | 4 ++ 2 files changed, 57 insertions(+) diff --git a/src/main/java/com/basho/riak/client/core/RiakNode.java b/src/main/java/com/basho/riak/client/core/RiakNode.java index 42c73a9f2..f62b2bcf3 100644 --- a/src/main/java/com/basho/riak/client/core/RiakNode.java +++ b/src/main/java/com/basho/riak/client/core/RiakNode.java @@ -84,6 +84,7 @@ public enum State private volatile int minConnections; private volatile long idleTimeoutInNanos; private volatile int connectionTimeout; + private volatile int soTimeout; private volatile boolean blockOnMaxConnections; private HealthCheckFactory healthCheckFactory; @@ -175,6 +176,7 @@ private RiakNode(Builder builder) throws UnknownHostException this.executor = builder.executor; this.connectionTimeout = builder.connectionTimeout; this.idleTimeoutInNanos = TimeUnit.NANOSECONDS.convert(builder.idleTimeout, TimeUnit.MILLISECONDS); + this.soTimeout = builder.soTimeout; this.minConnections = builder.minConnections; this.port = builder.port; this.remoteAddress = builder.remoteAddress; @@ -250,6 +252,11 @@ public synchronized RiakNode start() bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout); } + if (soTimeout > 0) + { + bootstrap.option(ChannelOption.SO_TIMEOUT, soTimeout); + } + if (minConnections > 0) { List minChannels = new LinkedList(); @@ -516,6 +523,32 @@ public int getConnectionTimeout() return connectionTimeout; } + /** + * Returns the SO timeout in milliseconds. + * + * @return the SOTimeout + * @see Builder#withSOTimeout(int) + */ + public RiakNode setSOTimeout(int soTimeoutInMillis) + { + stateCheck(State.CREATED, State.RUNNING, State.HEALTH_CHECKING); + this.soTimeout = soTimeoutInMillis; + bootstrap.option(ChannelOption.SO_TIMEOUT, connectionTimeout); + return this; + } + + /** + * Returns the SO timeout in milliseconds. + * + * @return the SOTimeout + * @see Builder#withSOTimeout(int) + */ + public int getSOTimeout() + { + stateCheck(State.CREATED, State.RUNNING, State.HEALTH_CHECKING); + return soTimeout; + } + /** * Returns the number of permits currently available. * The number of available permits indicates how many additional @@ -1199,6 +1232,12 @@ public static class Builder * @see #withConnectionTimeout(int) */ public final static int DEFAULT_CONNECTION_TIMEOUT = 0; + /** + * The default so timeout in milliseconds if not specified: {@value #DEFAULT_SO_TIMEOUT} + * + * @see #withSOTimeout(int) + */ + public final static int DEFAULT_SO_TIMEOUT = 0; /** * The default HealthCheckFactory. @@ -1216,6 +1255,7 @@ public static class Builder private int maxConnections = DEFAULT_MAX_CONNECTIONS; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; + private int soTimeout = DEFAULT_SO_TIMEOUT; private HealthCheckFactory healthCheckFactory = DEFAULT_HEALTHCHECK_FACTORY; private Bootstrap bootstrap; private ScheduledExecutorService executor; @@ -1331,6 +1371,19 @@ public Builder withConnectionTimeout(int connectionTimeoutInMillis) return this; } + /** + * Set the SO timeout used when waiting for a response on the underlying sockets + * + * @param soTimeoutMillis + * @return this + * @see #DEFAULT_SO_TIMEOUT + */ + public Builder withSOTimeout(int soTimeoutMillis) + { + this.soTimeout = soTimeoutMillis; + return this; + } + /** * Provides an executor for this node to use for internal maintenance tasks. * If not provided one will be created via diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index cab00cbd6..0664d792a 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -63,6 +63,7 @@ public void builderProducesDefaultNode() throws UnknownHostException assertEquals(node.getMaxConnections(), Integer.MAX_VALUE); assertEquals(node.getConnectionTimeout(), RiakNode.Builder.DEFAULT_CONNECTION_TIMEOUT); assertEquals(node.getIdleTimeout(), RiakNode.Builder.DEFAULT_IDLE_TIMEOUT); + assertEquals(node.getSOTimeout(), RiakNode.Builder.DEFAULT_SO_TIMEOUT); assertEquals(node.getMinConnections(), RiakNode.Builder.DEFAULT_MIN_CONNECTIONS); assertEquals(node.availablePermits(), Integer.MAX_VALUE); } @@ -76,6 +77,7 @@ public void builderProducesCorrectNode() throws UnknownHostException final int MAX_CONNECTIONS = 2003; final int PORT = 2004; final int READ_TIMEOUT = 2005; + final int SO_TIMEOUT = 2006; final String REMOTE_ADDRESS = "localhost"; final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); final Bootstrap BOOTSTRAP = PowerMockito.spy(new Bootstrap()); @@ -91,6 +93,7 @@ public void builderProducesCorrectNode() throws UnknownHostException .withRemoteAddress(REMOTE_ADDRESS) .withExecutor(EXECUTOR) .withBootstrap(BOOTSTRAP) + .withSOTimeout(SO_TIMEOUT) .build(); assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); @@ -103,6 +106,7 @@ public void builderProducesCorrectNode() throws UnknownHostException assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); assertEquals(node.availablePermits(), MAX_CONNECTIONS); assertEquals(node.getPort(), PORT); + assertEquals(node.getSOTimeout(), SO_TIMEOUT); } From 05bc92e7115f2ffc11feeaa90d576ebc47f1767e Mon Sep 17 00:00:00 2001 From: echrist Date: Mon, 6 Oct 2014 13:18:56 -0700 Subject: [PATCH 2/6] Indentation fixes --- .../java/com/basho/riak/client/core/RiakNode.java | 11 ++++++----- .../java/com/basho/riak/client/core/RiakNodeTest.java | 9 ++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/basho/riak/client/core/RiakNode.java b/src/main/java/com/basho/riak/client/core/RiakNode.java index f62b2bcf3..346f39d1e 100644 --- a/src/main/java/com/basho/riak/client/core/RiakNode.java +++ b/src/main/java/com/basho/riak/client/core/RiakNode.java @@ -176,7 +176,7 @@ private RiakNode(Builder builder) throws UnknownHostException this.executor = builder.executor; this.connectionTimeout = builder.connectionTimeout; this.idleTimeoutInNanos = TimeUnit.NANOSECONDS.convert(builder.idleTimeout, TimeUnit.MILLISECONDS); - this.soTimeout = builder.soTimeout; + this.soTimeout = builder.soTimeout; this.minConnections = builder.minConnections; this.port = builder.port; this.remoteAddress = builder.remoteAddress; @@ -524,15 +524,16 @@ public int getConnectionTimeout() } /** - * Returns the SO timeout in milliseconds. + * Sets the underlying socket SO (read) timeout in milliseconds. * - * @return the SOTimeout - * @see Builder#withSOTimeout(int) + * @param soTimeoutInMillis the SO (read) timeout to set + * @return a reference to this RiakNode + * @see Builder#withSOTimeout(int) */ public RiakNode setSOTimeout(int soTimeoutInMillis) { stateCheck(State.CREATED, State.RUNNING, State.HEALTH_CHECKING); - this.soTimeout = soTimeoutInMillis; + this.soTimeout = soTimeoutInMillis; bootstrap.option(ChannelOption.SO_TIMEOUT, connectionTimeout); return this; } diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index 0664d792a..b9fb497c4 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -63,7 +63,7 @@ public void builderProducesDefaultNode() throws UnknownHostException assertEquals(node.getMaxConnections(), Integer.MAX_VALUE); assertEquals(node.getConnectionTimeout(), RiakNode.Builder.DEFAULT_CONNECTION_TIMEOUT); assertEquals(node.getIdleTimeout(), RiakNode.Builder.DEFAULT_IDLE_TIMEOUT); - assertEquals(node.getSOTimeout(), RiakNode.Builder.DEFAULT_SO_TIMEOUT); + assertEquals(node.getSOTimeout(), RiakNode.Builder.DEFAULT_SO_TIMEOUT); assertEquals(node.getMinConnections(), RiakNode.Builder.DEFAULT_MIN_CONNECTIONS); assertEquals(node.availablePermits(), Integer.MAX_VALUE); } @@ -76,8 +76,7 @@ public void builderProducesCorrectNode() throws UnknownHostException final int MIN_CONNECTIONS = 2002; final int MAX_CONNECTIONS = 2003; final int PORT = 2004; - final int READ_TIMEOUT = 2005; - final int SO_TIMEOUT = 2006; + final int SO_TIMEOUT = 2006; final String REMOTE_ADDRESS = "localhost"; final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); final Bootstrap BOOTSTRAP = PowerMockito.spy(new Bootstrap()); @@ -93,7 +92,7 @@ public void builderProducesCorrectNode() throws UnknownHostException .withRemoteAddress(REMOTE_ADDRESS) .withExecutor(EXECUTOR) .withBootstrap(BOOTSTRAP) - .withSOTimeout(SO_TIMEOUT) + .withSOTimeout(SO_TIMEOUT) .build(); assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); @@ -106,7 +105,7 @@ public void builderProducesCorrectNode() throws UnknownHostException assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); assertEquals(node.availablePermits(), MAX_CONNECTIONS); assertEquals(node.getPort(), PORT); - assertEquals(node.getSOTimeout(), SO_TIMEOUT); + assertEquals(node.getSOTimeout(), SO_TIMEOUT); } From eac3055cb826956a8703825f71c7051fc5cebcc9 Mon Sep 17 00:00:00 2001 From: echrist Date: Mon, 6 Oct 2014 14:15:43 -0700 Subject: [PATCH 3/6] More indentation fixes. --- .../com/basho/riak/client/core/RiakNode.java | 18 +++++++++--------- .../basho/riak/client/core/RiakNodeTest.java | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/basho/riak/client/core/RiakNode.java b/src/main/java/com/basho/riak/client/core/RiakNode.java index 346f39d1e..a870ed75c 100644 --- a/src/main/java/com/basho/riak/client/core/RiakNode.java +++ b/src/main/java/com/basho/riak/client/core/RiakNode.java @@ -176,7 +176,7 @@ private RiakNode(Builder builder) throws UnknownHostException this.executor = builder.executor; this.connectionTimeout = builder.connectionTimeout; this.idleTimeoutInNanos = TimeUnit.NANOSECONDS.convert(builder.idleTimeout, TimeUnit.MILLISECONDS); - this.soTimeout = builder.soTimeout; + this.soTimeout = builder.soTimeout; this.minConnections = builder.minConnections; this.port = builder.port; this.remoteAddress = builder.remoteAddress; @@ -252,10 +252,10 @@ public synchronized RiakNode start() bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout); } - if (soTimeout > 0) - { - bootstrap.option(ChannelOption.SO_TIMEOUT, soTimeout); - } + if (soTimeout > 0) + { + bootstrap.option(ChannelOption.SO_TIMEOUT, soTimeout); + } if (minConnections > 0) { @@ -526,14 +526,14 @@ public int getConnectionTimeout() /** * Sets the underlying socket SO (read) timeout in milliseconds. * - * @param soTimeoutInMillis the SO (read) timeout to set - * @return a reference to this RiakNode - * @see Builder#withSOTimeout(int) + * @param soTimeoutInMillis the SO (read) timeout to set + * @return a reference to this RiakNode + * @see Builder#withSOTimeout(int) */ public RiakNode setSOTimeout(int soTimeoutInMillis) { stateCheck(State.CREATED, State.RUNNING, State.HEALTH_CHECKING); - this.soTimeout = soTimeoutInMillis; + this.soTimeout = soTimeoutInMillis; bootstrap.option(ChannelOption.SO_TIMEOUT, connectionTimeout); return this; } diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index b9fb497c4..a34c1523a 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -63,7 +63,7 @@ public void builderProducesDefaultNode() throws UnknownHostException assertEquals(node.getMaxConnections(), Integer.MAX_VALUE); assertEquals(node.getConnectionTimeout(), RiakNode.Builder.DEFAULT_CONNECTION_TIMEOUT); assertEquals(node.getIdleTimeout(), RiakNode.Builder.DEFAULT_IDLE_TIMEOUT); - assertEquals(node.getSOTimeout(), RiakNode.Builder.DEFAULT_SO_TIMEOUT); + assertEquals(node.getSOTimeout(), RiakNode.Builder.DEFAULT_SO_TIMEOUT); assertEquals(node.getMinConnections(), RiakNode.Builder.DEFAULT_MIN_CONNECTIONS); assertEquals(node.availablePermits(), Integer.MAX_VALUE); } @@ -76,7 +76,7 @@ public void builderProducesCorrectNode() throws UnknownHostException final int MIN_CONNECTIONS = 2002; final int MAX_CONNECTIONS = 2003; final int PORT = 2004; - final int SO_TIMEOUT = 2006; + final int SO_TIMEOUT = 2006; final String REMOTE_ADDRESS = "localhost"; final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); final Bootstrap BOOTSTRAP = PowerMockito.spy(new Bootstrap()); @@ -92,7 +92,7 @@ public void builderProducesCorrectNode() throws UnknownHostException .withRemoteAddress(REMOTE_ADDRESS) .withExecutor(EXECUTOR) .withBootstrap(BOOTSTRAP) - .withSOTimeout(SO_TIMEOUT) + .withSOTimeout(SO_TIMEOUT) .build(); assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); @@ -105,7 +105,7 @@ public void builderProducesCorrectNode() throws UnknownHostException assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); assertEquals(node.availablePermits(), MAX_CONNECTIONS); assertEquals(node.getPort(), PORT); - assertEquals(node.getSOTimeout(), SO_TIMEOUT); + assertEquals(node.getSOTimeout(), SO_TIMEOUT); } From a52bf6f9db5963829027c8ea5f36996c8ea49b02 Mon Sep 17 00:00:00 2001 From: echrist Date: Mon, 6 Oct 2014 18:44:54 -0700 Subject: [PATCH 4/6] Change to read timeout handler --- .../com/basho/riak/client/core/RiakNode.java | 54 +++++++++---------- .../core/netty/RiakChannelInitializer.java | 21 +++++++- .../riak/client/core/util/Constants.java | 1 + .../basho/riak/client/core/RiakNodeTest.java | 8 +-- 4 files changed, 50 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/basho/riak/client/core/RiakNode.java b/src/main/java/com/basho/riak/client/core/RiakNode.java index a870ed75c..35fc3e34f 100644 --- a/src/main/java/com/basho/riak/client/core/RiakNode.java +++ b/src/main/java/com/basho/riak/client/core/RiakNode.java @@ -76,6 +76,7 @@ public enum State private volatile Bootstrap bootstrap; private volatile boolean ownsBootstrap; + private volatile RiakChannelInitializer riakChannelInitializer; private volatile ScheduledExecutorService executor; private volatile boolean ownsExecutor; private volatile State state; @@ -84,7 +85,7 @@ public enum State private volatile int minConnections; private volatile long idleTimeoutInNanos; private volatile int connectionTimeout; - private volatile int soTimeout; + private volatile int readTimeout; private volatile boolean blockOnMaxConnections; private HealthCheckFactory healthCheckFactory; @@ -176,7 +177,7 @@ private RiakNode(Builder builder) throws UnknownHostException this.executor = builder.executor; this.connectionTimeout = builder.connectionTimeout; this.idleTimeoutInNanos = TimeUnit.NANOSECONDS.convert(builder.idleTimeout, TimeUnit.MILLISECONDS); - this.soTimeout = builder.soTimeout; + this.readTimeout = builder.readTimeout; this.minConnections = builder.minConnections; this.port = builder.port; this.remoteAddress = builder.remoteAddress; @@ -244,7 +245,9 @@ public synchronized RiakNode start() ownsBootstrap = true; } - bootstrap.handler(new RiakChannelInitializer(this)) + + riakChannelInitializer = new RiakChannelInitializer(this, readTimeout); + bootstrap.handler(riakChannelInitializer) .remoteAddress(new InetSocketAddress(remoteAddress, port)); if (connectionTimeout > 0) @@ -252,11 +255,6 @@ public synchronized RiakNode start() bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout); } - if (soTimeout > 0) - { - bootstrap.option(ChannelOption.SO_TIMEOUT, soTimeout); - } - if (minConnections > 0) { List minChannels = new LinkedList(); @@ -524,30 +522,30 @@ public int getConnectionTimeout() } /** - * Sets the underlying socket SO (read) timeout in milliseconds. + * Sets the read timeout in milliseconds. * - * @param soTimeoutInMillis the SO (read) timeout to set + * @param readTimeoutInMillis the read timeout to set * @return a reference to this RiakNode - * @see Builder#withSOTimeout(int) + * @see Builder#withReadTimeout(int) */ - public RiakNode setSOTimeout(int soTimeoutInMillis) + public RiakNode setReadTimeout(int readTimeoutInMillis) { stateCheck(State.CREATED, State.RUNNING, State.HEALTH_CHECKING); - this.soTimeout = soTimeoutInMillis; - bootstrap.option(ChannelOption.SO_TIMEOUT, connectionTimeout); + this.readTimeout = readTimeoutInMillis; + riakChannelInitializer.setReadTimeout(readTimeout); return this; } /** - * Returns the SO timeout in milliseconds. + * Returns the read timeout in milliseconds. * - * @return the SOTimeout - * @see Builder#withSOTimeout(int) + * @return the readTimeout + * @see Builder#withReadTimeout(int) */ - public int getSOTimeout() + public int getReadTimeout() { stateCheck(State.CREATED, State.RUNNING, State.HEALTH_CHECKING); - return soTimeout; + return readTimeout; } /** @@ -1234,11 +1232,11 @@ public static class Builder */ public final static int DEFAULT_CONNECTION_TIMEOUT = 0; /** - * The default so timeout in milliseconds if not specified: {@value #DEFAULT_SO_TIMEOUT} + * The default so timeout in milliseconds if not specified: {@value #DEFAULT_READ_TIMEOUT} * - * @see #withSOTimeout(int) + * @see #withReadTimeout(int) */ - public final static int DEFAULT_SO_TIMEOUT = 0; + public final static int DEFAULT_READ_TIMEOUT = 0; /** * The default HealthCheckFactory. @@ -1256,7 +1254,7 @@ public static class Builder private int maxConnections = DEFAULT_MAX_CONNECTIONS; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; - private int soTimeout = DEFAULT_SO_TIMEOUT; + private int readTimeout = DEFAULT_READ_TIMEOUT; private HealthCheckFactory healthCheckFactory = DEFAULT_HEALTHCHECK_FACTORY; private Bootstrap bootstrap; private ScheduledExecutorService executor; @@ -1373,15 +1371,15 @@ public Builder withConnectionTimeout(int connectionTimeoutInMillis) } /** - * Set the SO timeout used when waiting for a response on the underlying sockets + * Set the read timeout used when waiting for a response on the underlying sockets * - * @param soTimeoutMillis + * @param readTimeoutMillis * @return this - * @see #DEFAULT_SO_TIMEOUT + * @see #DEFAULT_READ_TIMEOUT */ - public Builder withSOTimeout(int soTimeoutMillis) + public Builder withReadTimeout(int readTimeoutMillis) { - this.soTimeout = soTimeoutMillis; + this.readTimeout = readTimeoutMillis; return this; } diff --git a/src/main/java/com/basho/riak/client/core/netty/RiakChannelInitializer.java b/src/main/java/com/basho/riak/client/core/netty/RiakChannelInitializer.java index 728946699..6e26fcca2 100644 --- a/src/main/java/com/basho/riak/client/core/netty/RiakChannelInitializer.java +++ b/src/main/java/com/basho/riak/client/core/netty/RiakChannelInitializer.java @@ -20,6 +20,10 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; + +import java.util.concurrent.TimeUnit; /** * @@ -29,10 +33,13 @@ public class RiakChannelInitializer extends ChannelInitializer { private final RiakResponseListener listener; - public RiakChannelInitializer(RiakResponseListener listener) + private volatile int readTimeout; + + public RiakChannelInitializer(RiakResponseListener listener, int readTimeoutMillis) { super(); this.listener = listener; + this.readTimeout = readTimeoutMillis; } @Override @@ -42,6 +49,16 @@ public void initChannel(SocketChannel ch) throws Exception p.addLast(Constants.MESSAGE_CODEC, new RiakMessageCodec()); p.addLast(Constants.OPERATION_ENCODER, new RiakOperationEncoder()); p.addLast(Constants.RESPONSE_HANDLER, new RiakResponseHandler(listener)); + p.addLast(Constants.READ_TIMEOUT_HANDLER, new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS)); + } + + public int getReadTimeout() + { + return readTimeout; + } + + public void setReadTimeout(int readTimeoutMillis) + { + readTimeout = readTimeoutMillis; } - } diff --git a/src/main/java/com/basho/riak/client/core/util/Constants.java b/src/main/java/com/basho/riak/client/core/util/Constants.java index 117fad736..b081afce7 100644 --- a/src/main/java/com/basho/riak/client/core/util/Constants.java +++ b/src/main/java/com/basho/riak/client/core/util/Constants.java @@ -95,6 +95,7 @@ public interface Constants { public static final String MESSAGE_CODEC = "codec"; public static final String OPERATION_ENCODER = "operationEncoder"; public static final String RESPONSE_HANDLER = "responseHandler"; + public static final String READ_TIMEOUT_HANDLER = "readTimeoutHandler"; public static final String SSL_HANDLER = "sslHandler"; public static final String HEALTHCHECK_CODEC = "healthCheckCodec"; diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index a34c1523a..cc4e494aa 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -63,7 +63,7 @@ public void builderProducesDefaultNode() throws UnknownHostException assertEquals(node.getMaxConnections(), Integer.MAX_VALUE); assertEquals(node.getConnectionTimeout(), RiakNode.Builder.DEFAULT_CONNECTION_TIMEOUT); assertEquals(node.getIdleTimeout(), RiakNode.Builder.DEFAULT_IDLE_TIMEOUT); - assertEquals(node.getSOTimeout(), RiakNode.Builder.DEFAULT_SO_TIMEOUT); + assertEquals(node.getReadTimeout(), RiakNode.Builder.DEFAULT_READ_TIMEOUT); assertEquals(node.getMinConnections(), RiakNode.Builder.DEFAULT_MIN_CONNECTIONS); assertEquals(node.availablePermits(), Integer.MAX_VALUE); } @@ -76,7 +76,7 @@ public void builderProducesCorrectNode() throws UnknownHostException final int MIN_CONNECTIONS = 2002; final int MAX_CONNECTIONS = 2003; final int PORT = 2004; - final int SO_TIMEOUT = 2006; + final int READ_TIMEOUT = 2006; final String REMOTE_ADDRESS = "localhost"; final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); final Bootstrap BOOTSTRAP = PowerMockito.spy(new Bootstrap()); @@ -92,7 +92,7 @@ public void builderProducesCorrectNode() throws UnknownHostException .withRemoteAddress(REMOTE_ADDRESS) .withExecutor(EXECUTOR) .withBootstrap(BOOTSTRAP) - .withSOTimeout(SO_TIMEOUT) + .withReadTimeout(READ_TIMEOUT) .build(); assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); @@ -105,7 +105,7 @@ public void builderProducesCorrectNode() throws UnknownHostException assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); assertEquals(node.availablePermits(), MAX_CONNECTIONS); assertEquals(node.getPort(), PORT); - assertEquals(node.getSOTimeout(), SO_TIMEOUT); + assertEquals(node.getReadTimeout(), READ_TIMEOUT); } From 4196e38b6a652eebbfd22c7dcd5cfbab5be9dafb Mon Sep 17 00:00:00 2001 From: "christopher.exell" Date: Mon, 6 Oct 2014 22:46:24 -0700 Subject: [PATCH 5/6] Add additional logging to track down ssl issues --- .../com/basho/riak/client/core/RiakNode.java | 7 +++++- .../core/netty/RiakSecurityDecoder.java | 22 +++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/basho/riak/client/core/RiakNode.java b/src/main/java/com/basho/riak/client/core/RiakNode.java index 35fc3e34f..a2873faa9 100644 --- a/src/main/java/com/basho/riak/client/core/RiakNode.java +++ b/src/main/java/com/basho/riak/client/core/RiakNode.java @@ -695,6 +695,7 @@ private Channel doGetConnection() throws ConnectionFailedException try { + logger.debug("Waiting for new connection from channel future to {}:{}", remoteAddress, port); f.await(); } catch (InterruptedException ex) @@ -712,12 +713,15 @@ private Channel doGetConnection() throws ConnectionFailedException consecutiveFailedConnectionAttempts.incrementAndGet(); throw new ConnectionFailedException(f.cause()); } + + logger.debug("Connection to {}:{} successful", remoteAddress, port); consecutiveFailedConnectionAttempts.set(0); Channel c = f.channel(); if (trustStore != null) { + logger.debug("trustStore set starting TLS"); SSLContext context; try { @@ -752,11 +756,12 @@ else if (protocols.contains("TLSv1.1")) } engine.setUseClientMode(true); - RiakSecurityDecoder decoder = new RiakSecurityDecoder(engine, username, password); + RiakSecurityDecoder decoder = new RiakSecurityDecoder(remoteAddress, port, engine, username, password); c.pipeline().addFirst(decoder); try { + logger.debug("Waiting for authentication to complete with {}:{}", remoteAddress, port); DefaultPromise promise = decoder.getPromise(); promise.await(); diff --git a/src/main/java/com/basho/riak/client/core/netty/RiakSecurityDecoder.java b/src/main/java/com/basho/riak/client/core/netty/RiakSecurityDecoder.java index 069900081..f33d02d1d 100644 --- a/src/main/java/com/basho/riak/client/core/netty/RiakSecurityDecoder.java +++ b/src/main/java/com/basho/riak/client/core/netty/RiakSecurityDecoder.java @@ -49,14 +49,18 @@ public class RiakSecurityDecoder extends ByteToMessageDecoder private final String username; private final String password; private final Logger logger = LoggerFactory.getLogger(RiakSecurityDecoder.class); - private volatile DefaultPromise promise; + private final String remoteAddr; + private final int remotePort; + private volatile DefaultPromise promise; private enum State { TLS_START, TLS_WAIT, SSL_WAIT, AUTH_WAIT } private volatile State state = State.TLS_START; - public RiakSecurityDecoder(SSLEngine engine, String username, String password) + public RiakSecurityDecoder(String remoteAddress, int port, SSLEngine engine, String username, String password) { + this.remoteAddr = remoteAddress; + this.remotePort = port; this.sslEngine = engine; this.username = username; this.password = password; @@ -88,7 +92,7 @@ protected void decode(ChannelHandlerContext chc, ByteBuf in, List out) t switch(code) { case RiakMessageCodes.MSG_StartTls: - logger.debug("Received MSG_RpbStartTls reply"); + logger.debug("Received MSG_RpbStartTls reply from {}:{}", remoteAddr, remotePort); // change state this.state = State.SSL_WAIT; // insert SSLHandler @@ -101,10 +105,11 @@ protected void decode(ChannelHandlerContext chc, ByteBuf in, List out) t chc.channel().pipeline().addFirst(Constants.SSL_HANDLER, sslHandler); break; case RiakMessageCodes.MSG_ErrorResp: - logger.debug("Received MSG_ErrorResp reply to startTls"); + logger.debug("Received MSG_ErrorResp reply to startTls from {}:{}", remoteAddr, remotePort); promise.tryFailure((riakErrorToException(protobuf))); break; default: + logger.debug("Invalid return code during StartTLS from {}:{}", remoteAddr, remotePort); promise.tryFailure(new RiakResponseException(0, "Invalid return code during StartTLS; " + code)); } @@ -114,21 +119,22 @@ protected void decode(ChannelHandlerContext chc, ByteBuf in, List out) t switch(code) { case RiakMessageCodes.MSG_AuthResp: - logger.debug("Received MSG_RpbAuthResp reply"); + logger.debug("Received MSG_RpbAuthResp reply from {}:{}", remoteAddr, remotePort); promise.trySuccess(null); break; case RiakMessageCodes.MSG_ErrorResp: - logger.debug("Received MSG_ErrorResp reply to auth"); + logger.debug("Received MSG_ErrorResp reply to Auth from {}:{}", remoteAddr, remotePort); promise.tryFailure(riakErrorToException(protobuf)); break; default: + logger.debug("Invalid return code during Auth from {}:{}", remoteAddr, remotePort); promise.tryFailure(new RiakResponseException(0, "Invalid return code during Auth; " + code)); } break; default: // WTF? - logger.error("Received message while not in TLS_WAIT or AUTH_WAIT"); + logger.error("Received message while not in TLS_WAIT or AUTH_WAIT from {}:{}", remoteAddr, remotePort); promise.tryFailure(new IllegalStateException("Received message while not in TLS_WAIT or AUTH_WAIT")); } } @@ -208,6 +214,7 @@ public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { + logger.debug("SSLHandshake Completed with {}:{}. Authenticating.", remoteAddr, remotePort); Channel c = future.getNow(); state = State.AUTH_WAIT; RiakPB.RpbAuthReq authReq = @@ -221,6 +228,7 @@ public void operationComplete(Future future) throws Exception } else { + logger.debug("SSLHandshake Failed with {}:{}.", remoteAddr, remotePort); promise.tryFailure(future.cause()); } } From deff37e967a8d92bb9712384883c7f4b9e54166b Mon Sep 17 00:00:00 2001 From: "christopher.exell" Date: Mon, 6 Oct 2014 23:49:40 -0700 Subject: [PATCH 6/6] More logging --- .../com/basho/riak/client/core/netty/RiakSecurityDecoder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/basho/riak/client/core/netty/RiakSecurityDecoder.java b/src/main/java/com/basho/riak/client/core/netty/RiakSecurityDecoder.java index f33d02d1d..3fa6306d2 100644 --- a/src/main/java/com/basho/riak/client/core/netty/RiakSecurityDecoder.java +++ b/src/main/java/com/basho/riak/client/core/netty/RiakSecurityDecoder.java @@ -109,7 +109,7 @@ protected void decode(ChannelHandlerContext chc, ByteBuf in, List out) t promise.tryFailure((riakErrorToException(protobuf))); break; default: - logger.debug("Invalid return code during StartTLS from {}:{}", remoteAddr, remotePort); + logger.debug("Invalid return code during StartTLS from {}:{} code", remoteAddr, remotePort, code); promise.tryFailure(new RiakResponseException(0, "Invalid return code during StartTLS; " + code)); } @@ -228,7 +228,7 @@ public void operationComplete(Future future) throws Exception } else { - logger.debug("SSLHandshake Failed with {}:{}.", remoteAddr, remotePort); + logger.warn("SSLHandshake Failed with {}:{}.", remoteAddr, remotePort, future.cause()); promise.tryFailure(future.cause()); } }