@@ -233,15 +233,15 @@ abstract protected static function perform(ClientState $multi, array &$responses
233233 */
234234 abstract protected static function select (ClientState $ multi , float $ timeout ): int ;
235235
236- private static function initialize (self $ response ): void
236+ private static function initialize (self $ response, float $ timeout = null ): void
237237 {
238238 if (null !== $ response ->info ['error ' ]) {
239239 throw new TransportException ($ response ->info ['error ' ]);
240240 }
241241
242242 try {
243- if (($ response ->initializer )($ response )) {
244- foreach (self ::stream ([$ response ]) as $ chunk ) {
243+ if (($ response ->initializer )($ response, $ timeout )) {
244+ foreach (self ::stream ([$ response ], $ timeout ) as $ chunk ) {
245245 if ($ chunk ->isFirst ()) {
246246 break ;
247247 }
@@ -304,7 +304,7 @@ private function doDestruct()
304304 $ this ->shouldBuffer = true ;
305305
306306 if ($ this ->initializer && null === $ this ->info ['error ' ]) {
307- self ::initialize ($ this );
307+ self ::initialize ($ this , - 0.0 );
308308 $ this ->checkStatusCode ();
309309 }
310310 }
@@ -325,6 +325,12 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
325325 $ lastActivity = microtime (true );
326326 $ elapsedTimeout = 0 ;
327327
328+ if ($ fromLastTimeout = 0.0 === $ timeout && '-0 ' === (string ) $ timeout ) {
329+ $ timeout = null ;
330+ } elseif ($ fromLastTimeout = 0 > $ timeout ) {
331+ $ timeout = -$ timeout ;
332+ }
333+
328334 while (true ) {
329335 $ hasActivity = false ;
330336 $ timeoutMax = 0 ;
@@ -340,13 +346,18 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
340346 $ timeoutMin = min ($ timeoutMin , $ response ->timeout , 1 );
341347 $ chunk = false ;
342348
349+ if ($ fromLastTimeout && null !== $ multi ->lastTimeout ) {
350+ $ elapsedTimeout = microtime (true ) - $ multi ->lastTimeout ;
351+ }
352+
343353 if (isset ($ multi ->handlesActivity [$ j ])) {
344- // no-op
354+ $ multi -> lastTimeout = null ;
345355 } elseif (!isset ($ multi ->openHandles [$ j ])) {
346356 unset($ responses [$ j ]);
347357 continue ;
348358 } elseif ($ elapsedTimeout >= $ timeoutMax ) {
349359 $ multi ->handlesActivity [$ j ] = [new ErrorChunk ($ response ->offset , sprintf ('Idle timeout reached for "%s". ' , $ response ->getInfo ('url ' )))];
360+ $ multi ->lastTimeout ?? $ multi ->lastTimeout = $ lastActivity ;
350361 } else {
351362 continue ;
352363 }
0 commit comments