Add connection_timeout_ms and reset the timeout counter more often #2388

Open

wants to merge 36 commits into base: master

Changes from 5 commits

Commits (36)
85f91c6
Add connection_timeout_ms and reset the timeout counter more often
petterroea Aug 9, 2023
0d6d70e
Refactor last_attempt -> last_activity
petterroea Aug 14, 2023
015268d
Make tests work again
petterroea Aug 14, 2023
6876f68
Add unit tests of new BrokerConnection functionality
petterroea Aug 14, 2023
91581ed
Re-introduce last_attempt to avoid breakage
petterroea Nov 24, 2023
b95e46d
Rename project from kafka-python to kafka-python-ng (#1)
wbarnha Mar 7, 2024
78c74c0
Fix artifact downloads for release
wbarnha Mar 7, 2024
e796019
Fix badge links in README.rst
wbarnha Mar 8, 2024
38e159a
Reconfigure tests to complete in a more timely manner and skip some i…
wbarnha Mar 8, 2024
e762321
Test Kafka 0.8.2.2 using Python 3.10 in the meantime (#161)
wbarnha Mar 9, 2024
00750aa
Remove support for EOL'ed versions of Python (#160)
wbarnha Mar 9, 2024
5bd1323
Stop testing Python 3.13 in python-package.yml (#162)
wbarnha Mar 9, 2024
cda8f81
Avoid 100% CPU usage while socket is closed (#156)
wbarnha Mar 9, 2024
c02df08
Fix DescribeConfigsResponse_v1 config_source (#150)
wbarnha Mar 9, 2024
65eacfb
Fix base class of DescribeClientQuotasResponse_v0 (#144)
wbarnha Mar 10, 2024
e0ebe5d
Update license_file to license_files (#131)
wbarnha Mar 10, 2024
26bb3eb
Update some RST documentation syntax (#130)
wbarnha Mar 10, 2024
2e6649c
Merge branch 'master' into master
wbarnha Mar 10, 2024
88763da
Fix crc32c's __main__ for Python 3 (#142)
wbarnha Mar 10, 2024
b1a4c53
Strip trailing dot off hostname. (#133)
wbarnha Mar 10, 2024
18eaa2d
Handle OSError to properly recycle SSL connection, fix infinite loop …
wbarnha Mar 10, 2024
54cbd63
client_async: Allow throwing an exception upon socket error during (#…
wbarnha Mar 10, 2024
eb6fd9b
Log connection errors at ERROR level (#139)
wbarnha Mar 12, 2024
6ad79a4
Support custom SASL mechanisms including AWS MSK (#170)
wbarnha Mar 18, 2024
deeccfa
Update python-package.yml to have 15m as timeout
wbarnha Mar 18, 2024
fcca556
Run pyupgrade on everything. (#171)
wbarnha Mar 18, 2024
a856dc4
Remove all vendoring (#169)
s-t-e-v-e-n-k Mar 19, 2024
2f2ccb1
Support Describe log dirs (#145)
wbarnha Mar 19, 2024
0259502
Update conftest.py to use request.node.originalname instead for legal…
wbarnha Mar 20, 2024
3c124b2
KIP-345 Static membership implementation (#137)
wbarnha Mar 20, 2024
56065da
Use monkeytype to create some semblance of typing (#173)
wbarnha Mar 26, 2024
cbf317b
Add zstd support on legacy record and ensure no variable is referred …
wbarnha Mar 26, 2024
d34ad3c
Merge branch 'master' into master
wbarnha Mar 26, 2024
af1a5f0
Update __init__.py of SASL to catch ImportErrors in case botocore is …
wbarnha Mar 27, 2024
aba153f
Add botocore to extras in setup.py
wbarnha Mar 27, 2024
a9e30b0
Merge branch 'master' into master
wbarnha Apr 3, 2024
4 changes: 4 additions & 0 deletions kafka/client_async.py
@@ -76,6 +76,9 @@ class KafkaClient(object):
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which falls back to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
connections_max_idle_ms: Close idle connections after the number of
@@ -160,6 +163,7 @@ class KafkaClient(object):
'bootstrap_servers': 'localhost',
'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'connection_timeout_ms': None,
'request_timeout_ms': 30000,
'wakeup_timeout_ms': 3000,
'connections_max_idle_ms': 9 * 60 * 1000,
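A minimal sketch of how a caller might use the new option, assuming the PR lands as written; the bootstrap address is a placeholder and only connection_timeout_ms is new here:

from kafka.client_async import KafkaClient

# Give up on a single connection attempt after 5 seconds, while keeping the
# default 30-second request_timeout_ms for in-flight requests.
client = KafkaClient(
    bootstrap_servers='localhost:9092',
    connection_timeout_ms=5000,
    request_timeout_ms=30000,
)

# Leaving connection_timeout_ms unset (None) preserves the old behaviour:
# the connection attempt is bounded by request_timeout_ms.
default_client = KafkaClient(bootstrap_servers='localhost:9092')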
24 changes: 19 additions & 5 deletions kafka/conn.py
@@ -121,6 +121,9 @@ class BrokerConnection(object):
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which falls back to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
@@ -197,6 +200,7 @@ class BrokerConnection(object):
'client_id': 'kafka-python-' + __version__,
'node_id': 0,
'request_timeout_ms': 30000,
'connection_timeout_ms': None,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
@@ -241,6 +245,9 @@ def __init__(self, host, port, afi, **configs):
for key in self.config:
if key in configs:
self.config[key] = configs[key]

if self.config['connection_timeout_ms'] is None:
self.config['connection_timeout_ms'] = self.config['request_timeout_ms']

self.node_id = self.config.pop('node_id')

@@ -301,7 +308,10 @@ def __init__(self, host, port, afi, **configs):
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
self._sasl_auth_future = None
self.last_attempt = 0

Collaborator:
My only concern with removing the last_attempt attribute is breaking functionality for users who may depend on this value, for whatever reason. Any particular reason why we can't retain this and allow last_activity to coexist as a separate attribute?

Collaborator:
Can you see this @petterroea?

Author:
Yep!

Author:
Okay, I agree, that's a valid concern.

I don't think renaming the variable back is a good idea, as it is now updated more often than before and cannot be used to indicate the same things.

I can reinstate last_attempt and have it updated in the same places as before. The only downside is that it would be a variable that is never read internally by the library, making it redundant, but it would avoid breaking compatibility. Are you okay with this solution?

Collaborator:
Almost, let me make a small adjustment. There are some points where I'd like to update both values via self.last_activity = self.last_attempt = time.time().

Author:
Ok for me!

Just remember that the value of keeping self.last_attempt diminishes if the variable isn't updated at the same points in time as before this PR. I don't know where you want to make the changes, so I can't really comment further on it.

Author:
@wbarnha Happy to resolve this conversation?

self.last_activity = 0
# This value is not used for internal state, but it is kept to preserve backwards compatibility.
# The variable last_activity is now used instead; it is updated more often and may therefore
# break compatibility with some hacks.
self.last_attempt = 0
self._gai = []
self._sensors = None
if self.config['metrics']:
@@ -379,6 +389,7 @@ def connect(self):
self.config['state_change_callback'](self.node_id, self._sock, self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
self.last_activity = time.time()

if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -411,6 +422,7 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -436,6 +448,7 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
@@ -446,12 +459,13 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

if self.state not in (ConnectionStates.CONNECTED,
ConnectionStates.DISCONNECTED):
# Connection timed out
request_timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() > request_timeout + self.last_attempt:
request_timeout = self.config['connection_timeout_ms'] / 1000.0
if time.time() > request_timeout + self.last_activity:
log.error('Connection attempt to %s timed out', self)
self.close(Errors.KafkaConnectionError('timeout'))
return self.state
@@ -844,7 +858,7 @@ def blacked_out(self):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
if time.time() < self.last_attempt + self._reconnect_backoff:
if time.time() < self.last_activity + self._reconnect_backoff:
return True
return False

@@ -855,7 +869,7 @@ def connection_delay(self):
the reconnect backoff time. When connecting or connected, returns a very
large number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
time_waited = time.time() - (self.last_activity or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited, 0) * 1000
else:
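The core of the conn.py change, condensed into a standalone helper for illustration; this is a sketch assuming the names from the diff above (connection_timeout_ms, last_activity), not part of the PR itself. The timeout is now measured from the last state transition rather than from the start of the attempt, so a slow but progressing handshake survives while a stalled one is closed:

import time

import kafka.errors as Errors
from kafka.conn import ConnectionStates


def check_connection_timeout(conn):
    # conn is a BrokerConnection; only state, config and last_activity are used.
    if conn.state not in (ConnectionStates.CONNECTED, ConnectionStates.DISCONNECTED):
        timeout = conn.config['connection_timeout_ms'] / 1000.0
        if time.time() > conn.last_activity + timeout:
            # No state change for longer than the configured timeout: give up.
            conn.close(Errors.KafkaConnectionError('timeout'))
    return conn.state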
4 changes: 4 additions & 0 deletions kafka/producer/kafka.py
@@ -194,6 +194,9 @@ class KafkaProducer(object):
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which falls back to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
receive_buffer_bytes (int): The size of the TCP receive buffer
@@ -304,6 +307,7 @@ class KafkaProducer(object):
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
'retry_backoff_ms': 100,
'connection_timeout_ms': None,
'request_timeout_ms': 30000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
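A hypothetical producer configuration using the forwarded setting; the broker address and topic name are placeholders:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    connection_timeout_ms=10000,  # fail a stalled connection attempt after 10 seconds
    request_timeout_ms=30000,     # unchanged timeout for in-flight requests
)
producer.send('example-topic', b'payload')
producer.flush()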
88 changes: 80 additions & 8 deletions test/test_conn.py
@@ -6,6 +6,7 @@

import mock
import pytest
import time

from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
from kafka.protocol.api import RequestHeader
@@ -61,28 +62,99 @@ def test_connect_timeout(_socket, conn):
# Initial connect returns EINPROGRESS
# immediate inline connect returns EALREADY
# second explicit connect returns EALREADY
# third explicit connect returns EALREADY and times out via last_attempt
# third explicit connect returns EALREADY and times out via last_activity
_socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY]
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
conn.last_activity = 0
conn.last_attempt = 0
conn.connect()
assert conn.state is ConnectionStates.DISCONNECTED

def test_connect_timeout_slowconn(_socket, conn, mocker):
# Same as test_connect_timeout,
# but we make the connection run longer than the timeout in order to test that
# BrokerConnection resets the timer whenever things happen during the connection
# See https://github.com/dpkp/kafka-python/issues/2386
_socket.connect_ex.side_effect = [EINPROGRESS, EISCONN]

# 0.8: three intervals of this length are guaranteed to exceed the connection timeout
time_between_connect = (conn.config['connection_timeout_ms']/1000) * 0.8
start = time.time()

# Use plaintext auth for simplicity
last_activity = conn.last_activity
last_attempt = conn.last_attempt
conn.config['security_protocol'] = 'SASL_PLAINTEXT'
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
# Ensure the last_activity counter was updated
# Last_attempt should also be updated
assert conn.last_activity > last_activity
assert conn.last_attempt > last_attempt
last_attempt = conn.last_attempt
last_activity = conn.last_activity

# Simulate time being passed
# This shouldn't be enough time to time out the connection
conn._try_authenticate = mocker.Mock(side_effect=[False, False, True])
with mock.patch("time.time", return_value=start+time_between_connect):
# This should trigger authentication
# Note that an authentication attempt isn't actually made until now.
# We simulate that authentication does not succeed at this point
# This is technically incorrect, but it lets us see what happens
# to the state machine when the state doesn't change for two function calls
conn.connect()
assert conn.last_activity > last_activity
# Last attempt is kept as a legacy variable, should not update
assert conn.last_attempt == last_attempt
last_activity = conn.last_activity

assert conn.state is ConnectionStates.AUTHENTICATING


# This time around we should be way past the timeout.
# Now we care about connect() not terminating the attempt,
# because the connection state progressed in the meantime.
with mock.patch("time.time", return_value=start+time_between_connect*2):
# Simulate this one not succeeding as well. This is so we can ensure things don't time out
conn.connect()

# No state change = no activity change
assert conn.last_activity == last_activity
assert conn.last_attempt == last_attempt

# If last_activity was not reset when the state transitioned to AUTHENTICATING,
# the connection state would be timed out now.
assert conn.state is ConnectionStates.AUTHENTICATING


# This time around, the connection should succeed.
with mock.patch("time.time", return_value=start+time_between_connect*3):
# This should finalize the connection
conn.connect()

assert conn.last_activity > last_activity
assert conn.last_attempt == last_attempt
last_activity = conn.last_activity

assert conn.state is ConnectionStates.CONNECTED



def test_blacked_out(conn):
with mock.patch("time.time", return_value=1000):
conn.last_attempt = 0
conn.last_activity = 0
assert conn.blacked_out() is False
conn.last_attempt = 1000
conn.last_activity = 1000
assert conn.blacked_out() is True


def test_connection_delay(conn):
with mock.patch("time.time", return_value=1000):
conn.last_attempt = 1000
conn.last_activity = 1000
assert conn.connection_delay() == conn.config['reconnect_backoff_ms']
conn.state = ConnectionStates.CONNECTING
assert conn.connection_delay() == float('inf')
@@ -286,7 +358,7 @@ def test_lookup_on_connect():
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.last_activity = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn._sock_afi == afi2
@@ -301,11 +373,10 @@ def test_relookup_on_failure():
assert conn.host == hostname
mock_return1 = []
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
last_attempt = conn.last_attempt
last_activity = conn.last_activity
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn.disconnected()
assert conn.last_attempt > last_attempt

afi2 = socket.AF_INET
sockaddr2 = ('127.0.0.2', 9092)
Expand All @@ -314,12 +385,13 @@ def test_relookup_on_failure():
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.last_activity = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn._sock_afi == afi2
assert conn._sock_addr == sockaddr2
conn.close()
assert conn.last_activity > last_activity


def test_requests_timed_out(conn):