1
1
"""
2
2
Shard analysis and rebalancing logic for CrateDB
3
3
"""
4
- import datetime
4
+
5
+ import enum
5
6
import logging
6
7
import math
7
8
from collections import defaultdict
9
+ from datetime import datetime
8
10
from time import sleep
9
11
from typing import Any , Dict , List , Optional , Set , Tuple , Union
10
- from unittest import result
11
12
12
13
from rich import box
13
14
from rich .console import Console
14
15
from rich .panel import Panel
15
16
from rich .table import Table
16
- from rich .live import Live
17
17
18
18
from cratedb_toolkit .admin .xmover .model import (
19
19
DistributionStats ,
@@ -839,93 +839,112 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100.
839
839
}
840
840
841
841
842
- class ShardMonitor :
842
@enum.unique
class ShardHeatSortByChoice(enum.Enum):
    """Sort orders available for the shard heat report."""

    # Order shards by their sequence-number delta, largest first.
    heat = enum.auto()
    # Order shards by compound id; the id starts with the table name.
    table = enum.auto()
    # Order shards by compound id; for this choice the id starts with the node name.
    node = enum.auto()
846
+
847
+
848
+ class ShardHeatReporter :
843
849
def __init__(self, analyzer: ShardAnalyzer):
    """Bind the reporter to an analyzer and start from empty state.

    Args:
        analyzer: Source of shard information; its ``shards`` attribute is
            read when a report is produced.
    """
    self.analyzer = analyzer

    # Report options; both are overwritten by `report`.
    self.sort_by: ShardHeatSortByChoice = ShardHeatSortByChoice.heat
    self.table_filter: str | None = None

    # Baseline snapshot, keyed by compound shard id.
    self.reference_shards: dict[str, ShardInfo] = {}
    # Most recent, already sorted shard list (set by `refresh_data`).
    self.latest_shards: list[ShardInfo] = []

    # Per-shard changes relative to the baseline, keyed by compound shard id.
    self.seq_deltas: dict[str, int] = {}
    self.size_deltas: dict[str, float] = {}
852
858
853
def report(
    self,
    table_filter: str | None,
    interval_in_seconds: int,
    watch: bool,
    n_shards: int,
    sort_by: ShardHeatSortByChoice,
):
    """Print shard and node heat tables, once or repeatedly.

    A baseline snapshot of the analyzer's shards is taken first. After each
    wait of ``interval_in_seconds`` the data is refreshed and two tables are
    printed: the shards table and the per-node heat table. Deltas and the
    elapsed time are always measured against the initial baseline.

    Args:
        table_filter: When truthy, only shards of this table are reported.
        interval_in_seconds: Pause before each measurement.
        watch: When false, report once and return; when true, repeat
            indefinitely.
        n_shards: Maximum number of shards shown; values below 1 show all.
        sort_by: Ordering of the shards table.
    """
    # Local import: the module only imports `sleep` from `time`.
    # A monotonic clock keeps the elapsed time positive and unaffected by
    # wall-clock adjustments (DST, NTP), which matters because it is used
    # as the divisor for the ops/second column.
    from time import monotonic

    self.table_filter = table_filter
    self.sort_by = sort_by

    # The sort choice must be set before building the baseline, because the
    # compound id layout depends on it.
    self.reference_shards = {self._get_shard_compound_id(shard): shard for shard in self.analyzer.shards}
    started_at = monotonic()
    self.refresh_data()

    console.print(Panel.fit("[bold blue]Shard heat analyzer[/bold blue]"))

    while True:
        sleep(interval_in_seconds)
        self.refresh_data()

        shards_table = self.generate_shards_table(
            self._get_top_shards(self.latest_shards, n_shards),
            self.seq_deltas,
            monotonic() - started_at,
        )
        console.print(shards_table)

        nodes_table = self.generate_nodes_table(self._get_nodes_heat_info(self.reference_shards, self.seq_deltas))
        console.print(nodes_table)

        if not watch:
            break
