@@ -581,7 +581,7 @@ func (t *TopicApplier) updatePartitionsHelper(
581581 return errors .New ("Stopping because of user response" )
582582 }
583583
584- err = t .updatePartitionsIteration (ctx , currAssignments , desiredAssignments , true )
584+ err = t .updatePartitionsIteration (ctx , currAssignments , desiredAssignments , true , "" )
585585 if err != nil {
586586 return err
587587 }
@@ -861,24 +861,27 @@ func (t *TopicApplier) updatePlacementRunner(
861861 }
862862
863863 numRounds := (len (assignmentsToUpdate ) + batchSize - 1 ) / batchSize // Ceil() with integer math
864- roundScoreboard := color .New (color .FgYellow , color .Bold ).SprintfFunc ()
864+ highlighter := color .New (color .FgYellow , color .Bold ).SprintfFunc ()
865865 for i , round := 0 , 1 ; i < len (assignmentsToUpdate ); i , round = i + batchSize , round + 1 {
866866 end := i + batchSize
867867
868868 if end > len (assignmentsToUpdate ) {
869869 end = len (assignmentsToUpdate )
870870 }
871871
872+ var roundLabel string // "x of y" used to mark progress in balancing rounds
873+ roundLabel = highlighter ("%d of %d" , round , numRounds )
872874 log .Infof (
873875 "Balancing round %s" ,
874- roundScoreboard ( "%d of %d" , round , numRounds ) ,
876+ roundLabel ,
875877 )
876878
877879 err := t .updatePartitionsIteration (
878880 ctx ,
879881 currDiffAssignments [i :end ],
880882 assignmentsToUpdate [i :end ],
881883 newTopic ,
884+ roundLabel ,
882885 )
883886 if err != nil {
884887 return err
@@ -912,6 +915,7 @@ func (t *TopicApplier) updatePartitionsIteration(
912915 currAssignments []admin.PartitionAssignment ,
913916 assignmentsToUpdate []admin.PartitionAssignment ,
914917 newTopic bool ,
918+ roundLabel string ,
915919) error {
916920 idsToUpdate := []int {}
917921 for _ , assignment := range assignmentsToUpdate {
@@ -952,12 +956,14 @@ func (t *TopicApplier) updatePartitionsIteration(
952956 defer checkTimer .Stop ()
953957
954958 log .Info ("Sleeping then entering check loop" )
959+ highlighter := color .New (color .FgYellow , color .Bold ).SprintfFunc ()
960+ roundStartTime := time .Now ()
955961
956962outerLoop:
957963 for {
958964 select {
959965 case <- checkTimer .C :
960- log .Info ("Checking if all partitions in topic are properly replicated..." )
966+ log .Infof ("Checking if all partitions in topic %s are properly replicated..." , highlighter ( t . topicName ) )
961967
962968 topicInfo , err := t .adminClient .GetTopic (ctx , t .topicName , true )
963969 if err != nil {
@@ -981,7 +987,7 @@ outerLoop:
981987 partitionInfo := topicInfo .Partitions [assignment .ID ]
982988
983989 if ! util .SameElements (partitionInfo .Replicas , partitionInfo .ISR ) {
984- log .Infof ("Out of sync: %+v, %+v" , partitionInfo .Replicas , partitionInfo .ISR )
990+ log .Debugf ("Out of sync: %+v, %+v" , partitionInfo .Replicas , partitionInfo .ISR )
985991 notReady = append (notReady , partitionInfo )
986992 continue
987993 }
@@ -997,7 +1003,11 @@ outerLoop:
9971003 }
9981004
9991005 if len (notReady ) == 0 {
1000- log .Infof ("Partition(s) %+v looks good, continuing" , idsToUpdate )
1006+ elapsed := time .Now ().Sub (roundStartTime )
1007+ log .Infof ("Partition(s) %+v looks good, continuing (last round duration: %s)" ,
1008+ idsToUpdate ,
1009+ highlighter ("%.1fs" , float64 (elapsed )/ 1000000000 ), // time.Duration is int64 nanoseconds
1010+ )
10011011 break outerLoop
10021012 }
10031013 log .Infof (">>> Not ready: %+v" , notReady )
@@ -1008,7 +1018,14 @@ outerLoop:
10081018 len (assignmentsToUpdate ),
10091019 admin .FormatTopicPartitions (notReady , t .brokers ),
10101020 )
1011- log .Infof ("Sleeping for %s" , t .config .SleepLoopDuration .String ())
1021+
1022+ var roundString string // convert to " (round x of y)" if roundLabel is present
1023+ if roundLabel != "" {
1024+ roundString = fmt .Sprintf (" (current round %s, %+v elapsed)" , roundLabel , time .Now ().Sub (roundStartTime ))
1025+ } else {
1026+ roundString = roundLabel
1027+ }
1028+ log .Infof ("Sleeping for %s%s" , t .config .SleepLoopDuration .String (), roundString )
10121029 case <- ctx .Done ():
10131030 return ctx .Err ()
10141031 }
@@ -1050,7 +1067,7 @@ func (t *TopicApplier) applyThrottles(
10501067 var throttledTopic bool
10511068
10521069 if len (topicConfigEntries ) > 0 {
1053- log .Infof ("Applying topic throttles: %+v" , topicConfigEntries )
1070+ log .Infof ("Applying topic throttles (%d MB/sec) : %+v" , t . throttleBytes / 1000000 , topicConfigEntries )
10541071 _ , err := t .adminClient .UpdateTopicConfig (
10551072 ctx ,
10561073 t .topicName ,
@@ -1066,14 +1083,16 @@ func (t *TopicApplier) applyThrottles(
10661083 throttledBrokers := []int {}
10671084
10681085 for _ , brokerThrottle := range brokerThrottles {
1069- log .Infof ("Applying throttle to broker %d" , brokerThrottle .Broker )
1086+ log .Debugf ("Applying throttle to broker %d" , brokerThrottle .Broker )
10701087 updatedKeys , err := t .adminClient .UpdateBrokerConfig (
10711088 ctx ,
10721089 brokerThrottle .Broker ,
10731090 brokerThrottle .ConfigEntries (),
10741091 false ,
10751092 )
10761093 if err != nil {
1094+ log .Infof ("Applied throttles to brokers %+v" , throttledBrokers ) // report on successful ones
1095+ log .Errorf ("Error occurred applying throttle to broker %d" , brokerThrottle .Broker ) // log failed one here
10771096 return throttledTopic , throttledBrokers , err
10781097 }
10791098
@@ -1084,6 +1103,7 @@ func (t *TopicApplier) applyThrottles(
10841103 )
10851104 }
10861105 }
1106+ log .Infof ("Applied throttles to brokers %+v" , throttledBrokers )
10871107
10881108 return throttledTopic , throttledBrokers , nil
10891109}
@@ -1123,7 +1143,7 @@ func (t *TopicApplier) removeThottles(
11231143 }
11241144
11251145 for _ , throttledBroker := range throttledBrokers {
1126- log .Infof ("Removing throttle from broker %d" , throttledBroker )
1146+ log .Debugf ("Removing throttle from broker %d" , throttledBroker )
11271147 _ , brokerErr := t .adminClient .UpdateBrokerConfig (
11281148 ctx ,
11291149 throttledBroker ,
@@ -1148,6 +1168,7 @@ func (t *TopicApplier) removeThottles(
11481168 err = multierror .Append (err , brokerErr )
11491169 }
11501170 }
1171+ log .Infof ("Removed throttles from brokers %+v" , throttledBrokers )
11511172
11521173 return err
11531174}
0 commit comments