Skip to content

Commit

Permalink
Talk about Queue config
Browse files Browse the repository at this point in the history
  • Loading branch information
José Valim committed Nov 21, 2018
1 parent 79e21b7 commit a891b1f
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 58 deletions.
8 changes: 4 additions & 4 deletions examples/db_agent/lib/db_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,25 @@ defmodule DBAgent do

@spec get(DBConnection.conn(), (state :: any -> value), timeout) :: value when value: var
def get(conn, fun, timeout \\ 5_000) do
DBConnection.execute!(conn, %Query{query: :get}, fun, pool_timeout: timeout)
DBConnection.execute!(conn, %Query{query: :get}, fun, timeout: timeout)
end

@spec update(DBConnection.conn(), (state :: any -> new_state :: any), timeout) :: :ok
def update(conn, fun, timeout \\ 5_000) do
DBConnection.execute!(conn, %Query{query: :update}, fun, pool_timeout: timeout)
DBConnection.execute!(conn, %Query{query: :update}, fun, timeout: timeout)
end

@spec get(DBConnection.conn(), (state :: any -> {value, new_state :: any}), timeout) :: value
when value: var
def get_and_update(conn, fun, timeout \\ 5_000) do
DBConnection.execute!(conn, %Query{query: :get_and_update}, fun, pool_timeout: timeout)
DBConnection.execute!(conn, %Query{query: :get_and_update}, fun, timeout: timeout)
end

@spec transaction(DBConnection.conn(), (DBConnection.t() -> res), timeout) ::
{:ok, res} | {:error, reason :: any}
when res: var
def transaction(conn, fun, timeout \\ 5_000) when is_function(fun, 1) do
DBConnection.transaction(conn, fun, pool_timeout: timeout)
DBConnection.transaction(conn, fun, timeout: timeout)
end

@spec rollback(DBConnection.t(), reason :: any) :: no_return
Expand Down
4 changes: 2 additions & 2 deletions integration_test/cases/queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule QueueTest do

run = fn() ->
try do
P.run(pool, fn(_) -> flunk("run ran") end, [pool_timeout: 50])
P.run(pool, fn(_) -> flunk("run ran") end, [timeout: 50])
rescue
DBConnection.ConnectionError ->
:error
Expand Down Expand Up @@ -134,7 +134,7 @@ defmodule QueueTest do
{:ok, agent} = A.start_link(stack)

opts = [agent: agent, parent: self(), backoff_start: 30_000,
queue_timeout: 10, pool_timeout: 10, queue_target: 10, queue_interval: 10]
queue_timeout: 10, queue_target: 10, queue_interval: 10]
{:ok, pool} = P.start_link(opts)

