Skip to content

Commit

Permalink
Add overall timeout for requests. (fortanix#67)
Browse files Browse the repository at this point in the history
This deprecates timeout_read() and timeout_write() in favor of
timeout(). The new timeout method on Request takes a Duration instead
of a number of milliseconds, and is measured against overall request
time, not per-read time.

Once a request is started, the timeout is turned into a deadline
specific to that call. The deadline is used in conjunction with the
new DeadlineStream class, which sets a timeout on each read according
to the remaining time for the request. Once the request is done,
the DeadlineStream is unwrapped via .into::<Stream>() to become
an undecorated Stream again for return to the pool. Timeouts on the
stream are unset at this point.

Still to be done:

Add a setting on Agent for default timeout.
Change header-writing code to apply overall deadline rather than
per-write timeout.
Fixes fortanix#28.
  • Loading branch information
jsha authored Jun 21, 2020
1 parent d6b712f commit 57be414
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 42 deletions.
30 changes: 30 additions & 0 deletions src/request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io::Read;
use std::sync::{Arc, Mutex};
use std::time;

use lazy_static::lazy_static;
use qstring::QString;
Expand Down Expand Up @@ -46,6 +47,7 @@ pub struct Request {
pub(crate) timeout_connect: u64,
pub(crate) timeout_read: u64,
pub(crate) timeout_write: u64,
pub(crate) timeout: Option<time::Duration>,
pub(crate) redirects: u32,
pub(crate) proxy: Option<crate::proxy::Proxy>,
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -336,6 +338,8 @@ impl Request {
}

/// Timeout for the socket connection to be successful.
/// If both this and .timeout() are both set, .timeout_connect()
/// takes precedence.
///
/// The default is `0`, which means a request can block forever.
///
Expand All @@ -351,6 +355,8 @@ impl Request {
}

/// Timeout for the individual reads of the socket.
/// If both this and .timeout() are both set, .timeout()
/// takes precedence.
///
/// The default is `0`, which means it can block forever.
///
Expand All @@ -360,12 +366,15 @@ impl Request {
/// .call();
/// println!("{:?}", r);
/// ```
#[deprecated(note = "Please use the timeout() function instead")]
pub fn timeout_read(&mut self, millis: u64) -> &mut Request {
self.timeout_read = millis;
self
}

/// Timeout for the individual writes to the socket.
/// If both this and .timeout() are both set, .timeout()
/// takes precedence.
///
/// The default is `0`, which means it can block forever.
///
Expand All @@ -375,11 +384,32 @@ impl Request {
/// .call();
/// println!("{:?}", r);
/// ```
#[deprecated(note = "Please use the timeout() function instead")]
pub fn timeout_write(&mut self, millis: u64) -> &mut Request {
self.timeout_write = millis;
self
}

/// Timeout for the overall request, including DNS resolution, connection
/// time, redirects, and reading the response body. Slow DNS resolution
/// may cause a request to exceed the timeout, because the DNS request
/// cannot be interrupted with the available APIs.
///
/// This takes precedence over .timeout_read() and .timeout_write(), but
/// not .timeout_connect().
///
/// ```
/// // wait max 1 second for whole request to complete.
/// let r = ureq::get("/my_page")
/// .timeout(std::time::Duration::from_secs(1))
/// .call();
/// println!("{:?}", r);
/// ```
pub fn timeout(&mut self, timeout: time::Duration) -> &mut Request {
self.timeout = Some(timeout);
self
}

/// Basic auth.
///
/// These are the same
Expand Down
15 changes: 11 additions & 4 deletions src/response.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::io::{Cursor, Error as IoError, ErrorKind, Read, Result as IoResult};
use std::str::FromStr;
use std::time::Instant;

use chunked_transfer::Decoder as ChunkDecoder;

use crate::error::Error;
use crate::header::Header;
use crate::pool::PoolReturnRead;
use crate::stream::Stream;
use crate::stream::{DeadlineStream, Stream};
use crate::unit::Unit;

#[cfg(feature = "json")]
Expand Down Expand Up @@ -46,6 +47,7 @@ pub struct Response {
headers: Vec<Header>,
unit: Option<Unit>,
stream: Option<Stream>,
deadline: Option<Instant>,
}

/// index into status_line where we split: HTTP/1.1 200 OK
Expand Down Expand Up @@ -273,7 +275,6 @@ impl Response {
/// ```
pub fn into_reader(self) -> impl Read {
//

let is_http10 = self.http_version().eq_ignore_ascii_case("HTTP/1.0");
let is_close = self
.header("connection")
Expand Down Expand Up @@ -306,6 +307,8 @@ impl Response {

let stream = self.stream.expect("No reader in response?!");
let unit = self.unit;
let deadline = unit.as_ref().and_then(|u| u.deadline);
let stream = DeadlineStream::new(stream, deadline);

match (use_chunked, limit_bytes) {
(true, _) => {
Expand Down Expand Up @@ -472,6 +475,7 @@ impl Response {
headers,
unit: None,
stream: None,
deadline: None,
})
}

Expand Down Expand Up @@ -551,6 +555,9 @@ impl Into<Response> for Error {
/// *Internal API*
pub(crate) fn set_stream(resp: &mut Response, url: String, unit: Option<Unit>, stream: Stream) {
resp.url = Some(url);
if let Some(unit) = &unit {
resp.deadline = unit.deadline;
}
resp.unit = unit;
resp.stream = Some(stream);
}
Expand Down Expand Up @@ -586,7 +593,7 @@ struct LimitedRead<R> {
position: usize,
}

impl<R> LimitedRead<R> {
impl<R: Read> LimitedRead<R> {
fn new(reader: R, limit: usize) -> Self {
LimitedRead {
reader,
Expand Down Expand Up @@ -617,7 +624,7 @@ impl<R: Read> Read for LimitedRead<R> {
}
}

impl<R> From<LimitedRead<R>> for Stream
impl<R: Read> From<LimitedRead<R>> for Stream
where
Stream: From<R>,
{
Expand Down
125 changes: 95 additions & 30 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,61 @@ pub enum Stream {
Test(Box<dyn Read + Send>, Vec<u8>),
}

// DeadlineStream wraps a stream such that read() will return an error
// after the provided deadline, and sets timeouts on the underlying
// TcpStream to ensure read() doesn't block beyond the deadline.
// When the From trait is used to turn a DeadlineStream back into a
// Stream (by PoolReturningRead), the timeouts are removed.
pub struct DeadlineStream {
stream: Stream,
deadline: Option<Instant>,
}

impl DeadlineStream {
pub(crate) fn new(stream: Stream, deadline: Option<Instant>) -> Self {
DeadlineStream { stream, deadline }
}
}

impl From<DeadlineStream> for Stream {
fn from(deadline_stream: DeadlineStream) -> Stream {
// Since we are turning this back into a regular, non-deadline Stream,
// remove any timeouts we set.
let stream = deadline_stream.stream;
if let Some(socket) = stream.socket() {
socket.set_read_timeout(None).unwrap();
socket.set_write_timeout(None).unwrap();
}
stream
}
}

impl Read for DeadlineStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
if let Some(deadline) = self.deadline {
let timeout = time_until_deadline(deadline)?;
if let Some(socket) = self.stream.socket() {
socket.set_read_timeout(Some(timeout))?;
socket.set_write_timeout(Some(timeout))?;
}
}
self.stream.read(buf)
}
}

// If the deadline is in the future, return the remaining time until
// then. Otherwise return a TimedOut error.
fn time_until_deadline(deadline: Instant) -> IoResult<Duration> {
let now = Instant::now();
match now.checked_duration_since(deadline) {
Some(_) => Err(IoError::new(
ErrorKind::TimedOut,
"timed out reading response",
)),
None => Ok(deadline - now),
}
}

impl ::std::fmt::Debug for Stream {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::result::Result<(), ::std::fmt::Error> {
write!(
Expand Down Expand Up @@ -77,10 +132,9 @@ impl Stream {
}
// Return true if the server has closed this connection.
pub(crate) fn server_closed(&self) -> IoResult<bool> {
match self {
Stream::Http(tcpstream) => Stream::serverclosed_stream(tcpstream),
Stream::Https(rustls_stream) => Stream::serverclosed_stream(&rustls_stream.sock),
_ => Ok(false),
match self.socket() {
Some(socket) => Stream::serverclosed_stream(socket),
None => Ok(false),
}
}
pub fn is_poolable(&self) -> bool {
Expand All @@ -95,6 +149,15 @@ impl Stream {
}
}

pub(crate) fn socket(&self) -> Option<&TcpStream> {
match self {
Stream::Http(tcpstream) => Some(tcpstream),
#[cfg(feature = "tls")]
Stream::Https(rustls_stream) => Some(&rustls_stream.sock),
_ => None,
}
}

#[cfg(test)]
pub fn to_write_vec(&self) -> Vec<u8> {
match self {
Expand Down Expand Up @@ -261,7 +324,13 @@ pub(crate) fn connect_https(unit: &Unit) -> Result<Stream, Error> {
}

pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<TcpStream, Error> {
//
let deadline: Option<Instant> = if unit.timeout_connect > 0 {
Instant::now().checked_add(Duration::from_millis(unit.timeout_connect))
} else {
unit.deadline
};

// TODO: Find a way to apply deadline to DNS lookup.
let sock_addrs: Vec<SocketAddr> = match unit.proxy {
Some(ref proxy) => format!("{}:{}", proxy.server, proxy.port),
None => format!("{}:{}", hostname, port),
Expand All @@ -282,34 +351,24 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp

let mut any_err = None;
let mut any_stream = None;
let mut timeout_connect = unit.timeout_connect;
let start_time = Instant::now();
let has_timeout = unit.timeout_connect > 0;

// Find the first sock_addr that accepts a connection
for sock_addr in sock_addrs {
// ensure connect timeout isn't hit overall.
if has_timeout {
let lapsed = (Instant::now() - start_time).as_millis() as u64;
if lapsed >= unit.timeout_connect {
any_err = Some(IoError::new(ErrorKind::TimedOut, "Didn't connect in time"));
break;
} else {
timeout_connect = unit.timeout_connect - lapsed;
}
}
// ensure connect timeout or overall timeout aren't yet hit.
let timeout = match deadline {
Some(deadline) => Some(time_until_deadline(deadline)?),
None => None,
};

// connect with a configured timeout.
let stream = if Some(Proto::SOCKS5) == proto {
connect_socks5(
unit.proxy.to_owned().unwrap(),
timeout_connect,
deadline,
sock_addr,
hostname,
port,
)
} else if has_timeout {
let timeout = Duration::from_millis(timeout_connect);
} else if let Some(timeout) = timeout {
TcpStream::connect_timeout(&sock_addr, timeout)
} else {
TcpStream::connect(&sock_addr)
Expand All @@ -332,15 +391,23 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp

// rust's absurd api returns Err if we set 0.
// Setting it to None will disable the native system timeout
if unit.timeout_read > 0 {
if let Some(deadline) = deadline {
stream
.set_read_timeout(Some(deadline - Instant::now()))
.ok();
} else if unit.timeout_read > 0 {
stream
.set_read_timeout(Some(Duration::from_millis(unit.timeout_read as u64)))
.ok();
} else {
stream.set_read_timeout(None).ok();
}

if unit.timeout_write > 0 {
if let Some(deadline) = deadline {
stream
.set_write_timeout(Some(deadline - Instant::now()))
.ok();
} else if unit.timeout_write > 0 {
stream
.set_write_timeout(Some(Duration::from_millis(unit.timeout_write as u64)))
.ok();
Expand Down Expand Up @@ -399,7 +466,7 @@ fn socks5_local_nslookup(hostname: &str, port: u16) -> Result<TargetAddr, std::i
#[cfg(feature = "socks-proxy")]
fn connect_socks5(
proxy: Proxy,
timeout_connect: u64,
deadline: Option<time::Instant>,
proxy_addr: SocketAddr,
host: &str,
port: u16,
Expand Down Expand Up @@ -430,7 +497,7 @@ fn connect_socks5(
// 1) In the event of a timeout, a thread may be left running in the background.
// TODO: explore supporting timeouts upstream in Socks5Proxy.
#[allow(clippy::mutex_atomic)]
let stream = if timeout_connect > 0 {
let stream = if let Some(deadline) = deadline {
use std::sync::mpsc::channel;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
Expand All @@ -455,9 +522,7 @@ fn connect_socks5(
let (lock, cvar) = &*master_signal;
let done = lock.lock().unwrap();

let done_result = cvar
.wait_timeout(done, Duration::from_millis(timeout_connect))
.unwrap();
let done_result = cvar.wait_timeout(done, deadline - Instant::now()).unwrap();
let done = done_result.0;
if *done {
rx.recv().unwrap()?
Expand Down Expand Up @@ -504,7 +569,7 @@ fn get_socks5_stream(
#[cfg(not(feature = "socks-proxy"))]
fn connect_socks5(
_proxy: Proxy,
_timeout_connect: u64,
_deadline: Option<Instant>,
_proxy_addr: SocketAddr,
_hostname: &str,
_port: u16,
Expand Down
Loading

0 comments on commit 57be414

Please sign in to comment.