Skip to content

Commit

Permalink
Merge pull request scylladb#361 from scylladb/dk/320-python-implement…
Browse files Browse the repository at this point in the history
…ation-of-decouple-schema-fetch-queries-from-server-side-timeouts

Decouple schema fetch queries timeouts from server side timeouts
  • Loading branch information
dkropachev authored Oct 7, 2024
2 parents c0c016a + 7a4ae44 commit 9e9ded4
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 99 deletions.
56 changes: 42 additions & 14 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import absolute_import

import atexit
import datetime
from binascii import hexlify
from collections import defaultdict
from collections.abc import Mapping
Expand Down Expand Up @@ -82,7 +83,7 @@
from cassandra.marshal import int64_pack
from cassandra.tablets import Tablet, Tablets
from cassandra.timestamps import MonotonicTimestampGenerator
from cassandra.util import _resolve_contact_points_to_string_map, Version
from cassandra.util import _resolve_contact_points_to_string_map, Version, maybe_add_timeout_to_query

from cassandra.datastax.insights.reporter import MonitorReporter
from cassandra.datastax.insights.util import version_supports_insights
Expand Down Expand Up @@ -1033,6 +1034,12 @@ def default_retry_policy(self, policy):
or to disable the shardaware port (advanced shardaware)
"""

metadata_request_timeout = datetime.timedelta(seconds=2)
"""
Timeout for all queries used by driver it self.
Supported only by Scylla clusters.
"""

@property
def schema_metadata_enabled(self):
"""
Expand Down Expand Up @@ -1148,7 +1155,9 @@ def __init__(self,
client_id=None,
cloud=None,
scylla_cloud=None,
shard_aware_options=None):
shard_aware_options=None,
metadata_request_timeout=None,
):
"""
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
extablishing connection pools or refreshing metadata.
Expand Down Expand Up @@ -1240,6 +1249,8 @@ def __init__(self,
self.no_compact = no_compact

self.auth_provider = auth_provider
if metadata_request_timeout is not None:
self.metadata_request_timeout = metadata_request_timeout

if load_balancing_policy is not None:
if isinstance(load_balancing_policy, type):
Expand Down Expand Up @@ -3549,6 +3560,7 @@ class PeersQueryType(object):
_is_shutdown = False
_timeout = None
_protocol_version = None
_metadata_request_timeout = None

_schema_event_refresh_window = None
_topology_event_refresh_window = None
Expand Down Expand Up @@ -3648,7 +3660,7 @@ def _reconnect_internal(self):
(conn, _) = self._connect_host_in_lbp()
if conn is not None:
return conn

# Try to re-resolve hostnames as a fallback when all hosts are unreachable
self._cluster._resolve_hostnames()

Expand Down Expand Up @@ -3693,7 +3705,10 @@ def _try_connect(self, host):
# If sharding information is available, it's a ScyllaDB cluster, so do not use peers_v2 table.
if connection.features.sharding_info is not None:
self._uses_peers_v2 = False


# Cassandra does not support "USING TIMEOUT"
self._metadata_request_timeout = None if connection.features.sharding_info is None \
else datetime.timedelta(seconds=self._cluster.control_connection_timeout)
self._tablets_routing_v1 = connection.features.tablets_routing_v1

# use weak references in both directions
Expand All @@ -3710,8 +3725,10 @@ def _try_connect(self, host):

sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE)
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
local_query = QueryMessage(query=maybe_add_timeout_to_query(sel_local, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
(peers_success, peers_result), (local_success, local_result) = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout, fail_on_error=False)

Expand All @@ -3722,7 +3739,8 @@ def _try_connect(self, host):
# error with the peers v2 query, fallback to peers v1
self._uses_peers_v2 = False
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
peers_result = connection.wait_for_response(
peers_query, timeout=self._timeout)

Expand Down Expand Up @@ -3830,7 +3848,12 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
log.debug("Skipping schema refresh due to lack of schema agreement")
return False

self._cluster.metadata.refresh(connection, self._timeout, fetch_size=self._schema_meta_page_size, **kwargs)
self._cluster.metadata.refresh(
connection,
self._timeout,
fetch_size=self._schema_meta_page_size,
metadata_request_timeout=self._metadata_request_timeout,
**kwargs)

return True

Expand Down Expand Up @@ -3861,8 +3884,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
else:
log.debug("[control connection] Refreshing node list and token map")
sel_local = self._SELECT_LOCAL
peers_query = QueryMessage(query=sel_peers, consistency_level=cl)
local_query = QueryMessage(query=sel_local, consistency_level=cl)
peers_query = QueryMessage(query=maybe_add_timeout_to_query(sel_peers, self._metadata_request_timeout),
consistency_level=cl)
local_query = QueryMessage(query=maybe_add_timeout_to_query(sel_local, self._metadata_request_timeout),
consistency_level=cl)
peers_result, local_result = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)

Expand Down Expand Up @@ -3917,8 +3942,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
# local rpc_address has not been queried yet, try to fetch it
# separately, which might fail because C* < 2.1.6 doesn't have rpc_address
# in system.local. See CASSANDRA-9436.
local_rpc_address_query = QueryMessage(query=self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS,
consistency_level=ConsistencyLevel.ONE)
local_rpc_address_query = QueryMessage(
query=maybe_add_timeout_to_query(self._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS, self._metadata_request_timeout),
consistency_level=ConsistencyLevel.ONE)
success, local_rpc_address_result = connection.wait_for_response(
local_rpc_address_query, timeout=self._timeout, fail_on_error=False)
if success:
Expand Down Expand Up @@ -4153,8 +4179,10 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)

while elapsed < total_timeout:
peers_query = QueryMessage(query=select_peers_query, consistency_level=cl)
local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl)
peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout),
consistency_level=cl)
local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
consistency_level=cl)
try:
timeout = min(self._timeout, total_timeout - elapsed)
peers_result, local_result = connection.wait_for_responses(
Expand Down
Loading

0 comments on commit 9e9ded4

Please sign in to comment.