@@ -145,47 +145,82 @@ def check_fw_pipeline(self, ipaddr, port):
145145 if self .test :
146146 return 5
147147
148- s = None
149- try :
150- s = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
151-
152- s .settimeout (3.0 )
153- s .connect ((ipaddr , port ))
154- self .dprint (f"Connection to { ipaddr } :{ port } successful." )
155-
156- s .settimeout (20.0 )
157- s .shutdown (socket .SHUT_WR )
158-
159- data = b""
160- while True :
161- chunk = s .recv (4096 )
162- if not chunk :
163- break
164- data += chunk
165-
166- text = data .decode ("utf-8" , errors = "ignore" )
167-
148+ retry_attempts = 3
149+ retry_delay = 2
150+ last_exception = None
151+
152+ for attempt in range (retry_attempts ):
153+ s = None
168154 try :
169- arr = json .loads (text )
170- if isinstance (arr , list ):
171- for obj in reversed (arr ):
172- if isinstance (obj , dict ):
173- for key , value in obj .items ():
174- if key .endswith (".worker_state" ):
175- return int (value )
176- except json .JSONDecodeError as e :
177- self .dprint (f"Failed to parse JSON for { ipaddr } :{ port } . Data might be incomplete. Error: { e } " )
155+ s = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
156+
157+ s .settimeout (3.0 )
158+ s .connect ((ipaddr , port ))
159+
160+ s .settimeout (20.0 )
161+ s .shutdown (socket .SHUT_WR )
162+
163+ buffer = ""
164+ while True :
165+ try :
166+ chunk = s .recv (4096 ).decode ('utf-8' , errors = 'ignore' )
167+ if not chunk :
168+ break
169+ buffer += chunk
170+ except socket .timeout :
171+ break
172+
173+ buffer = buffer .strip ()
174+ if not buffer :
175+ raise ConnectionError ("No data received from socket" )
176+
177+ all_objects = []
178+ decoder = json .JSONDecoder ()
179+ pos = 0
180+ while pos < len (buffer ):
181+ try :
182+ obj , pos = decoder .raw_decode (buffer , pos )
183+ all_objects .append (obj )
184+ except json .JSONDecodeError :
185+ break
186+
187+ if not all_objects :
188+ raise ValueError ("Could not parse any JSON objects from received data" )
189+
190+ found_val = None
191+ def find_in_obj (target ):
192+ nonlocal found_val
193+ if isinstance (target , list ):
194+ for item in reversed (target ):
195+ find_in_obj (item )
196+ if found_val is not None : return
197+ elif isinstance (target , dict ):
198+ for key , value in target .items ():
199+ if key .endswith (".worker_state" ):
200+ found_val = int (value )
201+ return
202+
203+ find_in_obj (all_objects )
204+
205+ if found_val is not None :
206+ return found_val
207+
208+ print (f"worker_state not found for { ipaddr } :{ port } . No further retries for this cycle." )
178209 return - 1
179210
180- self .dprint (f"worker_state not found for { ipaddr } :{ port } " )
181- return - 1
211+ except (OSError , ConnectionError , ValueError ) as e :
212+ last_exception = e
213+ print (f"Attempt { attempt + 1 } /{ retry_attempts } for { ipaddr } :{ port } failed: { e } " )
214+ if attempt < retry_attempts - 1 :
215+ print (f"Retrying in { retry_delay } seconds..." )
216+ time .sleep (retry_delay )
182217
183- except OSError as e :
184- self . dprint ( f"Socket error for { ipaddr } : { port } : { e } " )
185- return - 1
186- finally :
187- if s :
188- s . close ()
218+ finally :
219+ if s :
220+ s . close ()
221+
222+ print ( f"All { retry_attempts } attempts failed for { ipaddr } : { port } . Last error: { last_exception } " )
223+ return - 1
189224
190225 def check_efu_pipeline (self , ipaddr , port ):
191226 if self .test :
@@ -227,7 +262,7 @@ def check_service(self, idx, type, ipaddr, port):
227262 self .lab .servers [idx ][9 ] = self .efu_get_version (ipaddr , port )
228263 elif type == type_fw :
229264 status = self .check_fw_pipeline (ipaddr , port )
230- if status == 0 :
265+ if status == - 1 or status == 0 :
231266 self .lab .clearstatus (
232267 idx , self .s_stage1 | self .s_stage2 | self .s_stage3
233268 )
0 commit comments