Support async reward functions and parallelize call to reward functions. #4567
Conversation
The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.
On revision I think it might be more prudent to use …
I think I agree with this. I don't have much background with asyncio though.
Done! Runs each synchronous reward function in its own thread now. |
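Running each synchronous reward function in its own thread could be sketched roughly like this (a minimal illustration; the function names and texts are hypothetical, not taken from the PR):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def reward_length(texts):
    time.sleep(0.1)  # simulate I/O-bound work (e.g. an API call)
    return [float(len(t)) for t in texts]

def reward_upper(texts):
    time.sleep(0.1)
    return [float(sum(ch.isupper() for ch in t)) for t in texts]

texts = ["The quick brown fox", "ChatGPT is helpful"]
funcs = [reward_length, reward_upper]

# One thread per reward function; the threads overlap during the sleep,
# so total wall time is roughly one sleep, not one per function.
with ThreadPoolExecutor(max_workers=len(funcs)) as executor:
    futures = [executor.submit(fn, texts) for fn in funcs]
    rewards_per_func = [f.result() for f in futures]

print(rewards_per_func)  # [[19.0, 18.0], [1.0, 4.0]]
```

Threads are a good fit here because reward functions are typically I/O-bound (model or API calls), so the GIL is released while waiting.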
I think the implementation looks good. However, I'm still wondering if we should constrain the reward function to be synchronous. If execution is parallelized, we should achieve the same performance, no?
There aren't any constraints: the reward function can be sync or async. Sync functions use ThreadPoolExecutor, async functions use asyncio.gather. Perhaps I'm misunderstanding you? The test case added also covers this by including one sync and one async function.
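A mixed sync/async dispatch like the one described could look roughly like this (a sketch under my own assumptions; names are hypothetical and this is not the PR's actual code):

```python
import asyncio
import inspect

def reward_sync(texts):
    # plain synchronous reward function
    return [float(len(t)) for t in texts]

async def reward_async(texts):
    # asynchronous reward function
    await asyncio.sleep(0.05)
    return [1.0 for _ in texts]

async def gather_rewards(funcs, texts):
    loop = asyncio.get_running_loop()
    tasks = []
    for fn in funcs:
        if inspect.iscoroutinefunction(fn):
            # async functions become tasks on the running event loop
            tasks.append(asyncio.ensure_future(fn(texts)))
        else:
            # sync functions run in the loop's default thread pool
            tasks.append(loop.run_in_executor(None, fn, texts))
    return await asyncio.gather(*tasks)

texts = ["abc", "de"]
rewards = asyncio.run(gather_rewards([reward_sync, reward_async], texts))
print(rewards)  # [[3.0, 2.0], [1.0, 1.0]]
```

`asyncio.gather` preserves input order, so the results line up with the list of reward functions regardless of which finishes first.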
Sorry for the delay, but it took me some time to find a clear way to put my thoughts into words. Currently we have something like this:

```python
import time

def reward_length(texts):
    time.sleep(0.5)
    return [float(len(t)) for t in texts]

def reward_vowel_count(texts):
    time.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]

if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]
    rewards_per_func = []
    for reward_fn in [reward_length, reward_vowel_count]:
        rewards_per_func.append(reward_fn(texts))
    print(rewards_per_func)
```

But it's not ideal, because each reward calculation runs in sequence. So we could have something like this instead:

```python
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor

def reward_length(texts):
    time.sleep(0.5)
    return [float(len(t)) for t in texts]

def reward_vowel_count(texts):
    time.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]

def _worker_reward(fn, texts):
    return fn(texts)

if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]
    funcs = [reward_length, reward_vowel_count]
    with ProcessPoolExecutor(max_workers=len(funcs), mp_context=multiprocessing.get_context("spawn")) as executor:
        futures = [executor.submit(_worker_reward, fn, texts) for fn in funcs]
        rewards_per_func = [f.result() for f in futures]
    print(rewards_per_func)
```

which is nice in my opinion. The thing is, it doesn't natively support asynchronous reward functions. To support async functions we would need something like this:

```python
import asyncio

async def reward_length_async(texts):
    await asyncio.sleep(0.5)
    return [float(len(t)) for t in texts]

async def reward_vowel_count_async(texts):
    await asyncio.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]

async def asyncio_concurrent(fns, texts: list[str]) -> list[list[float]]:
    tasks = [asyncio.create_task(fn(texts)) for fn in fns]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]
    funcs = [reward_length_async, reward_vowel_count_async]
    rewards_per_func = asyncio.run(asyncio_concurrent(funcs, texts))
    print(rewards_per_func)
```

The issue is that it introduces a good amount of complexity, because now we have to support both the sync and async code paths.

So I'm wondering: to reduce code complexity inside the GRPOTrainer, why not impose that the reward function be synchronous, and leave it up to the user to call `asyncio.run` themselves?

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

async def reward_length_async(texts):
    await asyncio.sleep(0.5)
    return [float(len(t)) for t in texts]

async def reward_vowel_count_async(texts):
    await asyncio.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]

def run_async_in_process(async_fn, texts):  # <- the user-side wrapper
    return asyncio.run(async_fn(texts))

if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]
    funcs = [reward_length_async, reward_vowel_count_async]
    with ProcessPoolExecutor(max_workers=len(funcs), mp_context=multiprocessing.get_context("spawn")) as executor:
        futures = [executor.submit(run_async_in_process, fn, texts) for fn in funcs]
        rewards_per_func = [f.result() for f in futures]
    print(rewards_per_func)
```

I know it's not ideal, because async functions are meant for concurrency within a single process. It would probably make things slightly slower (because of IPC overhead, process creation, and event loop creation), but it would be simpler on the GRPO side.
Another option for the user would be to use a single reward function, like this:

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

async def reward_length_async(texts):
    await asyncio.sleep(0.5)
    return [float(len(t)) for t in texts]

async def reward_vowel_count_async(texts):
    await asyncio.sleep(0.5)
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]

async def compute_rewards_async(texts):
    funcs = [reward_length_async, reward_vowel_count_async]
    tasks = [asyncio.create_task(fn(texts)) for fn in funcs]
    return await asyncio.gather(*tasks)

def compute_rewards(texts):
    rewards = asyncio.run(compute_rewards_async(texts))
    combined_rewards = [sum(reward_tuple) for reward_tuple in zip(*rewards)]
    return combined_rewards

def _worker_reward(fn, texts):
    return fn(texts)

if __name__ == "__main__":
    texts = [
        "The quick brown fox",
        "ChatGPT is helpful",
        "Async rewards can reduce latency",
    ]
    funcs = [compute_rewards]
    with ProcessPoolExecutor(max_workers=len(funcs), mp_context=multiprocessing.get_context("spawn")) as executor:
        futures = [executor.submit(_worker_reward, fn, texts) for fn in funcs]
        rewards_per_func = [f.result() for f in futures]
    print(rewards_per_func)
```

It will achieve the same speed as using asyncio directly; the only drawback is that in the logs you will get the mean/std only for this single global reward, not per reward function.
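The per-reward logging drawback mentioned above could be softened by keeping the per-function results alongside the combined reward. A minimal sketch (function names hypothetical, not from the PR):

```python
import asyncio

async def reward_length_async(texts):
    return [float(len(t)) for t in texts]

async def reward_vowel_count_async(texts):
    vowels = set("aeiouAEIOU")
    return [float(sum(ch in vowels for ch in t)) for t in texts]

async def compute_rewards_async(texts):
    funcs = [reward_length_async, reward_vowel_count_async]
    # gather preserves the order of funcs in its result
    return await asyncio.gather(*(fn(texts) for fn in funcs))

texts = ["abc", "hello"]
# Keep per-function rewards for logging, then combine for training.
per_func = asyncio.run(compute_rewards_async(texts))
combined = [sum(vals) for vals in zip(*per_func)]
print(per_func)   # [[3.0, 5.0], [1.0, 2.0]]
print(combined)   # [4.0, 7.0]
```

Returning both pieces would let the trainer log per-reward mean/std while still using the summed reward for optimization.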
What does this PR do?
Fixes #4130
Who can review?
Anyone in the community is free to review the PR once the tests have passed. Feel free to tag
members/contributors who may be interested in your PR.