874
890
875
@staticmethod
def generate_nodes_table(heat_nodes_info: dict[str, int]):
    """Build the "Shard heat by node" table.

    An empty line is printed to the console before the table is built.
    Rows are ordered by heat, highest first; equal heat values are ordered
    by node name, descending.

    Args:
        heat_nodes_info: Heat value per node name.

    Returns:
        The populated table; printing it is left to the caller.
    """
    console.print()

    nodes_table = Table(title="Shard heat by node", box=box.ROUNDED)
    for heading, colour in (("Node name", "cyan"), ("Heat", "magenta")):
        nodes_table.add_column(heading, style=colour)

    ranked = sorted(heat_nodes_info.items(), key=lambda entry: (entry[1], entry[0]), reverse=True)
    for node_name, heat in ranked:
        nodes_table.add_row(node_name, str(heat))

    return nodes_table
886
904
887
def generate_shards_table(self, sorted_shards: list[ShardInfo], deltas: dict[str, int], elapsed_time_s: float):
    """Build the shards table: header first, then one row per shard.

    Args:
        sorted_shards: Shards in the order they should be listed.
        deltas: Sequence-number delta per compound shard id.
        elapsed_time_s: Seconds covered by the deltas.

    Returns:
        The table produced by the header helper, after the rows helper has
        filled it.
    """
    table = self._display_shards_table_header()
    self._display_shards_table_rows(table, sorted_shards, deltas, elapsed_time_s)
    return table
891
909
892
def _display_shards_table_header(self):
    """Create the shards table with its title and columns, but no rows.

    The title names the active sort choice. Identifying columns are styled
    cyan, measurement columns magenta.
    """
    shards_table = Table(title=f"Shards sorted by {self.sort_by.name}", box=box.ROUNDED)

    identity_columns = ("Schema", "Table", "Partition", "Shard ID", "Node", "Primary")
    measurement_columns = ("Size", "Size Delta", "Seq Delta", "ops/second")

    for heading in identity_columns:
        shards_table.add_column(heading, style="cyan")
    for heading in measurement_columns:
        shards_table.add_column(heading, style="magenta")

    return shards_table
904
923
905
def _display_shards_table_rows(
    self, shards_table: Table, sorted_shards: list[ShardInfo], deltas: dict[str, int], elapsed_time_s: float
):
    """Append one row per shard to ``shards_table``.

    Cell order matches the columns of the shards table header: schema,
    table, partition, shard id, node, primary, size, size delta, sequence
    delta, operations per second.

    Args:
        shards_table: Table to add the rows to.
        sorted_shards: Shards in display order.
        deltas: Sequence-number delta per compound shard id; shards without
            an entry are shown with a delta of 0.
        elapsed_time_s: Seconds covered by the deltas. When it is not
            positive no rate can be computed and "n/a" is shown.
    """
    for shard in sorted_shards:
        shard_compound_id = self._get_shard_compound_id(shard)
        seq_delta = deltas.get(shard_compound_id, 0)
        # The "Size Delta" column must show the size change, not the
        # sequence-number delta; default to 0 like the sequence delta does.
        size_delta = self.size_deltas.get(shard_compound_id, 0.0)
        # Guard the division: a zero elapsed time would raise ZeroDivisionError.
        ops_per_second = str(seq_delta / elapsed_time_s) if elapsed_time_s > 0 else "n/a"

        shards_table.add_row(
            shard.schema_name,
            shard.table_name,
            shard.partition_id,
            str(shard.shard_id),
            shard.node_name,
            str(shard.is_primary),
            format_size(shard.size_gb),
            format_size(size_delta),
            str(seq_delta),
            ops_per_second,
        )
