Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pekko.util.ByteString;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -107,7 +108,7 @@ public ExamplePerson parse(ByteString line) {

// toStrict to enforce all data be loaded into memory from the connection
final CompletionStage<HttpEntity.Strict> strictEntity =
response.entity().toStrict(FiniteDuration.create(3, TimeUnit.SECONDS).toMillis(), system);
response.entity().toStrict(Duration.ofSeconds(3).toMillis(), system);

// You can now use `getData` to get the data directly...
final CompletionStage<ExamplePerson> person1 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.junit.Test;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -1143,7 +1143,7 @@ public void testExtractDataBytes() {
@Test
public void testExtractStrictEntity() {
// #extractStrictEntity
final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS);
final java.time.Duration timeout = java.time.Duration.ofSeconds(3);
final Route route =
extractStrictEntity(timeout, strict -> complete(strict.getData().utf8String()));

Expand All @@ -1164,7 +1164,7 @@ public void testExtractStrictEntity() {
@Test
public void testToStrictEntity() {
// #toStrictEntity
final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS);
final Duration timeout = Duration.ofSeconds(3);
final Route route =
toStrictEntity(
timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
// #caching-directives-import
import static org.apache.pekko.http.javadsl.server.directives.CachingDirectives.*;
// #caching-directives-import
import scala.concurrent.duration.Duration;
// #time-unit-import
import java.time.Duration;
import java.util.concurrent.TimeUnit;
// #time-unit-import
import org.apache.pekko.http.javadsl.testkit.JUnitRouteTest;
Expand Down Expand Up @@ -198,8 +198,8 @@ public Uri apply(RequestContext in, boolean isCheck) {
.lfuCacheSettings()
.withInitialCapacity(25)
.withMaxCapacity(50)
.withTimeToLive(Duration.create(20, TimeUnit.SECONDS))
.withTimeToIdle(Duration.create(10, TimeUnit.SECONDS));
.withTimeToLive(Duration.ofSeconds(20))
.withTimeToIdle(Duration.ofSeconds(10));
final CachingSettings cachingSettings =
defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings);
final Cache<Uri, RouteResult> lfuCache = LfuCache.create(cachingSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.testkit.TestKit;
import org.apache.pekko.util.JavaDurationConverters;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import scala.concurrent.duration.Duration;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -84,13 +85,14 @@ private Optional<HttpResponse> runRoute(Route route, String routePath) throws Ex

@After
public void shutDown() {
TestKit.shutdownActorSystem(system, Duration.create(1, TimeUnit.SECONDS), false);
TestKit.shutdownActorSystem(
system, scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS), false);
}

@Test
public void testRequestTimeoutIsConfigurable() throws Exception {
// #withRequestTimeout-plain
final Duration timeout = Duration.create(1, TimeUnit.SECONDS);
final Duration timeout = Duration.ofSeconds(1);
CompletionStage<String> slowFuture = new CompletableFuture<>();

final Route route =
Expand Down Expand Up @@ -132,7 +134,7 @@ public void testRequestWithoutTimeoutCancelsTimeout() throws Exception {
@Test
public void testRequestTimeoutAllowsCustomResponse() throws Exception {
// #withRequestTimeout-with-handler
final Duration timeout = Duration.create(1, TimeUnit.MILLISECONDS);
final Duration timeout = Duration.ofMillis(1);
CompletionStage<String> slowFuture = new CompletableFuture<>();

HttpResponse enhanceYourCalmResponse =
Expand Down Expand Up @@ -162,7 +164,7 @@ public void testRequestTimeoutAllowsCustomResponse() throws Exception {
@Test
public void testRequestTimeoutCustomResponseCanBeAddedSeparately() throws Exception {
// #withRequestTimeoutResponse
final Duration timeout = Duration.create(100, TimeUnit.MILLISECONDS);
final Duration timeout = Duration.ofMillis(100);
CompletionStage<String> slowFuture = new CompletableFuture<>();

HttpResponse enhanceYourCalmResponse =
Expand Down Expand Up @@ -193,8 +195,8 @@ public void testRequestTimeoutCustomResponseCanBeAddedSeparately() throws Except
@Test
public void extractRequestTimeout() throws Exception {
// #extractRequestTimeout
Duration timeout1 = Duration.create(500, TimeUnit.MILLISECONDS);
Duration timeout2 = Duration.create(1000, TimeUnit.MILLISECONDS);
Duration timeout1 = Duration.ofMillis(500);
Duration timeout2 = Duration.ofMillis(1000);
Route route =
path(
"timeout",
Expand All @@ -209,7 +211,12 @@ public void extractRequestTimeout() throws Exception {
() ->
extractRequestTimeout(
t2 -> {
if (t1 == timeout1 && t2 == timeout2)
if (t1.equals(
JavaDurationConverters.asFiniteDuration(
timeout1))
&& t2.equals(
JavaDurationConverters.asFiniteDuration(
timeout2)))
return complete(StatusCodes.OK);
else
return complete(StatusCodes.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void testHandleWebSocketMessages() {
wsClient.expectMessage("Hello Peter!");

wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
wsClient.expectNoMessage(Duration.ofMillis(100));

wsClient.sendMessage("John");
wsClient.expectMessage("Hello John!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.apache.pekko
import pekko.annotation.DoNotInherit
import pekko.http.caching.impl.settings.LfuCachingSettingsImpl
import pekko.http.javadsl.settings.SettingsCompanion
import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config

import scala.concurrent.duration.Duration
Expand All @@ -36,7 +37,21 @@ abstract class LfuCacheSettings private[http] () { self: LfuCachingSettingsImpl
def withMaxCapacity(newMaxCapacity: Int): LfuCacheSettings = self.copy(maxCapacity = newMaxCapacity)
def withInitialCapacity(newInitialCapacity: Int): LfuCacheSettings = self.copy(initialCapacity = newInitialCapacity)
def withTimeToLive(newTimeToLive: Duration): LfuCacheSettings = self.copy(timeToLive = newTimeToLive)

/**
* Java API
* @since 1.3.0
*/
def withTimeToLive(newTimeToLive: java.time.Duration): LfuCacheSettings =
self.copy(timeToLive = newTimeToLive.asScala)
def withTimeToIdle(newTimeToIdle: Duration): LfuCacheSettings = self.copy(timeToIdle = newTimeToIdle)

/**
* Java API
* @since 1.3.0
*/
def withTimeToIdle(newTimeToIdle: java.time.Duration): LfuCacheSettings =
self.copy(timeToIdle = newTimeToIdle.asScala)
}

object LfuCacheSettings extends SettingsCompanion[LfuCacheSettings] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.japi.function.Function;
import scala.concurrent.duration.Duration;

/**
* Enables programmatic access to the server-side request timeout logic.
Expand All @@ -34,17 +33,31 @@ public interface TimeoutAccess {
* <p>Due to the inherent raciness it is not guaranteed that the returned timeout was applied
* before the previously set timeout has expired!
*/
Duration getTimeout();
scala.concurrent.duration.Duration getTimeout();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be deprecated or at least add some doc about that it will be Java duration

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intend to break this API in a later commit. This is just a forward fit of #780. For traceability, I want to add those changes in one PR and do another PR with the 2.0.0 breaking changes.
This API method isn't changing in this PR, I just fully referenced scala.concurrent.duration.Duration to make it more obvious that that was class that was being returned.
In my follow up PR, I will change the return type to java.time.Duration.


/**
* Tries to set a new timeout. The timeout period is measured as of the point in time that the end
* of the request has been received, which may be in the past or in the future! Use `Duration.Inf`
* to completely disable request timeout checking for this request.
* of the request has been received, which may be in the past or in the future! Use
* `scala.concurrent.duration.Duration.Inf` to completely disable request timeout checking for
* this request.
*
* <p>Due to the inherent raciness it is not guaranteed that the update will be applied before the
* previously set timeout has expired!
*/
void updateTimeout(Duration timeout);
void updateTimeout(scala.concurrent.duration.Duration timeout);

/**
* Tries to set a new timeout. The timeout period is measured as of the point in time that the end
* of the request has been received, which may be in the past or in the future! Use a
* `java.time.Duration` with a long duration to disable request timeout checking for this request
* (e.g. `java.time.temporal.ChronoUnit.FOREVER.getDuration()`).
*
* <p>Due to the inherent raciness it is not guaranteed that the update will be applied before the
* previously set timeout has expired!
*
* @since 1.3.0
*/
void updateTimeout(java.time.Duration timeout);

/**
* Tries to set a new timeout handler, which produces the timeout response for a given request.
Expand All @@ -61,5 +74,16 @@ public interface TimeoutAccess {
* <p>Due to the inherent raciness it is not guaranteed that the update will be applied before the
* previously set timeout has expired!
*/
void update(Duration timeout, Function<HttpRequest, HttpResponse> handler);
void update(
scala.concurrent.duration.Duration timeout, Function<HttpRequest, HttpResponse> handler);

/**
* Tries to set a new timeout and handler at the same time.
*
* <p>Due to the inherent raciness it is not guaranteed that the update will be applied before the
* previously set timeout has expired!
*
* @since 1.3.0
*/
void update(java.time.Duration timeout, Function<HttpRequest, HttpResponse> handler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add more support for java.time.Duration in Java DSL
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.TimeoutAccess.updateTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.TimeoutAccess.update")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.WebSocketSettings.getPeriodicKeepAliveMaxIdle")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withConnectingTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withIdleTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ClientConnectionSettings.withStreamCancellationDelay")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withBaseConnectionBackoff")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withMaxConnectionBackoff")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withIdleTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withKeepAliveTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.javadsl.settings.ConnectionPoolSettings.withMaxConnectionLifetime")
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,20 @@
package org.apache.pekko.http.impl.engine.server

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration }
import scala.collection.immutable
import scala.util.control.{ NoStackTrace, NonFatal }

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.Cancellable
import pekko.annotation.InternalApi
import pekko.japi.function.Function
import pekko.event.LoggingAdapter
import pekko.http.ParsingErrorHandler
import pekko.util.ByteString
import pekko.util.JavaDurationConverters._
import pekko.stream._
import pekko.stream.TLSProtocol._
import pekko.stream.scaladsl._
import pekko.stream.stage._
import pekko.http.ParsingErrorHandler
import pekko.http.scaladsl.settings.ServerSettings
import pekko.http.impl.engine.parsing.ParserOutput._
import pekko.http.impl.engine.parsing._
Expand All @@ -41,15 +39,18 @@
ResponseRenderingOutput
}
import pekko.http.impl.util._
import pekko.http.javadsl.model
import pekko.http.scaladsl.util.FastFuture.EnhancedFuture
import pekko.http.scaladsl.{ Http, TimeoutAccess }
import pekko.http.scaladsl.model.headers.`Timeout-Access`
import pekko.http.javadsl.model
import pekko.http.scaladsl.model._
import pekko.http.impl.util.LogByteStringTools._

import scala.concurrent.ExecutionContext
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration }
import scala.collection.immutable
import scala.util.Failure
import scala.util.control.{ NoStackTrace, NonFatal }

/**
* INTERNAL API
Expand Down Expand Up @@ -381,7 +382,8 @@
get.fast.foreach(setup => if (setup.scheduledTask ne null) setup.scheduledTask.cancel())

override def updateTimeout(timeout: Duration): Unit = update(timeout, null: HttpRequest => HttpResponse)
override def updateHandler(handler: HttpRequest => HttpResponse): Unit = update(null, handler)
override def updateHandler(handler: HttpRequest => HttpResponse): Unit =
update(null.asInstanceOf[Duration], handler)
override def update(timeout: Duration, handler: HttpRequest => HttpResponse): Unit = {
val promise = Promise[TimeoutSetup]()
for (old <- getAndSet(promise.future).fast)
Expand All @@ -404,9 +406,14 @@
import pekko.http.impl.util.JavaMapping.Implicits._

/** JAVA API * */
def update(timeout: Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
override def updateTimeout(timeout: java.time.Duration): Unit =
update(timeout.asScala, null: HttpRequest => HttpResponse)
override def update(timeout: Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
update(timeout, handler(_: HttpRequest).asScala)
def updateHandler(handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
override def update(
timeout: java.time.Duration, handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
update(timeout.asScala, handler(_: HttpRequest).asScala)
override def updateHandler(handler: Function[model.HttpRequest, model.HttpResponse]): Unit =
updateHandler(handler(_: HttpRequest).asScala)

def timeout = currentTimeout
Expand Down Expand Up @@ -714,7 +721,7 @@
})

private var activeTimers = 0
private def timeout = materializer.settings.subscriptionTimeoutSettings.timeout

Check warning on line 724 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / validate-links

method settings in class Materializer is deprecated (since Akka 2.6.0): Use attributes to access settings from stages

Check warning on line 724 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / validate-links

value timeout in class StreamSubscriptionTimeoutSettings is deprecated (since Akka 2.6.0): Use attribute 'ActorAttributes.StreamSubscriptionTimeout' to read the concrete setting value

Check warning on line 724 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 17)

method settings in class Materializer is deprecated (since Akka 2.6.0): Use attributes to access settings from stages

Check warning on line 724 in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala

View workflow job for this annotation

GitHub Actions / Compile and test (2.13, 17)

value timeout in class StreamSubscriptionTimeoutSettings is deprecated (since Akka 2.6.0): Use attribute 'ActorAttributes.StreamSubscriptionTimeout' to read the concrete setting value
private def addTimeout(s: SubscriptionTimeout): Unit = {
if (activeTimers == 0) setKeepGoing(true)
activeTimers += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import pekko.http.scaladsl.settings.ClientConnectionSettings.LogUnencryptedNetwo
import pekko.http.scaladsl.settings.Http2ClientSettings.Http2ClientSettingsImpl
import pekko.http.scaladsl.settings.{ Http2ClientSettings, ParserSettings, WebSocketSettings }
import pekko.io.Inet.SocketOption
import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config

import scala.collection.immutable
Expand Down Expand Up @@ -55,6 +56,17 @@ private[pekko] final case class ClientConnectionSettingsImpl(
"The provided ParserSettings is a generic object that does not contain the client-specific settings.")
override def productPrefix = "ClientConnectionSettings"

override def withConnectingTimeout(
newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings =
withConnectingTimeout(newValue.asScala)

override def withIdleTimeout(newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings =
withIdleTimeout(newValue.asScala)

override def withStreamCancellationDelay(
newValue: java.time.Duration): pekko.http.scaladsl.settings.ClientConnectionSettings =
withStreamCancellationDelay(newValue.asScala)

override def websocketRandomFactory: () => Random = websocketSettings.randomFactory
}

Expand Down
Loading
Loading