@@ -62,17 +62,13 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
62
62
ctx.setTimeout(asyncTimeoutMillis)
63
63
// Must be done on the container thread for Tomcat's sake when using async I/O.
64
64
val bodyWriter = servletIo.bodyWriter(servletResponse, dispatcher) _
65
- val result = F
66
- .attempt(
67
- toRequest(servletRequest) .fold(
68
- onParseFailure(_, servletResponse, bodyWriter ),
65
+ val result =
66
+ toRequest(servletRequest)
67
+ .fold(
68
+ onParseFailure(_, servletResponse),
69
69
handleRequest(ctx, _, bodyWriter),
70
70
)
71
- )
72
- .flatMap {
73
- case Right (()) => F .delay(ctx.complete())
74
- case Left (t) => errorHandler(servletRequest, servletResponse)(t)
75
- }
71
+ .recoverWith(errorHandler(servletRequest, servletResponse))
76
72
dispatcher.unsafeRunAndForget(result)
77
73
} catch errorHandler(servletRequest, servletResponse).andThen(dispatcher.unsafeRunSync _)
78
74
@@ -87,17 +83,23 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
87
83
// It is an error to add a listener to an async context that is
88
84
// already completed, so we must take care to add the listener
89
85
// before the response can complete.
90
-
91
86
val timeout =
92
- F .async[Response [ F ] ](cb =>
87
+ F .async[Unit ](cb =>
93
88
gate.complete(ctx.addListener(new AsyncTimeoutHandler (cb))).as(noopCancelToken)
94
89
)
95
90
val response =
96
91
gate.get *>
97
92
F .defer(serviceFn(request))
98
93
.recoverWith(serviceErrorHandler(request))
99
- val servletResponse = ctx.getResponse.asInstanceOf [HttpServletResponse ]
100
- F .race(timeout, response).flatMap(r => renderResponse(r.merge, servletResponse, bodyWriter))
94
+ F .race(timeout, response).flatMap {
95
+ case Left (_) =>
96
+ // In Jetty, if onTimeout is called, we need to complete on the
97
+ // listener's own thread.
98
+ F .unit
99
+ case Right (resp) =>
100
+ val servletResponse = ctx.getResponse.asInstanceOf [HttpServletResponse ]
101
+ renderResponse(resp, servletResponse, bodyWriter) *> F .delay(ctx.complete())
102
+ }
101
103
}
102
104
103
105
private def errorHandler (
@@ -124,11 +126,19 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
124
126
}
125
127
}
126
128
127
- private class AsyncTimeoutHandler (cb : Callback [Response [ F ] ]) extends AbstractAsyncListener {
129
+ private class AsyncTimeoutHandler (cb : Callback [Unit ]) extends AbstractAsyncListener {
128
130
override def onTimeout (event : AsyncEvent ): Unit = {
131
+ // In Jetty, we must complete on the same thread as the timeout
132
+ // handler. This triggers a cancellation of the service so we
133
+ // can take over.
134
+ cb(Right (()))
135
+
136
+ val ctx = event.getAsyncContext
129
137
val req = event.getAsyncContext.getRequest.asInstanceOf [HttpServletRequest ]
130
138
logger.info(s " Request timed out: ${req.getMethod} ${req.getServletPath}${req.getPathInfo}" )
131
- cb(Right (Response .timeout[F ]))
139
+ val resp = event.getAsyncContext.getResponse.asInstanceOf [HttpServletResponse ]
140
+ resp.sendError(Response .timeout.status.code, " Response timed out" )
141
+ ctx.complete()
132
142
}
133
143
}
134
144
}
0 commit comments