
Conversation

@pramodith
Collaborator

@pramodith pramodith commented Nov 24, 2025

What does this PR do?

  1. Checks whether a reward function is an nn.Module.
  2. If it is not, checks whether the reward function is synchronous or async.
  3. Synchronous reward functions are parallelized by running each one on the next available thread.
  4. Async reward functions are run concurrently as tasks on asyncio's current event loop.
  5. We await until all the created tasks are complete.
  6. To avoid conflicts with any existing event loop the code may be running under, we check whether we're already inside an active event loop; if so, we create a new event loop and run all the reward functions under it (sketched below).
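
A rough sketch of steps 2-6 (illustrative only; the helper name, the single texts argument, and the new-event-loop handling are simplified stand-ins, not the actual trainer code):

import asyncio
from concurrent.futures import ThreadPoolExecutor


def compute_rewards(reward_funcs, texts):
    # Step 2: split the non-nn.Module reward functions into sync and async groups.
    sync_funcs = [f for f in reward_funcs if not asyncio.iscoroutinefunction(f)]
    async_funcs = [f for f in reward_funcs if asyncio.iscoroutinefunction(f)]
    results = {}

    # Step 3: each synchronous reward function runs on the next available thread.
    if sync_funcs:
        with ThreadPoolExecutor(max_workers=len(sync_funcs)) as pool:
            futures = {pool.submit(f, texts): f for f in sync_funcs}
            for future, f in futures.items():
                results[f] = future.result()

    # Steps 4-5: async reward functions become tasks that are awaited together.
    async def _gather():
        return await asyncio.gather(*(f(texts) for f in async_funcs))

    if async_funcs:
        try:
            asyncio.get_running_loop()
            already_in_loop = True
        except RuntimeError:
            already_in_loop = False
        if already_in_loop:
            # Step 6: we're already inside an active event loop, so run the gather
            # on a fresh loop (here via asyncio.run in a helper thread) to avoid conflicts.
            with ThreadPoolExecutor(max_workers=1) as runner:
                async_results = runner.submit(asyncio.run, _gather()).result()
        else:
            async_results = asyncio.run(_gather())
        results.update(zip(async_funcs, async_results))

    # Preserve the original ordering of the reward functions.
    return [results[f] for f in reward_funcs]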

Fixes #4130

Before submitting

  • This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case).
  • Did you read the contributor guideline,
    Pull Request section?
  • Was this discussed/approved via a GitHub issue? Please add a link
    to it if that's the case.
  • Did you make sure to update the documentation with your changes?
  • Did you write any new necessary tests?

Who can review?

Anyone in the community is free to review the PR once the tests have passed. Feel free to tag
members/contributors who may be interested in your PR.

@HuggingFaceDocBuilderDev

The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.

@pramodith
Collaborator Author

On revision, I think it might be more prudent to use ThreadPoolExecutor for synchronous reward functions: since these are going to be CPU-bound/compute-heavy, running them in an async fashion might not lead to true parallelism. Happy to hear your thoughts on this.
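
To illustrate the concern, here's a toy timing sketch (not from the PR; time.sleep stands in for a blocking reward computation, and note that it releases the GIL, so truly CPU-bound work would still need processes for real parallelism):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_reward(texts):
    # Stand-in for a compute-heavy reward function.
    time.sleep(0.5)
    return [float(len(t)) for t in texts]


async def naive_wrapper(texts):
    # Wrapping the sync call in a coroutine does not free the event loop:
    # the blocking call runs inline, so the tasks still execute one after another.
    return blocking_reward(texts)


async def main(texts):
    start = time.perf_counter()
    await asyncio.gather(*(naive_wrapper(texts) for _ in range(4)))
    print(f"coroutine wrappers: {time.perf_counter() - start:.2f}s")  # ~2.0s

    start = time.perf_counter()
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        await asyncio.gather(*(loop.run_in_executor(pool, blocking_reward, texts) for _ in range(4)))
    print(f"thread pool:        {time.perf_counter() - start:.2f}s")  # ~0.5s


if __name__ == "__main__":
    asyncio.run(main(["The quick brown fox"]))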

@qgallouedec
Member

On revision, I think it might be more prudent to use ThreadPoolExecutor for synchronous reward functions: since these are going to be CPU-bound/compute-heavy, running them in an async fashion might not lead to true parallelism. Happy to hear your thoughts on this.

I think I agree with this. I don't have a big background with asyncio, though.

@pramodith
Collaborator Author

Done! Runs each synchronous reward function in its own thread now.

@qgallouedec
Member

I think the implementation looks good.

However, I'm still wondering if we should constrain the reward function to be synchronous. If execution is parallelized, we should achieve the same performance, no?

@pramodith
Collaborator Author

pramodith commented Nov 28, 2025

I think the implementation looks good.

However, I'm still wondering if we should constrain the reward function to be synchronous. If execution is parallelized, we should achieve the same performance, no?

There aren't any constraints; the reward function can be sync or async. Sync functions use ThreadPoolExecutor, async functions use asyncio.gather.

Perhaps I'm misunderstanding you?

if asyncio.iscoroutinefunction(reward_func):
    async_funcs_info.append((i, reward_func, reward_func_name))
else:
    sync_funcs_info.append((i, reward_func, reward_func_name))

The test case added also tests for this by including one sync and one async function.

@qgallouedec
Member

Sorry for the delay, but it took me some time to find a clear way to put my thoughts into words:

Currently we have something like this:

import time


def reward_length(texts):
    time.sleep(0.5)
    return [float(len(t)) for t in texts]


def reward_vowel_count(texts):
    time.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]

if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]

    rewards_per_func = []
    for reward_fn in [reward_length, reward_vowel_count]:
        rewards_per_func.append(reward_fn(texts))
    print(rewards_per_func)

But it's not ideal, because each reward calculation runs in sequence. So we could have something like this instead:

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor
from typing import Callable


def reward_length(texts):
    time.sleep(0.5)
    return [float(len(t)) for t in texts]


def reward_vowel_count(texts):
    time.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]


def _worker_reward(fn, texts):
    return fn(texts)

if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]

    funcs = [reward_length, reward_vowel_count]
    with ProcessPoolExecutor(max_workers=len(funcs), mp_context=multiprocessing.get_context("spawn")) as executor:
        futures = [executor.submit(_worker_reward, fn, texts) for fn in funcs]
        rewards_per_func = [f.result() for f in futures]
    print(rewards_per_func)

which is nice in my opinion. The thing is, it doesn't natively support asynchronous reward functions.

To support async functions, we would need something like this:

import asyncio


async def reward_length_async(texts):
    await asyncio.sleep(0.5)
    return [float(len(t)) for t in texts]


async def reward_vowel_count_async(texts):
    await asyncio.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]


if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]

    async def asyncio_concurrent(fns, texts: list[str]) -> list[list[float]]:
        tasks = [asyncio.create_task(fn(texts)) for fn in fns]
        return await asyncio.gather(*tasks)

    funcs = [reward_length_async, reward_vowel_count_async]
    rewards_per_func = asyncio.run(asyncio_concurrent(funcs, texts))
    print(rewards_per_func)

The issue is that it introduces a good amount of complexity, because now we have to

  • use multiprocessing for synchronous reward functions
  • use asyncio for asynchronous reward functions

So I'm wondering: to reduce code complexity inside the GRPOTrainer, why not require the reward function to be synchronous, and leave it up to the user to call asyncio.run inside some wrapper, like here:

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


async def reward_length_async(texts):
    await asyncio.sleep(0.5)
    return [float(len(t)) for t in texts]


async def reward_vowel_count_async(texts):
    await asyncio.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]


def run_async_in_process(async_fn, texts):  # user-side wrapper: runs the async reward function on its own event loop
    return asyncio.run(async_fn(texts))


if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]

    funcs = [reward_length_async, reward_vowel_count_async]

    with ProcessPoolExecutor(max_workers=len(funcs), mp_context=multiprocessing.get_context("spawn")) as executor:
        futures = [executor.submit(run_async_in_process, fn, texts) for fn in funcs]
        rewards_per_func = [f.result() for f in futures]
    print(rewards_per_func)

I know it's not ideal, because async functions are meant for concurrency within a single process. It would probably make it slightly slower (because of IPC overhead, process creation, event loop creation), but it would be simpler on the GRPO side.

@qgallouedec
Member

Another option for the user would be to use a single reward function, like this:

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


async def reward_length_async(texts):
    await asyncio.sleep(0.5)
    return [float(len(t)) for t in texts]


async def reward_vowel_count_async(texts):
    await asyncio.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]


async def compute_rewards_async(texts):
    funcs = [reward_length_async, reward_vowel_count_async]
    tasks = [asyncio.create_task(fn(texts)) for fn in funcs]
    return await asyncio.gather(*tasks)


def compute_rewards(texts):
    rewards = asyncio.run(compute_rewards_async(texts))
    combined_rewards = [sum(reward_tuple) for reward_tuple in zip(*rewards)]
    return combined_rewards


def _worker_reward(fn, texts):
    return fn(texts)


if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]

    funcs = [compute_rewards]
    with ProcessPoolExecutor(max_workers=len(funcs), mp_context=multiprocessing.get_context("spawn")) as executor:
        futures = [executor.submit(_worker_reward, fn, texts) for fn in funcs]
        rewards_per_func = [f.result() for f in futures]
    print(rewards_per_func)

It will achieve the same speed as using asyncio directly; the only drawback is that in the logs you will get the mean/std only for this global reward, not per reward function.

