Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.pekko.util.ByteString.emptyByteString;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
Expand All @@ -42,7 +43,6 @@
import java.util.function.Function;
import org.apache.pekko.stream.javadsl.Framing;
import org.apache.pekko.http.javadsl.model.*;
import scala.concurrent.duration.FiniteDuration;
// #manual-entity-consume-example-1

// #single-request-in-actor-example
Expand Down Expand Up @@ -106,7 +106,7 @@ public ExamplePerson parse(ByteString line) {

// toStrict to enforce all data be loaded into memory from the connection
final CompletionStage<HttpEntity.Strict> strictEntity =
response.entity().toStrict(FiniteDuration.create(3, TimeUnit.SECONDS).toMillis(), system);
response.entity().toStrict(Duration.ofSeconds(3).toMillis(), system);

// You can now use `getData` to get the data directly...
final CompletionStage<ExamplePerson> person1 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.junit.Ignore;
import org.junit.Test;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -1144,7 +1144,7 @@ public void testExtractDataBytes() {
@Test
public void testExtractStrictEntity() {
// #extractStrictEntity
final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS);
final java.time.Duration timeout = java.time.Duration.ofSeconds(3);
final Route route =
extractStrictEntity(timeout, strict -> complete(strict.getData().utf8String()));

Expand All @@ -1165,7 +1165,7 @@ public void testExtractStrictEntity() {
@Test
public void testToStrictEntity() {
// #toStrictEntity
final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS);
final Duration timeout = Duration.ofSeconds(3);
final Route route =
toStrictEntity(
timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
// #caching-directives-import
import static org.apache.pekko.http.javadsl.server.directives.CachingDirectives.*;
// #caching-directives-import
import scala.concurrent.duration.Duration;
// #time-unit-import
import java.time.Duration;
import java.util.concurrent.TimeUnit;
// #time-unit-import
import org.apache.pekko.http.javadsl.testkit.JUnitRouteTest;
Expand Down Expand Up @@ -198,8 +198,8 @@ public Uri apply(RequestContext in, boolean isCheck) {
.lfuCacheSettings()
.withInitialCapacity(25)
.withMaxCapacity(50)
.withTimeToLive(Duration.create(20, TimeUnit.SECONDS))
.withTimeToIdle(Duration.create(10, TimeUnit.SECONDS));
.withTimeToLive(Duration.ofSeconds(20))
.withTimeToIdle(Duration.ofSeconds(10));
final CachingSettings cachingSettings =
defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings);
final Cache<Uri, RouteResult> lfuCache = LfuCache.create(cachingSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.testkit.TestKit;
import org.apache.pekko.util.JavaDurationConverters;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import scala.concurrent.duration.Duration;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -84,13 +85,14 @@ private Optional<HttpResponse> runRoute(Route route, String routePath) throws Ex

@After
public void shutDown() {
TestKit.shutdownActorSystem(system, Duration.create(1, TimeUnit.SECONDS), false);
TestKit.shutdownActorSystem(
system, scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS), false);
}

@Test
public void testRequestTimeoutIsConfigurable() throws Exception {
// #withRequestTimeout-plain
final Duration timeout = Duration.create(1, TimeUnit.SECONDS);
final Duration timeout = Duration.ofSeconds(1);
CompletionStage<String> slowFuture = new CompletableFuture<>();

final Route route =
Expand Down Expand Up @@ -132,7 +134,7 @@ public void testRequestWithoutTimeoutCancelsTimeout() throws Exception {
@Test
public void testRequestTimeoutAllowsCustomResponse() throws Exception {
// #withRequestTimeout-with-handler
final Duration timeout = Duration.create(1, TimeUnit.MILLISECONDS);
final Duration timeout = Duration.ofMillis(1);
CompletionStage<String> slowFuture = new CompletableFuture<>();

HttpResponse enhanceYourCalmResponse =
Expand Down Expand Up @@ -162,7 +164,7 @@ public void testRequestTimeoutAllowsCustomResponse() throws Exception {
@Test
public void testRequestTimeoutCustomResponseCanBeAddedSeparately() throws Exception {
// #withRequestTimeoutResponse
final Duration timeout = Duration.create(100, TimeUnit.MILLISECONDS);
final Duration timeout = Duration.ofMillis(100);
CompletionStage<String> slowFuture = new CompletableFuture<>();

HttpResponse enhanceYourCalmResponse =
Expand Down Expand Up @@ -193,8 +195,8 @@ public void testRequestTimeoutCustomResponseCanBeAddedSeparately() throws Except
@Test
public void extractRequestTimeout() throws Exception {
// #extractRequestTimeout
Duration timeout1 = Duration.create(500, TimeUnit.MILLISECONDS);
Duration timeout2 = Duration.create(1000, TimeUnit.MILLISECONDS);
Duration timeout1 = Duration.ofMillis(500);
Duration timeout2 = Duration.ofMillis(1000);
Route route =
path(
"timeout",
Expand All @@ -209,7 +211,12 @@ public void extractRequestTimeout() throws Exception {
() ->
extractRequestTimeout(
t2 -> {
if (t1 == timeout1 && t2 == timeout2)
if (t1.equals(
JavaDurationConverters.asFiniteDuration(
timeout1))
&& t2.equals(
JavaDurationConverters.asFiniteDuration(
timeout2)))
return complete(StatusCodes.OK);
else
return complete(StatusCodes.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -97,7 +97,7 @@ public void testHandleWebSocketMessages() {
wsClient.expectMessage("Hello Peter!");

wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
wsClient.expectNoMessage(Duration.ofMillis(100));

wsClient.sendMessage("John");
wsClient.expectMessage("Hello John!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.apache.pekko
import pekko.annotation.DoNotInherit
import pekko.http.caching.impl.settings.LfuCachingSettingsImpl
import pekko.http.javadsl.settings.SettingsCompanion
import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config

import scala.concurrent.duration.Duration
Expand All @@ -36,7 +37,21 @@ abstract class LfuCacheSettings private[http] () { self: LfuCachingSettingsImpl
def withMaxCapacity(newMaxCapacity: Int): LfuCacheSettings = self.copy(maxCapacity = newMaxCapacity)
def withInitialCapacity(newInitialCapacity: Int): LfuCacheSettings = self.copy(initialCapacity = newInitialCapacity)
def withTimeToLive(newTimeToLive: Duration): LfuCacheSettings = self.copy(timeToLive = newTimeToLive)

/**
* Java API
* @since 1.3.0
*/
def withTimeToLive(newTimeToLive: java.time.Duration): LfuCacheSettings =
self.copy(timeToLive = newTimeToLive.asScala)
def withTimeToIdle(newTimeToIdle: Duration): LfuCacheSettings = self.copy(timeToIdle = newTimeToIdle)

/**
* Java API
* @since 1.3.0
*/
def withTimeToIdle(newTimeToIdle: java.time.Duration): LfuCacheSettings =
self.copy(timeToIdle = newTimeToIdle.asScala)
}

object LfuCacheSettings extends SettingsCompanion[LfuCacheSettings] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.japi.Function;
import scala.concurrent.duration.Duration;

/**
* Enables programmatic access to the server-side request timeout logic.
Expand All @@ -34,17 +33,31 @@ public interface TimeoutAccess {
* <p>Due to the inherent raciness it is not guaranteed that the returned timeout was applied
* before the previously set timeout has expired!
*/
Duration getTimeout();
scala.concurrent.duration.Duration getTimeout();

/**
* Tries to set a new timeout. The timeout period is measured as of the point in time that the end
* of the request has been received, which may be in the past or in the future! Use `Duration.Inf`
* to completely disable request timeout checking for this request.
* of the request has been received, which may be in the past or in the future! Use
* `scala.concurrent.duration.Duration.Inf` to completely disable request timeout checking for
* this request.
*
* <p>Due to the inherent raciness it is not guaranteed that the update will be applied before the
* previously set timeout has expired!
*/
void updateTimeout(Duration timeout);
void updateTimeout(scala.concurrent.duration.Duration timeout);

/**
* Tries to set a new timeout. The timeout period is measured as of the point in time that the end
* of the request has been received, which may be in the past or in the future! Use a
* `java.time.Duration` with a long duration to disable request timeout checking for this request
* (e.g. `java.time.temporal.ChronoUnit.FOREVER.getDuration()`).
*
* <p>Due to the inherent raciness it is not guaranteed that the update will be applied before the
* previously set timeout has expired!
*
* @since 1.3.0
*/
void updateTimeout(java.time.Duration timeout);

/**
* Tries to set a new timeout handler, which produces the timeout response for a given request.
Expand All @@ -61,5 +74,16 @@ public interface TimeoutAccess {
* <p>Due to the inherent raciness it is not guaranteed that the update will be applied before the
* previously set timeout has expired!
*/
void update(Duration timeout, Function<HttpRequest, HttpResponse> handler);
void update(
scala.concurrent.duration.Duration timeout, Function<HttpRequest, HttpResponse> handler);

/**
* Tries to set a new timeout and handler at the same time.
*
* <p>Due to the inherent raciness it is not guaranteed that the update will be applied before the
* previously set timeout has expired!
*
* @since 1.3.0
*/
void update(java.time.Duration timeout, Function<HttpRequest, HttpResponse> handler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add more support for java.time.Duration in Java DSL
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.TimeoutAccess.updateTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.TimeoutAccess.update")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.WebSocketSettings.getPeriodicKeepAliveMaxIdle")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withConnectingTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withIdleTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withStreamCancellationDelay")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withBaseConnectionBackoff")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withMaxConnectionBackoff")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withIdleTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withKeepAliveTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withMaxConnectionLifetime")
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
package org.apache.pekko.http.impl.engine.server

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration }
import scala.collection.immutable
import scala.util.control.{ NoStackTrace, NonFatal }

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.Cancellable
Expand All @@ -27,6 +24,7 @@
import pekko.event.LoggingAdapter
import pekko.http.ParsingErrorHandler
import pekko.util.ByteString
import pekko.util.JavaDurationConverters._
import pekko.stream._
import pekko.stream.TLSProtocol._
import pekko.stream.scaladsl._
Expand All @@ -42,15 +40,19 @@
ResponseRenderingOutput
}
import pekko.http.impl.util._
import pekko.http.javadsl.model
import pekko.http.scaladsl.util.FastFuture.EnhancedFuture
import pekko.http.scaladsl.{ Http, TimeoutAccess }
import pekko.http.scaladsl.model.headers.`Timeout-Access`
import pekko.http.javadsl.model
import pekko.http.scaladsl.model._
import pekko.http.impl.util.LogByteStringTools._

import scala.annotation.nowarn
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration }
import scala.collection.immutable
import scala.util.Failure
import scala.util.control.{ NoStackTrace, NonFatal }

/**
* INTERNAL API
Expand Down Expand Up @@ -388,7 +390,8 @@
get.fast.foreach(setup => if (setup.scheduledTask ne null) setup.scheduledTask.cancel())

override def updateTimeout(timeout: Duration): Unit = update(timeout, null: HttpRequest => HttpResponse)
override def updateHandler(handler: HttpRequest => HttpResponse): Unit = update(null, handler)
override def updateHandler(handler: HttpRequest => HttpResponse): Unit =
update(null.asInstanceOf[Duration], handler)
override def update(timeout: Duration, handler: HttpRequest => HttpResponse): Unit = {
val promise = Promise[TimeoutSetup]()
for (old <- getAndSet(promise.future).fast)
Expand All @@ -411,9 +414,18 @@
import pekko.http.impl.util.JavaMapping.Implicits._

/** JAVA API * */
def update(timeout: Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
override def updateTimeout(timeout: java.time.Duration): Unit = {
val stimeout = if (timeout eq null) null else timeout.asScala
update(stimeout, null: HttpRequest => HttpResponse)
}
override def update(timeout: Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
update(timeout, handler(_: HttpRequest).asScala)
def updateHandler(handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
override def update(
timeout: java.time.Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit = {
val stimeout = if (timeout eq null) null else timeout.asScala
update(stimeout, handler(_: HttpRequest).asScala)
}
override def updateHandler(handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
updateHandler(handler(_: HttpRequest).asScala)

def timeout = currentTimeout
Expand Down Expand Up @@ -721,7 +733,7 @@
})

private var activeTimers = 0
private def timeout = materializer.settings.subscriptionTimeoutSettings.timeout

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / validate-links

method settings in class Materializer is deprecated (since Akka 2.6.0): Use attributes to access settings from stages

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / validate-links

value timeout in class StreamSubscriptionTimeoutSettings is deprecated (since Akka 2.6.0): Use attribute 'ActorAttributes.StreamSubscriptionTimeout' to read the concrete setting value

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.12, 8)

method settings in class Materializer is deprecated (since Akka 2.6.0): Use attributes to access settings from stages

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.12, 8)

value timeout in class StreamSubscriptionTimeoutSettings is deprecated (since Akka 2.6.0): Use attribute 'ActorAttributes.StreamSubscriptionTimeout' to read the concrete setting value

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.12, 11)

method settings in class Materializer is deprecated (since Akka 2.6.0): Use attributes to access settings from stages

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.12, 11)

value timeout in class StreamSubscriptionTimeoutSettings is deprecated (since Akka 2.6.0): Use attribute 'ActorAttributes.StreamSubscriptionTimeout' to read the concrete setting value

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 8)

method settings in class Materializer is deprecated (since Akka 2.6.0): Use attributes to access settings from stages

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 8)

value timeout in class StreamSubscriptionTimeoutSettings is deprecated (since Akka 2.6.0): Use attribute 'ActorAttributes.StreamSubscriptionTimeout' to read the concrete setting value

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 11)

method settings in class Materializer is deprecated (since Akka 2.6.0): Use attributes to access settings from stages

Check warning on line 736 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 11)

value timeout in class StreamSubscriptionTimeoutSettings is deprecated (since Akka 2.6.0): Use attribute 'ActorAttributes.StreamSubscriptionTimeout' to read the concrete setting value
private def addTimeout(s: SubscriptionTimeout): Unit = {
if (activeTimers == 0) setKeepGoing(true)
activeTimers += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import pekko.http.scaladsl.settings.ClientConnectionSettings.LogUnencryptedNetwo
import pekko.http.scaladsl.settings.Http2ClientSettings.Http2ClientSettingsImpl
import pekko.http.scaladsl.settings.{ Http2ClientSettings, ParserSettings, WebSocketSettings }
import pekko.io.Inet.SocketOption
import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config

import scala.collection.immutable
Expand Down Expand Up @@ -55,6 +56,17 @@ private[pekko] final case class ClientConnectionSettingsImpl(
"The provided ParserSettings is a generic object that does not contain the client-specific settings.")
override def productPrefix = "ClientConnectionSettings"

override def withConnectingTimeout(
newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings =
withConnectingTimeout(newValue.asScala)

override def withIdleTimeout(newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings =
withIdleTimeout(newValue.asScala)

override def withStreamCancellationDelay(
newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings =
withStreamCancellationDelay(newValue.asScala)

override def websocketRandomFactory: () => Random = websocketSettings.randomFactory
}

Expand Down
Loading
Loading