@@ -847,53 +847,52 @@ def __init__(self, analyzer: ShardAnalyzer):
847847 self .seq_deltas : dict [str , int ]
848848 self .size_deltas : dict [str , float ]
849849
850- def _get_shard_compound_id (self , shard : ShardInfo ) -> str :
851- return f"{ shard .node_id } -{ shard .shard_id } "
852-
853- def calculate_heat_deltas (self , reference_shards : dict [str , ShardInfo ], updated_shards : list [ShardInfo ]):
854- seq_result : dict [str , int ] = {}
855- size_result : dict [str , float ] = {}
856-
857- for shard in updated_shards :
858- shard_compound_id = self ._get_shard_compound_id (shard )
859-
860- if shard_compound_id not in reference_shards :
861- seq_result [shard_compound_id ] = 0
862- size_result [shard_compound_id ] = shard .size_gb
863- else :
864- refreshed_number = shard .seq_stats_max_seq_no
865- reference = reference_shards [shard_compound_id ].seq_stats_max_seq_no
866-
867- if refreshed_number < reference :
868- refreshed_number += 2 ** 63 - 1
850+ self .table_filter : str | None = None
851+ self .sort_by : str = 'heat'
869852
870- seq_result [shard_compound_id ] = refreshed_number - reference
871- size_result [shard_compound_id ] = shard .size_gb - reference_shards [shard_compound_id ].size_gb
872-
873- self .seq_deltas = seq_result
874- self .size_deltas = size_result
875-
876- def refresh_data (self ):
877- self .analyzer ._refresh_data ()
878- updated_shards : list [ShardInfo ] = self .analyzer .shards
879- self .calculate_heat_deltas (self .reference_shards , updated_shards )
880- self .latest_shards = sorted (updated_shards , key = lambda s : self .seq_deltas [self ._get_shard_compound_id (s )], reverse = True )
853+ def monitor_shards (self , table_filter : str | None , interval_in_seconds : int = 5 , repeat : int = 10 , n_shards : int = 40 , sort_by : str = 'heat' ):
854+ self .table_filter = table_filter
855+ self .sort_by = sort_by
881856
882- def monitor_shards (self , interval_in_seconds : int = 10 , n_shards : int = 25 ):
883857 self .reference_shards = {self ._get_shard_compound_id (shard ): shard for shard in self .analyzer .shards }
884858 self .refresh_data ()
885859
886860 console .print (Panel .fit (f"[bold blue]The { n_shards } Hottest Shards[/bold blue]" ))
887- shards_table = self .display_shards_table_header ()
888861
889- with Live (self .generate_table (self .latest_shards [:n_shards ], self .seq_deltas ), refresh_per_second = 4 , console = console ) as live :
862+ go_live = False
863+ if go_live :
864+ with Live (self .generate_shards_table (self ._get_top_shards (self .latest_shards , n_shards ), self .seq_deltas ), refresh_per_second = 4 , console = console ) as live_shards :
865+ while True :
866+ sleep (interval_in_seconds )
867+ self .refresh_data ()
868+ live_shards .update (self .generate_shards_table (self ._get_top_shards (self .latest_shards , n_shards ), self .seq_deltas ))
869+ else :
870+ iterations = 0
890871 while True :
891872 sleep (interval_in_seconds )
892873 self .refresh_data ()
893- # self.display_shards_table_rows(shards_table, self.latest_shards, self.deltas)
894- live .update (self .generate_table (self .latest_shards [:n_shards ], self .seq_deltas ))
874+ shards_table = self .generate_shards_table (self ._get_top_shards (self .latest_shards , n_shards ), self .seq_deltas )
875+ console .print (shards_table )
876+ nodes_table = self .generate_nodes_table (self ._get_nodes_heat_info (self .reference_shards , self .seq_deltas ))
877+ console .print (nodes_table )
878+
879+ iterations += 1
880+ if 0 < repeat <= iterations :
881+ break
882+
883+ def generate_nodes_table (self , heat_nodes_info : dict [str , int ]):
884+ table = Table (title = "Shard heat by node" , box = box .ROUNDED )
885+ table .add_column ("Node name" , style = "cyan" )
886+ table .add_column ("Heat" , style = "magenta" )
895887
896- def generate_table (self , sorted_shards : list [ShardInfo ], deltas : dict [str , int ]):
888+ sorted_items = sorted (heat_nodes_info .items (), key = lambda kv : (kv [1 ], kv [0 ]), reverse = True )
889+
890+ for k , v in sorted_items :
891+ table .add_row (k , str (v ))
892+
893+ return table
894+
895+ def generate_shards_table (self , sorted_shards : list [ShardInfo ], deltas : dict [str , int ]):
897896 t = self .display_shards_table_header ()
898897 self .display_shards_table_rows (t , sorted_shards , deltas )
899898 return t
@@ -909,6 +908,8 @@ def display_shards_table_header(self):
909908 shards_table .add_column ("Size" , style = "magenta" )
910909 shards_table .add_column ("Size Delta" , style = "magenta" )
911910 shards_table .add_column ("Seq Delta" , style = "magenta" )
911+ shards_table .add_column ("DEBUG original Seq no." , style = "magenta" )
912+ shards_table .add_column ("DEBUG Seq no." , style = "magenta" )
912913 return shards_table
913914
914915 def display_shards_table_rows (self , shards_table : Table , sorted_shards : list [ShardInfo ], deltas : dict [str , int ]):
@@ -927,9 +928,68 @@ def display_shards_table_rows(self, shards_table: Table, sorted_shards: list[Sha
927928 format_size (shard .size_gb ),
928929 format_size (self .size_deltas [shard_compound_id ]),
929930 str (seq_delta ),
931+ str (self .reference_shards [shard_compound_id ].seq_stats_max_seq_no ),
932+ str (shard .seq_stats_max_seq_no )
930933 )
931934 console .print (shards_table )
932935
936+ def _get_shard_compound_id (self , shard : ShardInfo ) -> str :
937+ if self .sort_by == 'node' :
938+ return f"{ shard .node_name } -{ shard .table_name } -{ shard .shard_id } "
939+ else :
940+ return f"{ shard .table_name } -{ shard .shard_id } -{ shard .node_name } "
941+
942+ def calculate_heat_deltas (self , reference_shards : dict [str , ShardInfo ], updated_shards : list [ShardInfo ]):
943+ seq_result : dict [str , int ] = {}
944+ size_result : dict [str , float ] = {}
945+
946+ for shard in updated_shards :
947+ shard_compound_id = self ._get_shard_compound_id (shard )
948+
949+ if shard_compound_id not in reference_shards :
950+ seq_result [shard_compound_id ] = 0
951+ size_result [shard_compound_id ] = 0
952+ reference_shards [shard_compound_id ] = shard
953+ else :
954+ refreshed_number = shard .seq_stats_max_seq_no
955+ reference = reference_shards [shard_compound_id ].seq_stats_max_seq_no
956+
957+ if refreshed_number < reference :
958+ refreshed_number += 2 ** 63 - 1
959+
960+ seq_result [shard_compound_id ] = refreshed_number - reference
961+ size_result [shard_compound_id ] = shard .size_gb - reference_shards [shard_compound_id ].size_gb
962+
963+ self .seq_deltas = seq_result
964+ self .size_deltas = size_result
965+
966+ def refresh_data (self ):
967+ self .analyzer ._refresh_data ()
968+ updated_shards : list [ShardInfo ] = [s for s in self .analyzer .shards if not self .table_filter or self .table_filter == s .table_name ]
969+ self .calculate_heat_deltas (self .reference_shards , updated_shards )
970+ if self .sort_by == 'heat' :
971+ self .latest_shards = sorted (updated_shards , key = lambda s : self .seq_deltas [self ._get_shard_compound_id (s )],
972+ reverse = True )
973+ else :
974+ self .latest_shards = sorted (updated_shards , key = lambda s : self ._get_shard_compound_id (s ))
975+
976+
977+ def _get_top_shards (self , sorted_shards : list [ShardInfo ], n_shards : int ) -> list [ShardInfo ]:
978+ if n_shards < 1 :
979+ return sorted_shards [:n_shards ]
980+ else :
981+ return sorted_shards
982+
983+ def _get_nodes_heat_info (self , shards : dict [str , ShardInfo ], seq_deltas : dict [str , int ]) -> dict [str , int ]:
984+ nodes : dict [str , int ] = {}
985+ for k , v in seq_deltas .items ():
986+ node_name = shards .get (k ).node_name
987+ if node_name not in nodes :
988+ nodes [node_name ] = v
989+ else :
990+ nodes [node_name ] += v
991+ return nodes
992+
933993
934994class ShardReporter :
935995 def __init__ (self , analyzer : ShardAnalyzer ):
0 commit comments