diff --git a/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java b/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java index da6dd6cdd..356e5d7ba 100644 --- a/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java +++ b/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java @@ -31,6 +31,7 @@ import static org.apache.pekko.util.ByteString.emptyByteString; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; @@ -42,7 +43,6 @@ import java.util.function.Function; import org.apache.pekko.stream.javadsl.Framing; import org.apache.pekko.http.javadsl.model.*; -import scala.concurrent.duration.FiniteDuration; // #manual-entity-consume-example-1 // #single-request-in-actor-example @@ -106,7 +106,7 @@ public ExamplePerson parse(ByteString line) { // toStrict to enforce all data be loaded into memory from the connection final CompletionStage strictEntity = - response.entity().toStrict(FiniteDuration.create(3, TimeUnit.SECONDS).toMillis(), system); + response.entity().toStrict(Duration.ofSeconds(3).toMillis(), system); // You can now use `getData` to get the data directly... final CompletionStage person1 = diff --git a/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java index 01d6b2c3c..cb060968d 100644 --- a/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java @@ -41,8 +41,8 @@ import org.junit.Ignore; import org.junit.Test; import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -1144,7 +1144,7 @@ public void testExtractDataBytes() { @Test public void testExtractStrictEntity() { // #extractStrictEntity - final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS); + final java.time.Duration timeout = java.time.Duration.ofSeconds(3); final Route route = extractStrictEntity(timeout, strict -> complete(strict.getData().utf8String())); @@ -1165,7 +1165,7 @@ public void testExtractStrictEntity() { @Test public void testToStrictEntity() { // #toStrictEntity - final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS); + final Duration timeout = Duration.ofSeconds(3); final Route route = toStrictEntity( timeout, diff --git a/docs/src/test/java/docs/http/javadsl/server/directives/CachingDirectivesExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/directives/CachingDirectivesExamplesTest.java index fbc04733d..6b75c6437 100644 --- a/docs/src/test/java/docs/http/javadsl/server/directives/CachingDirectivesExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/directives/CachingDirectivesExamplesTest.java @@ -26,8 +26,8 @@ // #caching-directives-import import static org.apache.pekko.http.javadsl.server.directives.CachingDirectives.*; // #caching-directives-import -import scala.concurrent.duration.Duration; // #time-unit-import +import java.time.Duration; import java.util.concurrent.TimeUnit; // #time-unit-import import org.apache.pekko.http.javadsl.testkit.JUnitRouteTest; @@ -198,8 +198,8 @@ public Uri apply(RequestContext in, boolean isCheck) { .lfuCacheSettings() .withInitialCapacity(25) .withMaxCapacity(50) - .withTimeToLive(Duration.create(20, TimeUnit.SECONDS)) - .withTimeToIdle(Duration.create(10, TimeUnit.SECONDS)); + .withTimeToLive(Duration.ofSeconds(20)) + .withTimeToIdle(Duration.ofSeconds(10)); final CachingSettings cachingSettings = defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings); final Cache lfuCache = LfuCache.create(cachingSettings); diff --git a/docs/src/test/java/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java index 9b885dbe2..12698ad23 100644 --- a/docs/src/test/java/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/directives/TimeoutDirectivesExamplesTest.java @@ -23,13 +23,14 @@ import org.apache.pekko.http.javadsl.server.AllDirectives; import org.apache.pekko.http.javadsl.server.Route; import org.apache.pekko.testkit.TestKit; +import org.apache.pekko.util.JavaDurationConverters; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.After; import org.junit.Ignore; import org.junit.Test; -import scala.concurrent.duration.Duration; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -84,13 +85,14 @@ private Optional runRoute(Route route, String routePath) throws Ex @After public void shutDown() { - TestKit.shutdownActorSystem(system, Duration.create(1, TimeUnit.SECONDS), false); + TestKit.shutdownActorSystem( + system, scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS), false); } @Test public void testRequestTimeoutIsConfigurable() throws Exception { // #withRequestTimeout-plain - final Duration timeout = Duration.create(1, TimeUnit.SECONDS); + final Duration timeout = Duration.ofSeconds(1); CompletionStage slowFuture = new CompletableFuture<>(); final Route route = @@ -132,7 +134,7 @@ public void testRequestWithoutTimeoutCancelsTimeout() throws Exception { @Test public void testRequestTimeoutAllowsCustomResponse() throws Exception { // #withRequestTimeout-with-handler - final Duration timeout = Duration.create(1, TimeUnit.MILLISECONDS); + final Duration timeout = Duration.ofMillis(1); CompletionStage slowFuture = new CompletableFuture<>(); HttpResponse enhanceYourCalmResponse = @@ -162,7 +164,7 @@ public void testRequestTimeoutAllowsCustomResponse() throws Exception { @Test public void testRequestTimeoutCustomResponseCanBeAddedSeparately() throws Exception { // #withRequestTimeoutResponse - final Duration timeout = Duration.create(100, TimeUnit.MILLISECONDS); + final Duration timeout = Duration.ofMillis(100); CompletionStage slowFuture = new CompletableFuture<>(); HttpResponse enhanceYourCalmResponse = @@ -193,8 +195,8 @@ public void testRequestTimeoutCustomResponseCanBeAddedSeparately() throws Except @Test public void extractRequestTimeout() throws Exception { // #extractRequestTimeout - Duration timeout1 = Duration.create(500, TimeUnit.MILLISECONDS); - Duration timeout2 = Duration.create(1000, TimeUnit.MILLISECONDS); + Duration timeout1 = Duration.ofMillis(500); + Duration timeout2 = Duration.ofMillis(1000); Route route = path( "timeout", @@ -209,7 +211,12 @@ public void extractRequestTimeout() throws Exception { () -> extractRequestTimeout( t2 -> { - if (t1 == timeout1 && t2 == timeout2) + if (t1.equals( + JavaDurationConverters.asFiniteDuration( + timeout1)) + && t2.equals( + JavaDurationConverters.asFiniteDuration( + timeout2))) return complete(StatusCodes.OK); else return complete(StatusCodes.INTERNAL_SERVER_ERROR); diff --git a/docs/src/test/java/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java index 4ee92b21a..d74b5b2dd 100644 --- a/docs/src/test/java/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java @@ -30,8 +30,8 @@ import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.util.ByteString; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -97,7 +97,7 @@ public void testHandleWebSocketMessages() { wsClient.expectMessage("Hello Peter!"); wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef"))); - wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + wsClient.expectNoMessage(Duration.ofMillis(100)); wsClient.sendMessage("John"); wsClient.expectMessage("Hello John!"); diff --git a/http-caching/src/main/scala/org/apache/pekko/http/caching/javadsl/LfuCacheSettings.scala b/http-caching/src/main/scala/org/apache/pekko/http/caching/javadsl/LfuCacheSettings.scala index 70ef10d0d..cd229279e 100644 --- a/http-caching/src/main/scala/org/apache/pekko/http/caching/javadsl/LfuCacheSettings.scala +++ b/http-caching/src/main/scala/org/apache/pekko/http/caching/javadsl/LfuCacheSettings.scala @@ -17,6 +17,7 @@ import org.apache.pekko import pekko.annotation.DoNotInherit import pekko.http.caching.impl.settings.LfuCachingSettingsImpl import pekko.http.javadsl.settings.SettingsCompanion +import pekko.util.JavaDurationConverters._ import com.typesafe.config.Config import scala.concurrent.duration.Duration @@ -36,7 +37,21 @@ abstract class LfuCacheSettings private[http] () { self: LfuCachingSettingsImpl def withMaxCapacity(newMaxCapacity: Int): LfuCacheSettings = self.copy(maxCapacity = newMaxCapacity) def withInitialCapacity(newInitialCapacity: Int): LfuCacheSettings = self.copy(initialCapacity = newInitialCapacity) def withTimeToLive(newTimeToLive: Duration): LfuCacheSettings = self.copy(timeToLive = newTimeToLive) + + /** + * Java API + * @since 1.3.0 + */ + def withTimeToLive(newTimeToLive: java.time.Duration): LfuCacheSettings = + self.copy(timeToLive = newTimeToLive.asScala) def withTimeToIdle(newTimeToIdle: Duration): LfuCacheSettings = self.copy(timeToIdle = newTimeToIdle) + + /** + * Java API + * @since 1.3.0 + */ + def withTimeToIdle(newTimeToIdle: java.time.Duration): LfuCacheSettings = + self.copy(timeToIdle = newTimeToIdle.asScala) } object LfuCacheSettings extends SettingsCompanion[LfuCacheSettings] { diff --git a/http-core/src/main/java/org/apache/pekko/http/javadsl/TimeoutAccess.java b/http-core/src/main/java/org/apache/pekko/http/javadsl/TimeoutAccess.java index 1c3da3036..b51e540c0 100644 --- a/http-core/src/main/java/org/apache/pekko/http/javadsl/TimeoutAccess.java +++ b/http-core/src/main/java/org/apache/pekko/http/javadsl/TimeoutAccess.java @@ -17,7 +17,6 @@ import org.apache.pekko.http.javadsl.model.HttpRequest; import org.apache.pekko.http.javadsl.model.HttpResponse; import org.apache.pekko.japi.Function; -import scala.concurrent.duration.Duration; /** * Enables programmatic access to the server-side request timeout logic. @@ -34,17 +33,31 @@ public interface TimeoutAccess { *

