Skip to content

Commit

Permalink
Do not update pool if already created
Browse files Browse the repository at this point in the history
We want to only update the pool if previous do not exist
or is shutdown. This commit adds additional validation to
add_or_renew_pool to make sure this condition is met.

Fixes: scylladb#317
  • Loading branch information
sylwiaszunejko committed Oct 1, 2024
1 parent c0c016a commit c7e740c
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ def on_up(self, host):
futures_results = []
callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock)
for session in tuple(self.sessions):
future = session.add_or_renew_pool(host, is_host_addition=False)
future = session.add_or_renew_pool(host, is_host_addition=False, require_previous_shutdown_or_none=False)
if future is not None:
have_future = True
future.add_done_callback(callback)
Expand Down Expand Up @@ -2119,7 +2119,7 @@ def future_completed(future):

have_future = False
for session in tuple(self.sessions):
future = session.add_or_renew_pool(host, is_host_addition=True)
future = session.add_or_renew_pool(host, is_host_addition=True, require_previous_shutdown_or_none=False)
if future is not None:
have_future = True
futures.add(future)
Expand Down Expand Up @@ -2650,7 +2650,7 @@ def __init__(self, cluster, hosts, keyspace=None):
# create connection pools in parallel
self._initial_connect_futures = set()
for host in hosts:
future = self.add_or_renew_pool(host, is_host_addition=False)
future = self.add_or_renew_pool(host, is_host_addition=False, require_previous_shutdown_or_none=False)
if future:
self._initial_connect_futures.add(future)

Expand Down Expand Up @@ -3273,7 +3273,7 @@ def __del__(self):
# when cluster.shutdown() is called explicitly.
pass

def add_or_renew_pool(self, host, is_host_addition):
def add_or_renew_pool(self, host, is_host_addition, require_previous_shutdown_or_none):
"""
For internal use only.
"""
Expand Down Expand Up @@ -3321,7 +3321,11 @@ def callback(pool, errors):
self._lock.acquire()
return False
self._lock.acquire()
self._pools[host] = new_pool
if require_previous_shutdown_or_none and (previous and not previous.is_shutdown):
new_pool.shutdown()
return True
else:
self._pools[host] = new_pool

log.debug("Added pool for host %s to session", host)
if previous:
Expand Down Expand Up @@ -3361,7 +3365,7 @@ def update_created_pools(self):
# to allow us to attempt connections to hosts that have gone from ignored to something
# else.
if distance != HostDistance.IGNORED and host.is_up in (True, None):
future = self.add_or_renew_pool(host, False)
future = self.add_or_renew_pool(host, False, require_previous_shutdown_or_none=True)
elif distance != pool.host_distance:
# the distance has changed
if distance == HostDistance.IGNORED:
Expand Down

0 comments on commit c7e740c

Please sign in to comment.