P.run(pool, fn(_) ->
Expand Down
99 changes: 53 additions & 46 deletions lib/db_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ defmodule DBConnection do
The `DBConnection.Query` protocol provide utility functions so that
queries can be prepared or encoded and results decoding without
blocking the connection or pool.
## Queue config
Handling requests is done through a queue. When DBConnection is
started, there are two relevant options to control the queue:
* `:queue_target` in microseconds, defaults to 50
* `:queue_interval` in microseconds, defaults to 1000
Our goal is to stay under `:queue_target` for `:queue_interval`.
In case we can't reach that, then we double the :queue_target.
If we go above that, then we start dropping messages.
For example, by default our queue time is 50ms. If we stay above
50ms for a whole secnod, we double the target to 100ms and we
start dropping messages once it goes above the new limit.
This allows us to better plan for overloads as we can refuse
requests before they are sent to the database, which would
otherwise increase the burden on the database, making the
overload worse.
"""
require Logger

Expand Down Expand Up @@ -385,13 +406,11 @@ defmodule DBConnection do
### Options
* `:pool_timeout` - The maximum time to wait for a reply when making a
synchronous call to the pool (default: `5_000`)
* `:queue` - Whether to block waiting in an internal queue for the
connection's state (boolean, default: `true`)
* `:timeout` - The maximum time that the caller is allowed the
to hold the connection's state (ignored when using a run/transaction
connection, default: `15_000`)
connection's state (boolean, default: `true`). See "Queue config" in
the module docs
* `:timeout` - The maximum time that the caller is allowed to perform
this operation (default: `15_000`)
* `:log` - A function to log information about a call, either
a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
Expand Down Expand Up @@ -446,13 +465,11 @@ defmodule DBConnection do
### Options
* `:pool_timeout` - The maximum time to wait for a reply when making a
synchronous call to the pool (default: `5_000`)
* `:queue` - Whether to block waiting in an internal queue for the
connection's state (boolean, default: `true`)
* `:timeout` - The maximum time that the caller is allowed the
to hold the connection's state (ignored when using a run/transaction
connection, default: `15_000`)
connection's state (boolean, default: `true`). See "Queue config" in
the module docs
* `:timeout` - The maximum time that the caller is allowed to perform
this operation (default: `15_000`)
* `:log` - A function to log information about a call, either
a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
Expand Down Expand Up @@ -506,13 +523,11 @@ defmodule DBConnection do
### Options
* `:pool_timeout` - The maximum time to wait for a reply when making a
synchronous call to the pool (default: `5_000`)
* `:queue` - Whether to block waiting in an internal queue for the
connection's state (boolean, default: `true`)
* `:timeout` - The maximum time that the caller is allowed the
to hold the connection's state (ignored when using a run/transaction
connection, default: `15_000`)
connection's state (boolean, default: `true`). See "Queue config" in
the module docs
* `:timeout` - The maximum time that the caller is allowed to perform
this operation (default: `15_000`)
* `:log` - A function to log information about a call, either
a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
Expand Down Expand Up @@ -566,13 +581,11 @@ defmodule DBConnection do
## Options
* `:pool_timeout` - The maximum time to wait for a reply when making a
synchronous call to the pool (default: `5_000`)
* `:queue` - Whether to block waiting in an internal queue for the
connection's state (boolean, default: `true`)
* `:timeout` - The maximum time that the caller is allowed the
to hold the connection's state (ignored when using a run/transaction
connection, default: `15_000`)
connection's state (boolean, default: `true`). See "Queue config" in
the module docs
* `:timeout` - The maximum time that the caller is allowed to perform
this operation (default: `15_000`)
* `:log` - A function to log information about a call, either
a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
Expand Down Expand Up @@ -620,12 +633,11 @@ defmodule DBConnection do
### Options
* `:pool_timeout` - The maximum time to wait for a reply when making a
synchronous call to the pool (default: `5_000`)
* `:queue` - Whether to block waiting in an internal queue for the
connection's state (boolean, default: `true`)
* `:timeout` - The maximum time that the caller is allowed the
to hold the connection's state (default: `15_000`)
connection's state (boolean, default: `true`). See "Queue config" in
the module docs
* `:timeout` - The maximum time that the caller is allowed to perform
this operation (default: `15_000`)
The pool may support other options.
Expand Down Expand Up @@ -703,12 +715,11 @@ defmodule DBConnection do
### Options
* `:pool_timeout` - The maximum time to wait for a reply when making a
synchronous call to the pool (default: `5_000`)
* `:queue` - Whether to block waiting in an internal queue for the
connection's state (boolean, default: `true`)
* `:timeout` - The maximum time that the caller is allowed the
to hold the connection's state (default: `15_000`)
connection's state (boolean, default: `true`). See "Queue config" in
the module docs
* `:timeout` - The maximum time that the caller is allowed to perform
this operation (default: `15_000`)
* `:log` - A function to log information about begin, commit and rollback
calls made as part of the transaction, either a 1-arity fun,
`{module, function, args}` with `t:DBConnection.LogEntry.t/0` prepended to
Expand Down Expand Up @@ -843,13 +854,11 @@ defmodule DBConnection do
### Options
* `:pool_timeout` - The maximum time to wait for a reply when making a
synchronous call to the pool (default: `5_000`)
* `:queue` - Whether to block waiting in an internal queue for the
connection's state (boolean, default: `true`)
* `:timeout` - The maximum time that the caller is allowed the
to hold the connection's state (ignored when using a run/transaction
connection, default: `15_000`)
connection's state (boolean, default: `true`). See "Queue config" in
the module docs
* `:timeout` - The maximum time that the caller is allowed to perform
this operation (default: `15_000`)
* `:log` - A function to log information about a call, either
a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
Expand Down Expand Up @@ -879,13 +888,11 @@ defmodule DBConnection do
### Options
* `:pool_timeout` - The maximum time to wait for a reply when making a
synchronous call to the pool (default: `5_000`)
* `:queue` - Whether to block waiting in an internal queue for the
connection's state (boolean, default: `true`)
* `:timeout` - The maximum time that the caller is allowed the
to hold the connection's state (ignored when using a run/transaction
connection, default: `15_000`)
connection's state (boolean, default: `true`). See "Queue config" in
the module docs
* `:timeout` - The maximum time that the caller is allowed to perform
this operation (default: `15_000`)
* `:log` - A function to log information about a call, either
a 1-arity fun, `{module, function, args}` with `t:DBConnection.LogEntry.t/0`
prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
Expand Down
5 changes: 2 additions & 3 deletions lib/db_connection/ownership.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ defmodule DBConnection.Ownership do
If the `:caller` option is given on checkout with a pid and no pool is
assigned to the current process, a connection will be allowed from the
given pid and used on checkout with `:pool_timeout` of `:infinity`.
This is useful when multiple tasks need to collaborate on the same
connection (hence the `:infinity` timeout).
given pid and used on checkout. This is useful when multiple tasks need
to collaborate on the same connection (hence the `:infinity` timeout).
"""

alias DBConnection.Ownership.Manager
Expand Down
6 changes: 3 additions & 3 deletions lib/db_connection/ownership/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule DBConnection.Ownership.Manager do
@spec checkin(GenServer.server, Keyword.t) ::
:ok | :not_owner | :not_found
def checkin(manager, opts) do
timeout = Keyword.get(opts, :pool_timeout, @timeout)
timeout = Keyword.get(opts, :timeout, @timeout)
GenServer.call(manager, :checkin, timeout)
end

Expand All @@ -31,14 +31,14 @@ defmodule DBConnection.Ownership.Manager do
def mode(manager, mode, opts)
when mode in [:auto, :manual]
when elem(mode, 0) == :shared and is_pid(elem(mode, 1)) do
timeout = Keyword.get(opts, :pool_timeout, @timeout)
timeout = Keyword.get(opts, :timeout, @timeout)
GenServer.call(manager, {:mode, mode}, timeout)
end

@spec allow(GenServer.server, parent :: pid, allow :: pid, Keyword.t) ::
:ok | {:already, :owner | :allowed} | :not_found
def allow(manager, parent, allow, opts) do
timeout = Keyword.get(opts, :pool_timeout, @timeout)
timeout = Keyword.get(opts, :timeout, @timeout)
GenServer.call(manager, {:allow, parent, allow}, timeout)
end

Expand Down

0 comments on commit a891b1f

Please sign in to comment.