Skip to content

Conversation

@willccbb
Copy link
Member

@willccbb willccbb commented Jan 17, 2026

Description

Enables multi-processing for env workers

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Test improvement

Testing

  • All existing tests pass when running uv run pytest locally.
  • New tests have been added to cover the changes

Checklist

  • My code follows the style guidelines of this project as outlined in AGENTS.md
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

Additional Notes


Note

Introduces a new results schema and multiprocessing evaluation pipeline.

  • Define RolloutResult and change GenerateOutputs to rollouts + metadata; update docs, printing, dataset conversion, and save utilities to consume rollout-centric outputs
  • Refactor Environment.generate/evaluate to build outputs via type_utils (state_to_result, build_generate_outputs) and sort/persist consistently; remove legacy parallel-list fields
  • Add multiprocessing env workers (verifiers.workers.{client,server,types}), export EnvClient/EnvServer, and migrate run_evaluation to use workers; add --num-workers and wire through EvalConfig
  • Update RL trainer to read trajectories/timing from rollouts; adjust CLI, tests, and minor utilities (message/data utils, rubric, math_rubric import) accordingly

Written by Cursor Bugbot for commit 10ed830. This will update automatically on new commits. Configure here.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 5 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.


res = env.evaluate_sync(client=client, model="mock")
st = res["state"][0]
st = res["results"][0]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test accesses removed reward key on GenerateOutputs

High Severity

The assertion res["reward"] == [1.0] accesses a key that no longer exists. After the refactor, GenerateOutputs only contains rollouts and metadata. Rewards are now accessed via individual rollout results like res["rollouts"][0].get("reward").

Fix in Cursor Fix in Web

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 4 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

completion_logprobs.append(tokens["completion_logprobs"])
advantages.append(step["advantage"])

# Build rewards_dict from rollout-level data (for logging only)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeyError when accessing optional advantage field in trajectory

High Severity

The orchestrator accesses step["advantage"] directly, but RolloutResultTrajectoryStep has total=False making advantage an optional key. When state_to_result converts a trajectory step, it only includes advantage if it's not None. If scoring hasn't run or uses dummy_score_rollout (which doesn't set advantage), the key will be absent and this line will raise KeyError instead of returning None as the old code did.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Member

@mikasenghaas mikasenghaas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im not super convinced that ipc is the right protocol to use here. because the env client "owns" the env workers. this should work for the current prime-rl/vf-eval usecases but im wondering if it might not be too restrictive for potential future usecases, ie. i could see it become desirable to really "host" an env server, that multiple clients can talk which would be hard to do with ipc.

also my gut feeling tells me that in order to make this maintainable we should aim to mirror the regular environment api as closely as possible (almost like a pass-through). the end goal would be that

env = vf.load_environment(env_id, **env_args)

can be 1-1 replaced with

env_client = EnvClient(env_id, **env_args)

im not sure this is fully realistic but imo would be the ideal state to have a contract/interface general enough to work in-proc and across arbitrary protocols.

### GenerateOutputs

```python
class GenerateOutputs(TypedDict):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahhh i love this!! finally row order:))

"""

group_inputs: list[RolloutInput]
example_id: int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should prob name this request_id and construct internally (not part of public-facing api) because the same example_id might be in-flight multiple times (we acc had that bug in prime-rl)

Comment on lines +84 to +88
# Request dataset and metadata from first worker
first_worker.send_request(MetadataRequest(num_examples=num_examples))
response = first_worker.recv_response(timeout=120)
if response is None or not isinstance(response, MetadataResponse):
raise RuntimeError("Failed to get metadata from worker")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought the point of the dataset builder pattern was that we dont have to transport the dataset via ipc. would really like to try to avoid this op. i feel like any env worker may or may not build the dataset (by default, they don't build it but should be configurable from client)

if not future.done():
future.set_result(response.results)

async def run_groups(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why expose run_groups here instead of run_group, imo it would be desirable to mirror the regular Environment methods as closely as possible with the EnvClient so that an EnvClient can just be hotswapped in for an in-proc Environment

@willccbb
Copy link
Member Author

Gonna close this + work off your other PR, can revisit ideas from it as needed.

@willccbb willccbb closed this Jan 20, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants