@kennyyu kennyyu commented Nov 22, 2025

Previously, async RL runs could hang during shutdown if we ran out of data. This change ensures a proper shutdown and that all data in the queues is drained when the dataloader loop terminates.

Overview of the shutdown workflow (a minimal sketch follows the list):

  1. The dataloader terminates and enqueues sentinel values in the trajectory worker queues.
  2. Each trajectory worker sees a sentinel value and terminates.
  3. The last trajectory worker to terminate enqueues a sentinel value for the training loop.
  4. The training loop sees the sentinel value, terminates, and signals the evals loop.
  5. The evals loop is signaled and terminates.
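
Below is a minimal, self-contained sketch of this sentinel-based shutdown pattern using asyncio. The names (Shutdown, work_queue, train_queue, num_workers) are illustrative only and are not the exact identifiers used in tinker_cookbook/rl/train.py.

# Minimal sketch of the sentinel-based shutdown; illustrative names only.
import asyncio
from dataclasses import dataclass

@dataclass
class Shutdown:
    """Sentinel enqueued to tell the consumer to stop."""

async def dataloader_loop(work_queue: asyncio.Queue, num_workers: int) -> None:
    for item in range(10):  # stand-in for the real dataloader
        await work_queue.put(item)
    # Step 1: one sentinel per trajectory worker so every worker wakes up and exits.
    for _ in range(num_workers):
        await work_queue.put(Shutdown())

async def trajectory_worker(
    work_queue: asyncio.Queue, train_queue: asyncio.Queue, remaining: list[int]
) -> None:
    while True:
        item = await work_queue.get()
        if isinstance(item, Shutdown):
            break  # Step 2: worker sees a sentinel and terminates.
        await train_queue.put(item)
    remaining[0] -= 1
    if remaining[0] == 0:
        # Step 3: the last worker to exit tells the training loop to stop.
        await train_queue.put(Shutdown())

async def training_loop(train_queue: asyncio.Queue, evals_done: asyncio.Event) -> None:
    while True:
        item = await train_queue.get()
        if isinstance(item, Shutdown):
            break  # Step 4: training loop processes everything ahead of the sentinel, then terminates...
    evals_done.set()  # ...and signals the evals loop, which shuts down (step 5).

async def main() -> None:
    work_queue: asyncio.Queue = asyncio.Queue()
    train_queue: asyncio.Queue = asyncio.Queue()
    evals_done = asyncio.Event()
    num_workers = 4
    remaining = [num_workers]  # shared countdown of live workers
    await asyncio.gather(
        dataloader_loop(work_queue, num_workers),
        *(trajectory_worker(work_queue, train_queue, remaining) for _ in range(num_workers)),
        training_loop(train_queue, evals_done),
    )
    assert evals_done.is_set()

asyncio.run(main())

Each worker consumes exactly one sentinel, so the dataloader must enqueue one sentinel per worker; the last worker to finish is the one responsible for telling the training loop to stop.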

…data

Previously, on async RL runs, we can hang in shutdown if we run out of data.
This fixes it to ensure proper shutdown and that all data in queues
are drained when the dataloader loop terminates first.
@kennyyu kennyyu force-pushed the kenny/finish_async_hang_squash branch from 4ea9fd9 to 6172d61 on November 22, 2025 20:10

joschu commented Nov 23, 2025

@kennyyu I think there was a bug, which was caught by claude and codex reviews. I think I fixed it in my last push. Now trying to fix the auto review functionality.

@joschu joschu deployed to claude-review November 23, 2025 04:17 — with GitHub Actions Active

claude bot commented Nov 23, 2025

Claude finished @kennyyu's task.


Code Review Complete ✅

I've reviewed the async RL shutdown fixes in tinker_cookbook/rl/train.py. Overall, the implementation looks solid and properly addresses the hanging issue. Here's my detailed analysis:


Positive Changes

1. Clear Shutdown Workflow
The implementation follows a clean shutdown pattern:

  • Dataloader → Trajectory workers → Training loop → Evaluation loop
  • Each stage properly signals the next using sentinel values (Shutdown())
  • Good logging at each shutdown transition point

2. Proper Sentinel Propagation

  • Dataloader enqueues Shutdown() sentinels for all trajectory workers (lines 502-507)
  • Last trajectory worker enqueues Shutdown() to training loop (lines 546-552)
  • Training loop signals evaluation loop via event (line 662)
  • All sentinels are properly awaited with await env_group_builders_queue.put(Shutdown())

3. AsyncCounter Pattern
The AsyncCounter class (lines 438-446) is a clean solution for tracking when the last worker terminates. Thread-safe with proper locking.
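
For reference, a counter like this might look roughly as follows; this is an assumed shape for illustration, not the actual class from train.py:

import asyncio

class AsyncCounter:
    """Counts down the number of live workers; safe to share between coroutines."""

    def __init__(self, initial: int) -> None:
        self._count = initial
        self._lock = asyncio.Lock()

    async def decrement(self) -> int:
        # Decrement under the lock and return the new value; a result of 0
        # means the caller is the last worker and should enqueue the final sentinel.
        async with self._lock:
            self._count -= 1
            return self._count

A worker that sees decrement() return 0 knows it is the last one out and can enqueue the final Shutdown() for the training loop.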

4. Match Statement Usage
Good use of pattern matching to handle Shutdown | None | WrappedTrajectoryGroup cases throughout.


Issues Found

⚠️ CRITICAL: Missing trajectory_group_filter Application

Location: tinker_cookbook/rl/train.py:869-881

After adding the trajectory group to the list, the filter is never applied. This means stale trajectory groups will accumulate in wrapped_trajectory_groups and eventually get trained on.

# Current code (lines 869-881)
match wrapped_trajectory_group:
    case WrappedTrajectoryGroup():
        pass
    case Shutdown():
        logger.info("[do_train_step_streaming_and_get_sampling_client] Received shutdown signal")
        return None
    case None:
        continue
wrapped_trajectory_groups.append(wrapped_trajectory_group)  # ❌ No filter applied!

Fix needed:

match wrapped_trajectory_group:
    case WrappedTrajectoryGroup():
        pass
    case Shutdown():
        logger.info("[do_train_step_streaming_and_get_sampling_client] Received shutdown signal")
        return None
    case None:
        continue

# Apply the filter before appending
if not trajectory_group_filter(wrapped_trajectory_group):
    continue

wrapped_trajectory_groups.append(wrapped_trajectory_group)

This is likely the bug that @joschu mentioned was caught by previous reviews.


Minor Issues

1. Potential Race in Evaluation Loop (lines 672-689)

The evaluation loop checks evaluation_loop_should_shutdown_event.is_set() at the top of the while loop, but there's a window where:

  1. Training loop sets shutdown event (line 662)
  2. Evaluation loop is already past the check and waiting on sampling_client_updated_event.wait() (line 673)
  3. Training loop sets sampling_client_updated_event (line 663)
  4. Evaluation loop wakes up, clears the event (line 674), and continues

This isn't a hang risk because the next loop iteration will catch the shutdown event, but it could cause one extra unnecessary iteration.

Suggested fix: Check shutdown event after waiting:

while not evaluation_loop_should_shutdown_event.is_set():
    await sampling_client_updated_event.wait()
    sampling_client_updated_event.clear()
    
    # Check again after waking up
    if evaluation_loop_should_shutdown_event.is_set():
        break
    
    # ... rest of evaluation logic

2. Type Annotation Inconsistency (line 838)

The return type annotation doesn't account for the filter callback potentially being applied in the streaming path:

trajectory_group_filter: Callable[[WrappedTrajectoryGroup | None], bool] = lambda _: True,

When filter_stale_trajectory_group is passed (line 607), it can return False for stale groups, but this isn't reflected in how the function processes groups in the streaming path.


Questions

  1. Checkpoint state: The TODO comment at line 641 mentions saving dataloader state and queued trajectory groups for proper checkpointing. Is this planned for a future PR?

  2. Worker restart: If a trajectory worker crashes (unhandled exception), will the counter still decrement? Consider wrapping the worker loop in a try/finally to ensure the counter always decrements (see the sketch after this list).

  3. Queue drainage: When Shutdown() is enqueued, are there guarantees that all pending WrappedTrajectoryGroup items ahead of it in the queue will be processed? The current code seems to assume FIFO ordering, which should be fine for asyncio.Queue, but worth confirming.
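
Regarding question 2, here is a hedged sketch of the suggested try/finally guard, reusing the Shutdown and AsyncCounter shapes from the sketches above (hypothetical names, not the actual worker code):

async def trajectory_worker_guarded(run_worker, counter, train_queue) -> None:
    # run_worker: the existing worker loop body (a coroutine function)
    # counter: an AsyncCounter-like object with an async decrement()
    # train_queue: the asyncio.Queue feeding the training loop
    try:
        await run_worker()
    finally:
        # Runs even if run_worker raised, so the counter always decrements and
        # the final sentinel still reaches the training loop.
        if await counter.decrement() == 0:
            await train_queue.put(Shutdown())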


Summary

Must fix:

  • Apply trajectory_group_filter in do_train_step_streaming_and_get_sampling_client (lines 869-881)

Nice to have:

  • Add shutdown check after waiting in evaluation loop (lines 672-689)
  • Consider try/finally around worker loops to ensure counter always decrements

The overall shutdown logic is well-designed and should prevent hanging once the filter application issue is fixed.