923
942
924
943
def _get_shard_compound_id(self, shard: ShardInfo) -> str:
    """Return the string key that identifies ``shard`` in the delta maps.

    The same string is used as the sort key for the non-heat sort choices,
    so the leading field depends on the active choice: node name first when
    sorting by node, table name first otherwise.

    The schema name is appended last so that equally named tables in
    different schemas do not share one key, while the ordering given by the
    leading fields is unchanged.
    """
    if self.sort_by == ShardHeatSortByChoice.node:
        leading = f"{shard.node_name}-{shard.table_name}-{shard.shard_id}"
    else:
        leading = f"{shard.table_name}-{shard.shard_id}-{shard.node_name}"
    return f"{leading}-{shard.partition_id}-{shard.schema_name}"
929
948
930
949
def calculate_heat_deltas (self , reference_shards : dict [str , ShardInfo ], updated_shards : list [ShardInfo ]):
931
950
seq_result : dict [str , int ] = {}
@@ -943,7 +962,7 @@ def calculate_heat_deltas(self, reference_shards: dict[str, ShardInfo], updated_
943
962
reference = reference_shards [shard_compound_id ].seq_stats_max_seq_no
944
963
945
964
if refreshed_number < reference :
946
- refreshed_number += 2 ** 63 - 1
965
+ refreshed_number += 2 ** 63 - 1
947
966
948
967
seq_result [shard_compound_id ] = refreshed_number - reference
949
968
size_result [shard_compound_id ] = shard .size_gb - reference_shards [shard_compound_id ].size_gb
@@ -953,29 +972,33 @@ def calculate_heat_deltas(self, reference_shards: dict[str, ShardInfo], updated_
953
972
954
973
def refresh_data(self):
    """Reload shard data, recompute deltas and rebuild ``latest_shards``.

    The analyzer is asked to refresh itself, its shards are restricted to
    ``table_filter`` (when one is set), and the heat deltas are recalculated
    against the reference snapshot. The filtered shards are then stored in
    ``latest_shards``: by descending sequence delta for the heat sort
    choice, by compound id otherwise.
    """
    self.analyzer._refresh_data()

    updated_shards: list[ShardInfo] = [
        s for s in self.analyzer.shards if not self.table_filter or self.table_filter == s.table_name
    ]
    # NOTE(review): presumably this call updates self.seq_deltas / self.size_deltas;
    # its full body is not visible here - confirm.
    self.calculate_heat_deltas(self.reference_shards, updated_shards)

    if self.sort_by == ShardHeatSortByChoice.heat:
        # Use .get(..., 0) like the row renderer does, so a shard without a
        # recorded delta sorts as "no activity" instead of raising KeyError.
        self.latest_shards = sorted(
            updated_shards,
            key=lambda s: self.seq_deltas.get(self._get_shard_compound_id(s), 0),
            reverse=True,
        )
    else:
        self.latest_shards = sorted(updated_shards, key=self._get_shard_compound_id)
963
985
964
-
965
986
def _get_top_shards (self , sorted_shards : list [ShardInfo ], n_shards : int ) -> list [ShardInfo ]:
966
- if n_shards < 1 :
987
+ if n_shards > 0 :
967
988
return sorted_shards [:n_shards ]
968
989
else :
969
990
return sorted_shards
970
991
971
992
def _get_nodes_heat_info (self , shards : dict [str , ShardInfo ], seq_deltas : dict [str , int ]) -> dict [str , int ]:
972
993
nodes : dict [str , int ] = {}
973
994
for k , v in seq_deltas .items ():
974
- node_name = shards .get (k ).node_name
975
- if node_name not in nodes :
976
- nodes [node_name ] = v
977
- else :
978
- nodes [node_name ] += v
995
+ shard = shards .get (k )
996
+ if shard :
997
+ node_name = shard .node_name
998
+ if node_name not in nodes :
999
+ nodes [node_name ] = v
1000
+ else :
1001
+ nodes [node_name ] += v
979
1002
return nodes
980
1003
981
1004
0 commit comments