Skip to content

Commit c82f2a3

Browse files
committed
Add cluster/statistics endpoint handling.
This commit adds a new cluster method get_cluster_statistics() which retrieves valuable statistics from the cluster.
1 parent 7f36591 commit c82f2a3

File tree

5 files changed

+264
-7
lines changed

5 files changed

+264
-7
lines changed

.pre-commit-config.yaml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@ repos:
1313
- id: trailing-whitespace
1414

1515
- repo: https://github.com/myint/autoflake
16-
rev: v1.4
16+
rev: v2.2.1 # autoflake v2.2.1 is the latest version that supports Python 3.12
1717
hooks:
1818
- id: autoflake
1919
args: [--in-place, --remove-all-unused-imports, --exclude=weaviate/proto/*]
2020

21-
2221
- repo: https://github.com/PyCQA/flake8
2322
rev: 7.1.0
2423
hooks:
@@ -36,13 +35,13 @@ repos:
3635
]
3736
files: '^weaviate/collections'
3837

39-
- repo: local
40-
hooks:
38+
- repo: local
39+
hooks:
4140
- id: mypy
4241
name: mypy
4342
entry: ./run-mypy.sh
4443
language: python
45-
language_version: "3.11"
44+
language_version: "3.12"
4645
# use require_serial so that script
4746
# is only called once per commit
4847
require_serial: true

integration_v3/test_cluster.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,70 @@ def test_get_nodes_status_with_data(client: weaviate.Client):
104104
assert shards[0]["class"] == class_name1
105105
assert shards[0]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT
106106
assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT
107+
108+
109+
def test_get_cluster_statistics(client: weaviate.Client):
110+
111+
if not client._connection._weaviate_version.is_lower_than(1, 25, 0):
112+
pytest.skip("Cluster statistics are supported in versions higher than 1.25.0")
113+
114+
"""Test getting cluster statistics."""
115+
stats = client.cluster.get_cluster_statistics()
116+
117+
# Check top level structure
118+
assert "statistics" in stats
119+
assert "synchronized" in stats
120+
assert isinstance(stats["synchronized"], bool)
121+
122+
# Check statistics array
123+
assert isinstance(stats["statistics"], list)
124+
assert len(stats["statistics"]) >= 1 # At least one node
125+
126+
# Check first node's statistics
127+
node = stats["statistics"][0]
128+
# bootstrapped is optional
129+
if "bootstrapped" in node:
130+
assert isinstance(node["bootstrapped"], bool)
131+
assert isinstance(node["candidates"], dict)
132+
# Check candidates structure if not empty
133+
if node["candidates"]:
134+
for node_name, address in node["candidates"].items():
135+
assert isinstance(node_name, str)
136+
assert isinstance(address, str)
137+
assert ":" in address # Address should be in format IP:PORT
138+
assert isinstance(node["dbLoaded"], bool)
139+
assert isinstance(node["isVoter"], bool)
140+
assert isinstance(node["leaderAddress"], str)
141+
assert isinstance(node["leaderId"], str)
142+
assert isinstance(node["name"], str)
143+
assert isinstance(node["open"], bool) # API returns 'open', not 'open_'
144+
assert isinstance(node["ready"], bool)
145+
assert isinstance(node["status"], str)
146+
147+
# Check Raft statistics
148+
raft = node["raft"]
149+
assert isinstance(raft["appliedIndex"], str)
150+
assert isinstance(raft["commitIndex"], str)
151+
assert isinstance(raft["fsmPending"], str)
152+
assert isinstance(raft["lastContact"], str)
153+
assert isinstance(raft["lastLogIndex"], str)
154+
assert isinstance(raft["lastLogTerm"], str)
155+
assert isinstance(raft["lastSnapshotIndex"], str)
156+
assert isinstance(raft["lastSnapshotTerm"], str)
157+
assert isinstance(raft["latestConfiguration"], list)
158+
assert isinstance(raft["latestConfigurationIndex"], str)
159+
assert isinstance(raft["numPeers"], str)
160+
assert isinstance(raft["protocolVersion"], str)
161+
assert isinstance(raft["protocolVersionMax"], str)
162+
assert isinstance(raft["protocolVersionMin"], str)
163+
assert isinstance(raft["snapshotVersionMax"], str)
164+
assert isinstance(raft["snapshotVersionMin"], str)
165+
assert isinstance(raft["state"], str)
166+
assert isinstance(raft["term"], str)
167+
168+
# Check at least one peer in the configuration
169+
assert len(raft["latestConfiguration"]) >= 1
170+
peer = raft["latestConfiguration"][0]
171+
assert isinstance(peer["address"], str)
172+
assert isinstance(peer["id"], str) # API returns 'id', not 'id_'
173+
assert isinstance(peer["suffrage"], int)

test/cluster/test_cluster.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,117 @@ def test_get_nodes_status(self):
5151
result = Cluster(mock_conn).get_nodes_status()
5252
self.assertListEqual(result, expected_resp.get("nodes"))
5353
mock_conn.get.assert_called_with(path="/nodes")
54+
55+
def test_get_cluster_statistics(self):
56+
# error messages
57+
unexpected_err_msg = "Cluster statistics"
58+
empty_response_err_msg = "Cluster statistics response returned empty"
59+
connection_err_msg = "Get cluster statistics failed due to connection error"
60+
61+
# expected failure
62+
mock_conn = mock_connection_func("get", status_code=500)
63+
with self.assertRaises(UnexpectedStatusCodeException) as error:
64+
Cluster(mock_conn).get_cluster_statistics()
65+
check_startswith_error_message(self, error, unexpected_err_msg)
66+
67+
mock_conn = mock_connection_func("get", status_code=200, return_json=None)
68+
with self.assertRaises(EmptyResponseException) as error:
69+
Cluster(mock_conn).get_cluster_statistics()
70+
check_error_message(self, error, empty_response_err_msg)
71+
72+
mock_conn = mock_connection_func("get", side_effect=RequestsConnectionError)
73+
with self.assertRaises(RequestsConnectionError) as error:
74+
Cluster(mock_conn).get_cluster_statistics()
75+
check_error_message(self, error, connection_err_msg)
76+
77+
# expected success
78+
expected_resp = {
79+
"statistics": [
80+
{
81+
"candidates": {
82+
"weaviate-0": "10.244.2.3:8300",
83+
"weaviate-1": "10.244.1.3:8300",
84+
},
85+
"dbLoaded": True,
86+
"isVoter": True,
87+
"leaderAddress": "10.244.3.3:8300",
88+
"leaderId": "weaviate-2",
89+
"name": "weaviate-0",
90+
"open_": True,
91+
"raft": {
92+
"appliedIndex": "3",
93+
"commitIndex": "3",
94+
"fsmPending": "0",
95+
"lastContact": "29.130625ms",
96+
"lastLogIndex": "3",
97+
"lastLogTerm": "2",
98+
"lastSnapshotIndex": "0",
99+
"lastSnapshotTerm": "0",
100+
"latestConfiguration": [
101+
{"address": "10.244.1.3:8300", "id_": "weaviate-1", "suffrage": 0},
102+
{"address": "10.244.3.3:8300", "id_": "weaviate-2", "suffrage": 0},
103+
{"address": "10.244.2.3:8300", "id_": "weaviate-0", "suffrage": 0},
104+
],
105+
"latestConfigurationIndex": "0",
106+
"numPeers": "2",
107+
"protocolVersion": "3",
108+
"protocolVersionMax": "3",
109+
"protocolVersionMin": "0",
110+
"snapshotVersionMax": "1",
111+
"snapshotVersionMin": "0",
112+
"state": "Follower",
113+
"term": "2",
114+
},
115+
"ready": True,
116+
"status": "HEALTHY",
117+
},
118+
{
119+
"bootstrapped": True,
120+
"candidates": {},
121+
"dbLoaded": True,
122+
"isVoter": True,
123+
"leaderAddress": "10.244.3.3:8300",
124+
"leaderId": "weaviate-2",
125+
"name": "weaviate-1",
126+
"open_": True,
127+
"raft": {
128+
"appliedIndex": "3",
129+
"commitIndex": "3",
130+
"fsmPending": "0",
131+
"lastContact": "41.289833ms",
132+
"lastLogIndex": "3",
133+
"lastLogTerm": "2",
134+
"lastSnapshotIndex": "0",
135+
"lastSnapshotTerm": "0",
136+
"latestConfiguration": [
137+
{"address": "10.244.1.3:8300", "id_": "weaviate-1", "suffrage": 0},
138+
{"address": "10.244.3.3:8300", "id_": "weaviate-2", "suffrage": 0},
139+
{"address": "10.244.2.3:8300", "id_": "weaviate-0", "suffrage": 0},
140+
],
141+
"latestConfigurationIndex": "0",
142+
"numPeers": "2",
143+
"protocolVersion": "3",
144+
"protocolVersionMax": "3",
145+
"protocolVersionMin": "0",
146+
"snapshotVersionMax": "1",
147+
"snapshotVersionMin": "0",
148+
"state": "Follower",
149+
"term": "2",
150+
},
151+
"ready": True,
152+
"status": "HEALTHY",
153+
},
154+
],
155+
"synchronized": True,
156+
}
157+
mock_conn = mock_connection_func("get", status_code=200, return_json=expected_resp)
158+
result = Cluster(mock_conn).get_cluster_statistics()
159+
160+
# Convert the response to match our type definitions with renamed fields
161+
for node in result["statistics"]:
162+
node["open_"] = node.pop("open_")
163+
for peer in node["raft"]["latestConfiguration"]:
164+
peer["id_"] = peer.pop("id_")
165+
166+
self.assertEqual(result, expected_resp)
167+
mock_conn.get.assert_called_with(path="/cluster/statistics")

weaviate/cluster/cluster.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from requests.exceptions import ConnectionError as RequestsConnectionError
88

9-
from weaviate.cluster.types import Node
9+
from weaviate.cluster.types import Node, ClusterStats
1010
from weaviate.connect import Connection
1111
from weaviate.exceptions import (
1212
EmptyResponseException,
@@ -79,3 +79,33 @@ def get_nodes_status(
7979
if nodes is None or nodes == []:
8080
raise EmptyResponseException("Nodes status response returned empty")
8181
return cast(List[Node], nodes)
82+
83+
def get_cluster_statistics(self) -> ClusterStats:
84+
"""
85+
Get the cluster statistics including Raft consensus information.
86+
87+
Returns
88+
-------
89+
ClusterStats
90+
Statistics about the cluster including Raft consensus information.
91+
92+
Raises
93+
------
94+
requests.ConnectionError
95+
If the network connection to weaviate fails.
96+
weaviate.UnexpectedStatusCodeException
97+
If weaviate reports a none OK status.
98+
weaviate.EmptyResponseException
99+
If the response is empty.
100+
"""
101+
try:
102+
response = self._connection.get(path="/cluster/statistics")
103+
except RequestsConnectionError as conn_err:
104+
raise RequestsConnectionError(
105+
"Get cluster statistics failed due to connection error"
106+
) from conn_err
107+
108+
response_typed = _decode_json_response_dict(response, "Cluster statistics")
109+
if response_typed is None:
110+
raise EmptyResponseException("Cluster statistics response returned empty")
111+
return cast(ClusterStats, response_typed)

weaviate/cluster/types.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Literal, Optional, TypedDict
1+
from typing import List, Literal, Optional, TypedDict, Dict
22

33

44
class BatchStats(TypedDict):
@@ -34,3 +34,50 @@ class Node(TypedDict):
3434
stats: Stats
3535
status: str
3636
version: str
37+
38+
39+
class RaftPeer(TypedDict):
40+
address: str
41+
id_: str
42+
suffrage: int
43+
44+
45+
class RaftStats(TypedDict):
46+
appliedIndex: str
47+
commitIndex: str
48+
fsmPending: str
49+
lastContact: str
50+
lastLogIndex: str
51+
lastLogTerm: str
52+
lastSnapshotIndex: str
53+
lastSnapshotTerm: str
54+
latestConfiguration: List[RaftPeer]
55+
latestConfigurationIndex: str
56+
numPeers: str
57+
protocolVersion: str
58+
protocolVersionMax: str
59+
protocolVersionMin: str
60+
snapshotVersionMax: str
61+
snapshotVersionMin: str
62+
state: str
63+
term: str
64+
65+
66+
# total=False is used to make handle some of the optional fields
67+
class ClusterNodeStats(TypedDict, total=False):
68+
bootstrapped: bool
69+
candidates: Dict[str, str]
70+
dbLoaded: bool
71+
isVoter: bool
72+
leaderAddress: str
73+
leaderId: str
74+
name: str
75+
open_: bool
76+
raft: RaftStats
77+
ready: bool
78+
status: str
79+
80+
81+
class ClusterStats(TypedDict):
82+
statistics: List[ClusterNodeStats]
83+
synchronized: bool

0 commit comments

Comments
 (0)