[tinker-cookbook] rl: avoid hanging in async runs when we run out of data #107
base: main
Conversation
Previously, on async RL runs, we could hang in shutdown if we run out of data. This fixes it to ensure proper shutdown and that all data in queues is drained when the dataloader loop terminates first.

Force-pushed from 4ea9fd9 to 6172d61.
@kennyyu I think there was a bug, which was caught by the Claude and Codex reviews. I think I fixed it in my last push. Now trying to fix the auto-review functionality.
Claude finished @kennyyu's task

Code Review Complete ✅

I've reviewed the async RL shutdown fixes in this PR.

Positive Changes
1. Clear Shutdown Workflow
2. Proper Sentinel Propagation
3. AsyncCounter Pattern (see the sketch after this list)
4. Match Statement Usage
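The review names an "AsyncCounter Pattern" without showing the class itself. As a rough illustration only, here is a minimal sketch of what such a counter could look like; the name, methods, and behavior are hypothetical, not the actual tinker-cookbook implementation:

```python
import asyncio


class AsyncCounter:
    """Tracks in-flight items; lets a shutdown path wait until the count returns to zero."""

    def __init__(self) -> None:
        self._count = 0
        self._zero = asyncio.Event()
        self._zero.set()  # nothing in flight initially

    def increment(self) -> None:
        self._count += 1
        self._zero.clear()

    def decrement(self) -> None:
        self._count -= 1
        if self._count == 0:
            self._zero.set()

    async def wait_for_zero(self) -> None:
        # Blocks until every incremented item has been decremented, i.e. all queues drained.
        await self._zero.wait()
```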
Issues Found

Location: lines 869-881, inside `do_train_step_streaming_and_get_sampling_client`. After adding the trajectory group to the list, the filter is never applied. This means stale trajectory groups will accumulate in the list.

Current code (lines 869-881):

```python
match wrapped_trajectory_group:
    case WrappedTrajectoryGroup():
        pass
    case Shutdown():
        logger.info("[do_train_step_streaming_and_get_sampling_client] Received shutdown signal")
        return None
    case None:
        continue
wrapped_trajectory_groups.append(wrapped_trajectory_group)  # ❌ No filter applied!
```

Fix needed:

```python
match wrapped_trajectory_group:
    case WrappedTrajectoryGroup():
        pass
    case Shutdown():
        logger.info("[do_train_step_streaming_and_get_sampling_client] Received shutdown signal")
        return None
    case None:
        continue
# Apply the filter before appending
if not trajectory_group_filter(wrapped_trajectory_group):
    continue
wrapped_trajectory_groups.append(wrapped_trajectory_group)
```

This is likely the bug that @joschu mentioned was caught by previous reviews.

Minor Issues

1. Potential Race in Evaluation Loop (lines 672-689)

The evaluation loop checks the shutdown event only at the top of each iteration, before waiting on `sampling_client_updated_event`.
This isn't a hang risk because the next loop iteration will catch the shutdown event, but it could cause one extra unnecessary iteration. Suggested fix: check the shutdown event again after waking up:

```python
while not evaluation_loop_should_shutdown_event.is_set():
    await sampling_client_updated_event.wait()
    sampling_client_updated_event.clear()
    # Check again after waking up
    if evaluation_loop_should_shutdown_event.is_set():
        break
    # ... rest of evaluation logic
```
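As a purely illustrative alternative, not part of the review or the PR, the same race can be avoided by waiting on both events at once, so a shutdown signaled mid-wait is observed immediately. This sketch reuses the event names above and assumes they are `asyncio.Event` instances:

```python
import asyncio


async def wait_for_update_or_shutdown(
    sampling_client_updated_event: asyncio.Event,
    evaluation_loop_should_shutdown_event: asyncio.Event,
) -> bool:
    """Wait until the sampling client is updated or shutdown is requested.

    Returns True if the caller should shut down.
    """
    update_task = asyncio.create_task(sampling_client_updated_event.wait())
    shutdown_task = asyncio.create_task(evaluation_loop_should_shutdown_event.wait())
    done, pending = await asyncio.wait(
        {update_task, shutdown_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    return shutdown_task in done
```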
2. Type Annotation Inconsistency (line 838)

The return type annotation doesn't account for the filter callback potentially being applied in the streaming path:

```python
trajectory_group_filter: Callable[[WrappedTrajectoryGroup | None], bool] = lambda _: True,
```

Questions
Summary

Must fix: apply `trajectory_group_filter` before appending in the streaming path.

Nice to have: re-check the shutdown event after waking in the evaluation loop, and tighten the filter callback type annotation.

The overall shutdown logic is well-designed and should prevent hanging once the filter application issue is fixed.
Previously, on async RL runs, we could hang in shutdown if we run out of data. This fixes it to ensure proper shutdown and that all data in the queues is drained when the dataloader loop terminates.
Overview of the shutdown workflow:
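As a rough, hypothetical sketch of the sentinel-based flow the review describes: the dataloader loop terminates first and enqueues a shutdown sentinel, and consumers drain the queue until they see it. The `Shutdown` class, queue layout, and loop names below are illustrative only, not the actual tinker-cookbook code:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Shutdown:
    """Sentinel enqueued once the dataloader runs out of data."""


async def dataloader_loop(data: list[str], queue: asyncio.Queue) -> None:
    for item in data:
        await queue.put(item)
    # The dataloader terminates first and tells downstream consumers to drain and exit.
    await queue.put(Shutdown())


async def train_loop(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        match item:
            case Shutdown():
                print("received shutdown sentinel; exiting cleanly")
                return
            case _:
                print(f"training on {item}")


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(dataloader_loop(["batch-0", "batch-1"], queue), train_loop(queue))


if __name__ == "__main__":
    asyncio.run(main())
```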