Due to the inherent raciness it is not guaranteed that the returned timeout was applied * before the previously set timeout has expired! */ - Duration getTimeout(); + scala.concurrent.duration.Duration getTimeout(); /** * Tries to set a new timeout. The timeout period is measured as of the point in time that the end - * of the request has been received, which may be in the past or in the future! Use `Duration.Inf` - * to completely disable request timeout checking for this request. + * of the request has been received, which may be in the past or in the future! Use + * `scala.concurrent.duration.Duration.Inf` to completely disable request timeout checking for + * this request. * *

Due to the inherent raciness it is not guaranteed that the update will be applied before the * previously set timeout has expired! */ - void updateTimeout(Duration timeout); + void updateTimeout(scala.concurrent.duration.Duration timeout); + + /** + * Tries to set a new timeout. The timeout period is measured as of the point in time that the end + * of the request has been received, which may be in the past or in the future! Use a + * `java.time.Duration` with a long duration to disable request timeout checking for this request + * (e.g. `java.time.temporal.ChronoUnit.FOREVER.getDuration()`). + * + *

Due to the inherent raciness it is not guaranteed that the update will be applied before the + * previously set timeout has expired! + * + * @since 1.3.0 + */ + void updateTimeout(java.time.Duration timeout); /** * Tries to set a new timeout handler, which produces the timeout response for a given request. @@ -61,5 +74,16 @@ public interface TimeoutAccess { *

Due to the inherent raciness it is not guaranteed that the update will be applied before the * previously set timeout has expired! */ - void update(Duration timeout, Function handler); + void update( + scala.concurrent.duration.Duration timeout, Function handler); + + /** + * Tries to set a new timeout and handler at the same time. + * + *

Due to the inherent raciness it is not guaranteed that the update will be applied before the + * previously set timeout has expired! + * + * @since 1.3.0 + */ + void update(java.time.Duration timeout, Function handler); } diff --git a/http-core/src/main/mima-filters/1.3.x.backwards.excludes/java-duration-support-javadsl.excludes b/http-core/src/main/mima-filters/1.3.x.backwards.excludes/java-duration-support-javadsl.excludes new file mode 100644 index 000000000..201a40d36 --- /dev/null +++ b/http-core/src/main/mima-filters/1.3.x.backwards.excludes/java-duration-support-javadsl.excludes @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Add more support for java.time.Duration in Java DSL +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.TimeoutAccess.updateTimeout") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.TimeoutAccess.update") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.WebSocketSettings.getPeriodicKeepAliveMaxIdle") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withConnectingTimeout") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withIdleTimeout") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withStreamCancellationDelay") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withBaseConnectionBackoff") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withMaxConnectionBackoff") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withIdleTimeout") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withKeepAliveTimeout") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withMaxConnectionLifetime") diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala index fae7f094f..3e8c07c7b 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala @@ -14,10 +14,7 @@ package org.apache.pekko.http.impl.engine.server import java.util.concurrent.atomic.AtomicReference -import scala.concurrent.{ Future, Promise } -import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration } -import scala.collection.immutable -import scala.util.control.{ NoStackTrace, NonFatal } + import org.apache.pekko import pekko.NotUsed import pekko.actor.Cancellable @@ -27,6 +24,7 @@ import pekko.japi.Function import pekko.event.LoggingAdapter import pekko.http.ParsingErrorHandler import pekko.util.ByteString +import pekko.util.JavaDurationConverters._ import pekko.stream._ import pekko.stream.TLSProtocol._ import pekko.stream.scaladsl._ @@ -42,15 +40,19 @@ import pekko.http.impl.engine.rendering.{ ResponseRenderingOutput } import pekko.http.impl.util._ +import pekko.http.javadsl.model import pekko.http.scaladsl.util.FastFuture.EnhancedFuture import pekko.http.scaladsl.{ Http, TimeoutAccess } import pekko.http.scaladsl.model.headers.`Timeout-Access` -import pekko.http.javadsl.model import pekko.http.scaladsl.model._ import pekko.http.impl.util.LogByteStringTools._ import scala.annotation.nowarn +import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration } +import scala.collection.immutable import scala.util.Failure +import scala.util.control.{ NoStackTrace, NonFatal } /** * INTERNAL API @@ -388,7 +390,8 @@ private[http] object HttpServerBluePrint { get.fast.foreach(setup => if (setup.scheduledTask ne null) setup.scheduledTask.cancel()) override def updateTimeout(timeout: Duration): Unit = update(timeout, null: HttpRequest => HttpResponse) - override def updateHandler(handler: HttpRequest => HttpResponse): Unit = update(null, handler) + override def updateHandler(handler: HttpRequest => HttpResponse): Unit = + update(null.asInstanceOf[Duration], handler) override def update(timeout: Duration, handler: HttpRequest => HttpResponse): Unit = { val promise = Promise[TimeoutSetup]() for (old <- getAndSet(promise.future).fast) @@ -411,9 +414,18 @@ private[http] object HttpServerBluePrint { import pekko.http.impl.util.JavaMapping.Implicits._ /** JAVA API * */ - def update(timeout: Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit = + override def updateTimeout(timeout: java.time.Duration): Unit = { + val stimeout = if (timeout eq null) null else timeout.asScala + update(stimeout, null: HttpRequest => HttpResponse) + } + override def update(timeout: Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit = update(timeout, handler(_: HttpRequest).asScala) - def updateHandler(handler: Function[model.HttpRequest, model.HttpResponse]): Unit = + override def update( + timeout: java.time.Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit = { + val stimeout = if (timeout eq null) null else timeout.asScala + update(stimeout, handler(_: HttpRequest).asScala) + } + override def updateHandler(handler: Function[model.HttpRequest, model.HttpResponse]): Unit = updateHandler(handler(_: HttpRequest).asScala) def timeout = currentTimeout diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/settings/ClientConnectionSettingsImpl.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/settings/ClientConnectionSettingsImpl.scala index d7e8602e3..3ea468f7f 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/settings/ClientConnectionSettingsImpl.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/settings/ClientConnectionSettingsImpl.scala @@ -25,6 +25,7 @@ import pekko.http.scaladsl.settings.ClientConnectionSettings.LogUnencryptedNetwo import pekko.http.scaladsl.settings.Http2ClientSettings.Http2ClientSettingsImpl import pekko.http.scaladsl.settings.{ Http2ClientSettings, ParserSettings, WebSocketSettings } import pekko.io.Inet.SocketOption +import pekko.util.JavaDurationConverters._ import com.typesafe.config.Config import scala.collection.immutable @@ -55,6 +56,17 @@ private[pekko] final case class ClientConnectionSettingsImpl( "The provided ParserSettings is a generic object that does not contain the client-specific settings.") override def productPrefix = "ClientConnectionSettings" + override def withConnectingTimeout( + newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings = + withConnectingTimeout(newValue.asScala) + + override def withIdleTimeout(newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings = + withIdleTimeout(newValue.asScala) + + override def withStreamCancellationDelay( + newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings = + withStreamCancellationDelay(newValue.asScala) + override def websocketRandomFactory: () => Random = websocketSettings.randomFactory } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/settings/ConnectionPoolSettingsImpl.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/settings/ConnectionPoolSettingsImpl.scala index 7f09479d0..cf24a3bde 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/settings/ConnectionPoolSettingsImpl.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/settings/ConnectionPoolSettingsImpl.scala @@ -17,6 +17,7 @@ import org.apache.pekko import pekko.annotation.InternalApi import pekko.http.impl.util._ import pekko.http.scaladsl.settings._ +import pekko.util.JavaDurationConverters._ import com.typesafe.config.Config import scala.collection.immutable @@ -59,6 +60,21 @@ private[pekko] final case class ConnectionPoolSettingsImpl( override def productPrefix = "ConnectionPoolSettings" + override def withBaseConnectionBackoff(newValue: java.time.Duration): ConnectionPoolSettings = + withBaseConnectionBackoff(newValue.asScala) + + override def withMaxConnectionBackoff(newValue: java.time.Duration): ConnectionPoolSettings = + withMaxConnectionBackoff(newValue.asScala) + + override def withIdleTimeout(newValue: java.time.Duration): ConnectionPoolSettings = + withIdleTimeout(newValue.asScala) + + override def withKeepAliveTimeout(newValue: java.time.Duration): ConnectionPoolSettings = + withKeepAliveTimeout(newValue.asScala) + + override def withMaxConnectionLifetime(newValue: java.time.Duration): ConnectionPoolSettings = + withMaxConnectionLifetime(newValue.asScala) + def withUpdatedConnectionSettings( f: ClientConnectionSettings => ClientConnectionSettings): ConnectionPoolSettingsImpl = copy(connectionSettings = f(connectionSettings), diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/settings/WebSocketSettingsImpl.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/settings/WebSocketSettingsImpl.scala index 856e37fbe..35787d597 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/settings/WebSocketSettingsImpl.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/settings/WebSocketSettingsImpl.scala @@ -20,6 +20,7 @@ import pekko.annotation.InternalApi import pekko.http.impl.engine.ws.Randoms import pekko.http.impl.util._ import pekko.util.ByteString +import pekko.util.JavaDurationConverters._ import com.typesafe.config.Config import scala.concurrent.duration.Duration @@ -38,6 +39,8 @@ private[pekko] final case class WebSocketSettingsImpl( WebSocketSettingsImpl.KeepAliveModes contains periodicKeepAliveMode, s"Unsupported keep-alive mode detected! Was [$periodicKeepAliveMode], yet only: ${WebSocketSettingsImpl.KeepAliveModes} are supported.") + override def getPeriodicKeepAliveMaxIdle: java.time.Duration = + periodicKeepAliveMaxIdle.asJava override def productPrefix = "WebSocketSettings" } diff --git a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/ClientConnectionSettings.scala b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/ClientConnectionSettings.scala index 8c4eba6b9..e27809306 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/ClientConnectionSettings.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/ClientConnectionSettings.scala @@ -22,12 +22,12 @@ import pekko.actor.ActorSystem import pekko.annotation.ApiMayChange import pekko.annotation.DoNotInherit import pekko.http.impl.settings.ClientConnectionSettingsImpl +import pekko.http.impl.util.JavaMapping.Implicits._ import pekko.http.javadsl.ClientTransport import pekko.http.javadsl.model.headers.UserAgent import pekko.io.Inet.SocketOption -import com.typesafe.config.Config -import pekko.http.impl.util.JavaMapping.Implicits._ import pekko.util.OptionConverters._ +import com.typesafe.config.Config import scala.collection.JavaConverters._ import scala.concurrent.duration.{ Duration, FiniteDuration } @@ -66,6 +66,24 @@ abstract class ClientConnectionSettings private[pekko] () { self: ClientConnecti // Java API versions of mutators + /** + * Java API + * @since 1.3.0 + */ + def withConnectingTimeout(newValue: java.time.Duration): ClientConnectionSettings + + /** + * Java API + * @since 1.3.0 + */ + def withIdleTimeout(newValue: java.time.Duration): ClientConnectionSettings + + /** + * Java API + * @since 1.3.0 + */ + def withStreamCancellationDelay(newValue: java.time.Duration): ClientConnectionSettings + def withUserAgentHeader(newValue: Optional[UserAgent]): ClientConnectionSettings = self.copy(userAgentHeader = (newValue.asScala: Option[UserAgent]).map(_.asScala)) def withLogUnencryptedNetworkBytes(newValue: Optional[Int]): ClientConnectionSettings = diff --git a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/ConnectionPoolSettings.scala b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/ConnectionPoolSettings.scala index 4b125947f..2aa7a8c8a 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/ConnectionPoolSettings.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/ConnectionPoolSettings.scala @@ -69,10 +69,40 @@ abstract class ConnectionPoolSettings private[pekko] () { self: ConnectionPoolSe /** Client-side pipelining is not currently supported, see https://github.com/akka/akka-http/issues/32 */ def withPipeliningLimit(newValue: Int): ConnectionPoolSettings def withBaseConnectionBackoff(newValue: FiniteDuration): ConnectionPoolSettings + + /** + * Java API + * @since 1.3.0 + */ + def withBaseConnectionBackoff(newValue: java.time.Duration): ConnectionPoolSettings def withMaxConnectionBackoff(newValue: FiniteDuration): ConnectionPoolSettings + + /** + * Java API + * @since 1.3.0 + */ + def withMaxConnectionBackoff(newValue: java.time.Duration): ConnectionPoolSettings def withIdleTimeout(newValue: Duration): ConnectionPoolSettings + + /** + * Java API + * @since 1.3.0 + */ + def withIdleTimeout(newValue: java.time.Duration): ConnectionPoolSettings def withKeepAliveTimeout(newValue: Duration): ConnectionPoolSettings + + /** + * Java API + * @since 1.3.0 + */ + def withKeepAliveTimeout(newValue: java.time.Duration): ConnectionPoolSettings def withMaxConnectionLifetime(newValue: Duration): ConnectionPoolSettings + + /** + * Java API + * @since 1.3.0 + */ + def withMaxConnectionLifetime(newValue: java.time.Duration): ConnectionPoolSettings def withConnectionSettings(newValue: ClientConnectionSettings): ConnectionPoolSettings = self.copyDeep(_.withConnectionSettings(newValue.asScala), connectionSettings = newValue.asScala) diff --git a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ClientSettings.scala b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ClientSettings.scala index 7cacd0dd5..1ac4be45a 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ClientSettings.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ClientSettings.scala @@ -16,7 +16,8 @@ package org.apache.pekko.http.javadsl.settings import java.time.Duration import org.apache.pekko.http.scaladsl -import scala.concurrent.duration._ + +import scala.concurrent.duration.DurationLong trait Http2ClientSettings { self: scaladsl.settings.Http2ClientSettings.Http2ClientSettingsImpl => def requestEntityChunkSize: Int diff --git a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ServerSettings.scala b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ServerSettings.scala index a7539298e..d50942023 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ServerSettings.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/Http2ServerSettings.scala @@ -20,7 +20,8 @@ import pekko.annotation.DoNotInherit import pekko.http.scaladsl import pekko.util.ccompat.JavaConverters._ import com.typesafe.config.Config -import scala.concurrent.duration._ + +import scala.concurrent.duration.DurationLong @DoNotInherit trait Http2ServerSettings { diff --git a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/WebSocketSettings.scala b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/WebSocketSettings.scala index b28faab8c..454fe1f8e 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/WebSocketSettings.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/javadsl/settings/WebSocketSettings.scala @@ -13,6 +13,7 @@ package org.apache.pekko.http.javadsl.settings +import java.time.{ Duration => JDuration } import java.util.Random import java.util.function.Supplier @@ -21,6 +22,7 @@ import pekko.actor.ActorSystem import pekko.annotation.DoNotInherit import pekko.http.impl.settings.WebSocketSettingsImpl import pekko.util.ByteString +import pekko.util.JavaDurationConverters._ import com.typesafe.config.Config import scala.concurrent.duration.Duration @@ -34,6 +36,12 @@ trait WebSocketSettings { self: WebSocketSettingsImpl => def periodicKeepAliveMode: String def periodicKeepAliveMaxIdle: Duration + /** + * Java API + * @since 1.3.0 + */ + def getPeriodicKeepAliveMaxIdle: JDuration + /** * The provided supplier will be invoked for each new keep-alive frame that is sent. * The ByteString will be included in the Ping or Pong frame sent as heartbeat, @@ -47,6 +55,12 @@ trait WebSocketSettings { self: WebSocketSettingsImpl => copy(periodicKeepAliveMode = newValue) def withPeriodicKeepAliveMaxIdle(newValue: Duration): WebSocketSettings = copy(periodicKeepAliveMaxIdle = newValue) + + /** + * @since 1.3.0 + */ + def withPeriodicKeepAliveMaxIdle(newValue: JDuration): WebSocketSettings = + copy(periodicKeepAliveMaxIdle = newValue.asScala) def withPeriodicKeepAliveData(newValue: Supplier[ByteString]): WebSocketSettings = copy(periodicKeepAliveData = () => newValue.get()) diff --git a/http-core/src/test/scala/org/apache/pekko/http/javadsl/model/MultipartsSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/javadsl/model/MultipartsSpec.scala index c2d266ba0..fd8b1c10d 100644 --- a/http-core/src/test/scala/org/apache/pekko/http/javadsl/model/MultipartsSpec.scala +++ b/http-core/src/test/scala/org/apache/pekko/http/javadsl/model/MultipartsSpec.scala @@ -18,7 +18,7 @@ import java.util import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.Await -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationInt import org.scalatest.{ BeforeAndAfterAll, Inside } import org.apache.pekko import pekko.actor.ActorSystem diff --git a/http-testkit/src/main/scala/org/apache/pekko/http/javadsl/testkit/WSProbe.scala b/http-testkit/src/main/scala/org/apache/pekko/http/javadsl/testkit/WSProbe.scala index ea77cc5aa..742876ecd 100644 --- a/http-testkit/src/main/scala/org/apache/pekko/http/javadsl/testkit/WSProbe.scala +++ b/http-testkit/src/main/scala/org/apache/pekko/http/javadsl/testkit/WSProbe.scala @@ -21,6 +21,7 @@ import pekko.stream.Materializer import pekko.stream.javadsl.Flow import pekko.stream.scaladsl import pekko.util.ByteString +import pekko.util.JavaDurationConverters._ import pekko.http.scaladsl.{ testkit => st } @@ -90,6 +91,12 @@ class WSProbe(delegate: st.WSProbe) { */ def expectNoMessage(max: FiniteDuration): Unit = delegate.expectNoMessage(max) + /** + * Expect no message on the input side of the flow for the given maximum duration. + * @since 1.3.0 + */ + def expectNoMessage(max: java.time.Duration): Unit = delegate.expectNoMessage(max.asScala) + /** * Expect completion on the input side of the flow. */ diff --git a/http-tests/src/test/java/org/apache/pekko/http/javadsl/server/JavaTestServer.java b/http-tests/src/test/java/org/apache/pekko/http/javadsl/server/JavaTestServer.java index 457175fed..0497979f6 100644 --- a/http-tests/src/test/java/org/apache/pekko/http/javadsl/server/JavaTestServer.java +++ b/http-tests/src/test/java/org/apache/pekko/http/javadsl/server/JavaTestServer.java @@ -27,8 +27,8 @@ import org.apache.pekko.http.javadsl.unmarshalling.Unmarshaller; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.util.ByteString; -import scala.concurrent.duration.Duration; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.*; import java.util.function.Function; @@ -38,7 +38,7 @@ public class JavaTestServer { public Route createRoute() { - final Duration timeout = Duration.create(1, TimeUnit.SECONDS); + final Duration timeout = Duration.ofSeconds(1); final Route index = path( diff --git a/http/src/main/scala/org/apache/pekko/http/javadsl/server/directives/BasicDirectives.scala b/http/src/main/scala/org/apache/pekko/http/javadsl/server/directives/BasicDirectives.scala index 815152003..ba2525515 100644 --- a/http/src/main/scala/org/apache/pekko/http/javadsl/server/directives/BasicDirectives.scala +++ b/http/src/main/scala/org/apache/pekko/http/javadsl/server/directives/BasicDirectives.scala @@ -13,8 +13,6 @@ package org.apache.pekko.http.javadsl.server.directives -import java.util.function.{ Function => JFunction } - import org.apache.pekko import pekko.actor.ActorSystem import pekko.dispatch.ExecutionContexts @@ -24,27 +22,34 @@ import pekko.stream.Materializer import pekko.stream.javadsl.Source import pekko.util.ByteString import pekko.util.FutureConverters._ +import pekko.util.JavaDurationConverters._ import pekko.http.impl.model.JavaUri import pekko.http.impl.util.JavaMapping import pekko.http.impl.util.Util.convertIterable -import pekko.http.javadsl.model.{ HttpEntity, HttpRequest, RequestEntity, Uri } +import pekko.http.javadsl.model.{ + HttpEntity, + HttpHeader, + HttpRequest, + HttpResponse, + RequestEntity, + ResponseEntity, + Uri +} +import pekko.http.javadsl.server import pekko.http.javadsl.server._ import pekko.http.javadsl.settings.{ ParserSettings, RoutingSettings } import pekko.http.scaladsl import pekko.http.scaladsl.server.{ Directives => D } - -import pekko.http.javadsl.model.HttpResponse -import pekko.http.javadsl.model.ResponseEntity -import pekko.http.javadsl.model.HttpHeader import pekko.http.scaladsl.util.FastFuture._ -import pekko.http.javadsl.server import java.lang.{ Iterable => JIterable } -import java.util.function.Supplier +import java.time.{ Duration => JDuration } import java.util.{ List => JList } import java.util.concurrent.CompletionStage +import java.util.function.{ Function => JFunction } import java.util.function.Predicate +import java.util.function.Supplier import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration @@ -344,7 +349,10 @@ abstract class BasicDirectives { * entire request body within the timeout. * * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @deprecated As of 1.3.0, use the overloaded method taking a `java.time.Duration` instead. */ + @Deprecated + @deprecated("use the overloaded method taking a `java.time.Duration` instead.", "1.3.0") def extractStrictEntity(timeout: FiniteDuration, inner: JFunction[HttpEntity.Strict, Route]): Route = RouteAdapter { D.extractStrictEntity(timeout) { strict => inner.apply(strict).delegate } } @@ -360,12 +368,50 @@ abstract class BasicDirectives { * entire request body within the timeout. * * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @since 1.3.0 + */ + def extractStrictEntity(timeout: JDuration, inner: JFunction[HttpEntity.Strict, Route]): Route = RouteAdapter { + D.extractStrictEntity(timeout.asScala) { strict => inner.apply(strict).delegate } + } + + /** + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `pekko.http.parsing.max-to-strict-bytes` configuration setting. + * + * Converts the HttpEntity from the [[pekko.http.javadsl.server.RequestContext]] into an + * [[pekko.http.javadsl.model.HttpEntity.Strict]] and extracts it, or fails the route if unable to drain the + * entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @deprecated As of 1.3.0, use the overloaded method taking a `java.time.Duration` instead. */ + @Deprecated + @deprecated("use the overloaded method taking a `java.time.Duration` instead.", "1.3.0") def extractStrictEntity(timeout: FiniteDuration, maxBytes: Long, inner: JFunction[HttpEntity.Strict, Route]): Route = RouteAdapter { D.extractStrictEntity(timeout, maxBytes) { strict => inner.apply(strict).delegate } } + /** + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `pekko.http.parsing.max-to-strict-bytes` configuration setting. + * + * Converts the HttpEntity from the [[pekko.http.javadsl.server.RequestContext]] into an + * [[pekko.http.javadsl.model.HttpEntity.Strict]] and extracts it, or fails the route if unable to drain the + * entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @since 1.3.0 + */ + def extractStrictEntity(timeout: JDuration, maxBytes: Long, inner: JFunction[HttpEntity.Strict, Route]): Route = + RouteAdapter { + D.extractStrictEntity(timeout.asScala, maxBytes) { strict => inner.apply(strict).delegate } + } + /** * WARNING: This will read the entire request entity into memory and effectively disable streaming. * @@ -376,7 +422,10 @@ abstract class BasicDirectives { * or fails the route if unable to drain the entire request body within the timeout. * * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @deprecated As of 1.3.0, use the overloaded method taking a `java.time.Duration` instead. */ + @Deprecated + @deprecated("use the overloaded method taking a `java.time.Duration` instead.", "1.3.0") def toStrictEntity(timeout: FiniteDuration, inner: Supplier[Route]): Route = RouteAdapter { D.toStrictEntity(timeout) { inner.get.delegate } } @@ -391,9 +440,43 @@ abstract class BasicDirectives { * or fails the route if unable to drain the entire request body within the timeout. * * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @since 1.3.0 */ + def toStrictEntity(timeout: JDuration, inner: Supplier[Route]): Route = RouteAdapter { + D.toStrictEntity(timeout.asScala) { inner.get.delegate } + } + + /** + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `pekko.http.parsing.max-to-strict-bytes` configuration setting. + * + * Extracts the [[pekko.http.javadsl.server.RequestContext]] itself with the strict HTTP entity, + * or fails the route if unable to drain the entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @deprecated As of 1.3.0, use the overloaded method taking a `java.time.Duration` instead. + */ + @Deprecated + @deprecated("use the overloaded method taking a `java.time.Duration` instead.", "1.3.0") def toStrictEntity(timeout: FiniteDuration, maxBytes: Long, inner: Supplier[Route]): Route = RouteAdapter { D.toStrictEntity(timeout, maxBytes) { inner.get.delegate } } + /** + * WARNING: This will read the entire request entity into memory and effectively disable streaming. + * + * To help protect against excessive memory use, the request will be aborted if the request is larger + * than allowed by the `pekko.http.parsing.max-to-strict-bytes` configuration setting. + * + * Extracts the [[pekko.http.javadsl.server.RequestContext]] itself with the strict HTTP entity, + * or fails the route if unable to drain the entire request body within the timeout. + * + * @param timeout The directive is failed if the stream isn't completed after the given timeout. + * @since 1.3.0 + */ + def toStrictEntity(timeout: JDuration, maxBytes: Long, inner: Supplier[Route]): Route = RouteAdapter { + D.toStrictEntity(timeout.asScala, maxBytes) { inner.get.delegate } + } } diff --git a/http/src/main/scala/org/apache/pekko/http/javadsl/server/directives/TimeoutDirectives.scala b/http/src/main/scala/org/apache/pekko/http/javadsl/server/directives/TimeoutDirectives.scala index 314b8e337..492b4111d 100644 --- a/http/src/main/scala/org/apache/pekko/http/javadsl/server/directives/TimeoutDirectives.scala +++ b/http/src/main/scala/org/apache/pekko/http/javadsl/server/directives/TimeoutDirectives.scala @@ -13,6 +13,7 @@ package org.apache.pekko.http.javadsl.server.directives +import java.time.{ Duration => JDuration } import java.util.function.{ Function => JFunction, Supplier } import scala.concurrent.duration.Duration @@ -21,6 +22,7 @@ import org.apache.pekko import pekko.http.javadsl.model.{ HttpRequest, HttpResponse } import pekko.http.javadsl.server.Route import pekko.http.scaladsl.server.{ Directives => D } +import pekko.util.JavaDurationConverters._ import pekko.http.impl.util.JavaMapping.Implicits._ @@ -35,7 +37,10 @@ abstract class TimeoutDirectives extends WebSocketDirectives { * * Due to the inherent raciness it is not guaranteed that the update will be applied before * the previously set timeout has expired! + * @deprecated As of 1.3.0, use the overloaded method taking a `java.time.Duration` instead. */ + @Deprecated + @deprecated("use the overloaded method taking a `java.time.Duration` instead.", "1.3.0") def withRequestTimeout(timeout: scala.concurrent.duration.Duration, inner: Supplier[Route]): RouteAdapter = RouteAdapter { D.withRequestTimeout(timeout) { inner.get.delegate } @@ -46,13 +51,41 @@ abstract class TimeoutDirectives extends WebSocketDirectives { * * Due to the inherent raciness it is not guaranteed that the update will be applied before * the previously set timeout has expired! + * @since 1.3.0 */ + def withRequestTimeout(timeout: JDuration, inner: Supplier[Route]): RouteAdapter = + RouteAdapter { + D.withRequestTimeout(timeout.asScala) { inner.get.delegate } + } + + /** + * Tries to set a new request timeout and handler (if provided) at the same time. + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + * @deprecated As of 1.3.0, use the overloaded method taking a `java.time.Duration` instead. + */ + @Deprecated + @deprecated("use the overloaded method taking a `java.time.Duration` instead.", "1.3.0") def withRequestTimeout(timeout: scala.concurrent.duration.Duration, timeoutHandler: JFunction[HttpRequest, HttpResponse], inner: Supplier[Route]): RouteAdapter = RouteAdapter { D.withRequestTimeout(timeout, in => timeoutHandler(in.asJava).asScala) { inner.get.delegate } } + /** + * Tries to set a new request timeout and handler (if provided) at the same time. + * + * Due to the inherent raciness it is not guaranteed that the update will be applied before + * the previously set timeout has expired! + * @since 1.3.0 + */ + def withRequestTimeout(timeout: JDuration, + timeoutHandler: JFunction[HttpRequest, HttpResponse], + inner: Supplier[Route]): RouteAdapter = RouteAdapter { + D.withRequestTimeout(timeout.asScala, in => timeoutHandler(in.asJava).asScala) { inner.get.delegate } + } + def withoutRequestTimeout(inner: Supplier[Route]): RouteAdapter = RouteAdapter { D.withoutRequestTimeout { inner.get.delegate } }