-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
use asyncio for concurrency #111
Conversation
WalkthroughThe pull request introduces a comprehensive refactoring of video generation processes across multiple backend components to support asynchronous execution. The changes transition several video generation tools and agents from synchronous to asynchronous models, enabling non-blocking task execution and improved concurrency. Key modifications include adding Changes
Sequence DiagramsequenceDiagram
participant ComparisonAgent
participant VideoGenerationAgent
participant VideoGenerationTool
ComparisonAgent->>ComparisonAgent: run_tasks()
ComparisonAgent->>VideoGenerationAgent: run_async()
VideoGenerationAgent->>VideoGenerationTool: text_to_video_async()
VideoGenerationTool-->>VideoGenerationAgent: Return video result
VideoGenerationAgent-->>ComparisonAgent: Return agent response
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (7)
backend/director/tools/stabilityai.py (3)
51-53
: New asynchronous method
Adoptingasync def text_to_video_async
is a solid approach. Consider adding a global timeout or cancellation mechanism to prevent indefinite waits if the request stalls.
135-135
: Async sleep
Usingawait asyncio.sleep
is correct for non-blocking delays. If server latency is variable, consider exponential backoff or an alternative retry strategy to optimize performance.
145-146
: Synchronous wrapper withasyncio.run
This wrapper is handy for backward compatibility. However, it may fail if there's already an active event loop (e.g., in some web frameworks). Document or guard against that scenario if necessary.backend/director/agents/comparison.py (1)
59-65
: Taskdone_callback
Using a callback to handle results is a workable approach. Consider wrappingtask.result()
in atry-except
to handle exceptions within tasks more gracefully.backend/director/tools/kling.py (1)
187-187
: Non-blocking wait
await asyncio.sleep
is correct for concurrency. Depending on usage, consider dynamic intervals or backoff strategies.backend/director/agents/video_generation.py (1)
167-168
: Status update before tool call
Pushing status to notify users of the upcoming generation step is a helpful UX pattern.backend/director/tools/fal_video.py (1)
39-45
:text_to_video_async
method
Leveragesfal_client.run_async
for concurrency. Verify thatfal_client
supports large batch operations without performance degradation.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
backend/director/agents/comparison.py
(3 hunks)backend/director/agents/video_generation.py
(6 hunks)backend/director/tools/fal_video.py
(4 hunks)backend/director/tools/kling.py
(3 hunks)backend/director/tools/stabilityai.py
(4 hunks)
🔇 Additional comments (19)
backend/director/tools/stabilityai.py (2)
3-3
: Async import usage
Good job importing asyncio
as it is essential for non-blocking operations.
49-49
: Reduced polling interval
Changing from 30 to 10 seconds shortens wait times but can increase the number of requests to the server. Verify the backend can handle more frequent polling without rate-limiting or performance degradation.
backend/director/agents/comparison.py (5)
4-4
: Asynchronous import
Importing asyncio
aligns with the new non-blocking design.
53-56
: Asynchronous _run_video_generation
method
Switching to async
allows concurrency across multiple runs, improving efficiency.
72-80
: run_tasks
concurrency
create_task
and gather
enable parallel executions. Ensure large-scale concurrency doesn’t overload resources or trigger rate limits if many tasks run at once.
96-109
: Initializing videos_content
Populating the VideosContent
structure for each comparison scenario is clear and readable.
112-118
: Blocking the current thread with asyncio.run
Running self.run_tasks(...)
in a blocking fashion is acceptable if there’s no ongoing event loop. If the agent ever needs to run inside an existing loop (e.g., a larger async framework), this could cause issues.
backend/director/tools/kling.py (3)
4-4
: Async import
No issues, fits the async approach.
130-132
: Introduction of text_to_video_async
Adopting an async method is consistent with the overall refactoring. Good job.
190-191
: Synchronous fallback
Wrapping the async call with asyncio.run
preserves backward compatibility. However, using asyncio.run
inside an existing event loop can raise errors.
backend/director/agents/video_generation.py (6)
4-4
: Async import
Continues the consistent approach for concurrency.
94-94
: run_async
definition
Migrating the agent to an async method allows parallel video generations with other tasks.
180-180
: Progress update post-generation
Informs the user when the generation stage completes.
204-205
: Final update and publish
Ensures the UI remains consistent by delivering a conclusive status.
211-212
: Exception handling
Catching exceptions ensures that end users see consistent messaging on errors.
225-226
: Synchronous fallback
Again, if already running in an existing event loop, asyncio.run
may conflict. Document or safeguard usage.
backend/director/tools/fal_video.py (3)
4-4
: Async import
No issues identified.
9-9
: Model enumeration updates
Adding more models is beneficial for flexibility. Be sure to confirm each model’s availability and stability in production.
Also applies to: 15-15, 26-26
59-60
: Blocking wrapper
If this tool is invoked inside an existing event loop, asyncio.run
could lead to a RuntimeError
. Document or handle the scenario as needed.
@0xrohitgarg Getting this error with Fal. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (3)
backend/director/tools/fal_video.py (3)
Line range hint
9-26
: LGTM! Consider documenting model capabilities.The model list has been expanded with new options. While the implementation is correct, it would be helpful to add documentation about the capabilities, limitations, or use cases for each model to help users make informed choices.
38-38
: Consider making polling interval configurable.The polling interval is hardcoded. Consider making it configurable through the constructor or method parameters to allow flexibility based on different use cases.
- self.polling_interval = 10 # seconds + def __init__(self, api_key: str, polling_interval: int = 10): + if not api_key: + raise Exception("FAL API key not found") + os.environ["FAL_KEY"] = api_key + self.polling_interval = polling_interval # seconds
78-79
: Add type hints and docstring to the wrapper method.The sync wrapper would benefit from proper type hints and documentation to maintain API clarity.
- def text_to_video(self, *args, **kwargs): + def text_to_video( + self, prompt: str, save_at: str, duration: float, config: dict + ) -> dict[str, str]: + """Synchronous wrapper for text_to_video_async. + + This method provides backward compatibility by wrapping the async implementation. + For better performance, prefer using text_to_video_async directly in async contexts. + """ return asyncio.run(self.text_to_video_async(*args, **kwargs))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
backend/director/tools/fal_video.py
(3 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
backend/director/tools/fal_video.py
74-74: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
🔇 Additional comments (1)
backend/director/tools/fal_video.py (1)
40-76
: Request clarification on reported error.
@ankit-v2-3 reported encountering an error while using this implementation. Could you please provide:
- The complete error message and stack trace
- The specific model and configuration used
- Steps to reproduce the error
This will help ensure the async implementation handles all error cases correctly.
🧰 Tools
🪛 Ruff (0.8.2)
74-74: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
|
||
except Exception as e: | ||
raise Exception(f"Error generating video: {str(e)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve exception handling and preserve context.
The current implementation loses the original exception context. Use exception chaining to preserve the stack trace.
- except Exception as e:
- raise Exception(f"Error generating video: {str(e)}")
+ except Exception as e:
+ raise Exception(f"Error generating video: {str(e)}") from e
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
except Exception as e: | |
raise Exception(f"Error generating video: {str(e)}") | |
except Exception as e: | |
raise Exception(f"Error generating video: {str(e)}") from e |
🧰 Tools
🪛 Ruff (0.8.2)
74-74: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
@0xrohitgarg Getting the above |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🔭 Outside diff range comments (2)
backend/director/tools/stabilityai.py (1)
Line range hint
51-144
: Convert all synchronous operations to asynchronous.The current implementation mixes synchronous
requests
calls with asynchronoussleep
. This defeats the purpose of async implementation and could lead to blocking operations.Convert all HTTP operations to use
aiohttp
:+ async def text_to_video_async(self, prompt: str, save_at: str, duration: float, config: dict): + async with aiohttp.ClientSession() as session: + # Generate image + headers = {"authorization": f"Bearer {self.api_key}", "accept": "image/*"} + image_payload = { + "prompt": prompt, + "output_format": config.get("format", "png"), + "aspect_ratio": config.get("aspect_ratio", "16:9"), + "negative_prompt": config.get("negative_prompt", ""), + } + + async with session.post(self.image_endpoint, headers=headers, data=image_payload) as image_response: + if image_response.status != 200: + raise Exception(f"Error generating image: {await image_response.text()}") + image_data = await image_response.read() + + # Process image + image = Image.open(io.BytesIO(image_data)) + # ... (rest of image processing) + + # Generate video + video_headers = {"authorization": f"Bearer {self.api_key}"} + # ... (rest of video generation with async calls)backend/director/tools/kling.py (1)
Line range hint
130-187
: Critical: Replace synchronous HTTP calls with aiohttp and add proper error handling.The current implementation has several issues that could lead to the reported "Event loop is closed" error and other potential problems:
- Using synchronous
requests
library within an async function blocks the event loop- No timeout mechanism for the polling loop
- Missing error handling for network issues
- No handling of API rate limits
Here's a suggested fix:
+ import aiohttp + from typing import Optional + from asyncio import TimeoutError async def text_to_video_async( - self, prompt: str, save_at: str, duration: float, config: dict + self, prompt: str, save_at: str, duration: float, config: dict, + timeout: Optional[float] = 3600 # 1 hour default timeout ): api_key = self.get_authorization_token() headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } payload = { "prompt": prompt, "model": config.get("model", "kling-v1"), "duration": duration, **config, } + async with aiohttp.ClientSession() as session: + try: + async with session.post( + self.video_endpoint, + headers=headers, + json=payload + ) as response: + if response.status != 200: + error_text = await response.text() + raise Exception(f"Error generating video: {error_text}") + + data = await response.json() + job_id = data["data"].get("task_id") + if not job_id: + raise Exception("No task ID returned from the API.") + start_time = time.time() + result_endpoint = f"{self.api_route}/v1/videos/text2video/{job_id}" + while True: + if timeout and (time.time() - start_time) > timeout: + raise TimeoutError( + f"Video generation timed out after {timeout} seconds" + ) + async with session.get( + result_endpoint, + headers={"Authorization": f"Bearer {api_key}"} + ) as response: + response.raise_for_status() + data = await response.json() + status = data["data"]["task_status"] + if status == "succeed": + video_url = data["data"]["task_result"]["videos"][0]["url"] + async with session.get(video_url) as video_response: + video_response.raise_for_status() + with open(save_at, "wb") as f: + f.write(await video_response.read()) + break + elif status == "failed": + raise Exception( + f"Video generation failed: {data['data'].get('error', 'Unknown error')}" + ) + + await asyncio.sleep(self.polling_interval) + except aiohttp.ClientError as e: + raise Exception(f"Network error during video generation: {str(e)}") + except TimeoutError: + raise + except Exception as e: + raise Exception(f"Error during video generation: {str(e)}")This implementation:
- Uses
aiohttp
for non-blocking HTTP requests- Adds a configurable timeout
- Properly handles various error conditions
- Uses a single session for all requests
- Checks for failed status from the API
🧹 Nitpick comments (2)
backend/director/tools/fal_video.py (1)
35-37
: Consider making polling_interval configurable.The polling interval could be made configurable through the constructor or configuration to allow for different use cases and API rate limits.
def __init__(self, api_key: str): if not api_key: raise Exception("FAL API key not found") self.api_key = api_key self.queue_endpoint = "https://queue.fal.run" - self.polling_interval = 10 # seconds + self.polling_interval = config.get("polling_interval", 10) # secondsbackend/director/tools/kling.py (1)
Line range hint
130-191
: Consider these architectural recommendations for the async implementation.As part of the larger async refactoring effort, consider implementing:
- A global timeout strategy across all async operations
- Proper resource cleanup using async context managers
- Structured error handling and logging
- Circuit breakers for external API calls
- Metrics collection for async operations
Would you like me to provide example implementations for any of these recommendations?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
backend/director/agents/comparison.py
(4 hunks)backend/director/tools/fal_video.py
(3 hunks)backend/director/tools/kling.py
(3 hunks)backend/director/tools/stabilityai.py
(4 hunks)backend/requirements.txt
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- backend/requirements.txt
🧰 Additional context used
🪛 Ruff (0.8.2)
backend/director/tools/fal_video.py
84-84: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
🔇 Additional comments (4)
backend/director/tools/fal_video.py (3)
1-2
: LGTM! Appropriate imports for async operations.
The addition of asyncio
and aiohttp
imports is correct for implementing asynchronous HTTP operations.
82-84
:
Improve exception handling with proper chaining.
The current implementation loses the original exception context. Use exception chaining to preserve the stack trace.
except Exception as e:
- raise Exception(f"Error generating video: {str(e)}")
+ raise Exception(f"Error generating video: {str(e)}") from e
Likely invalid or redundant comment.
🧰 Tools
🪛 Ruff (0.8.2)
84-84: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
61-67
: 🛠️ Refactor suggestion
Add timeout and backoff mechanism to polling loop.
The current implementation could potentially poll indefinitely. Consider adding:
- A timeout mechanism to prevent infinite polling
- An exponential backoff strategy to reduce API load
+ async def text_to_video_async(self, prompt: str, save_at: str, duration: float, config: dict, timeout: int = 300):
+ start_time = asyncio.get_event_loop().time()
+ retry_count = 0
+ max_interval = 30
while True:
+ if asyncio.get_event_loop().time() - start_time > timeout:
+ raise TimeoutError(f"Video generation timed out after {timeout} seconds")
if status_json["status"] in ["IN_QUEUE", "IN_PROGRESS"]:
- await asyncio.sleep(self.polling_interval)
+ await asyncio.sleep(min(self.polling_interval * (1.5 ** retry_count), max_interval))
+ retry_count += 1
continue
Likely invalid or redundant comment.
backend/director/tools/kling.py (1)
4-4
: LGTM: Import statement is correctly placed.
The asyncio import is appropriately placed with other standard library imports.
def text_to_video(self, *args, **kwargs): | ||
""" | ||
Blocking call to generate video (synchronous wrapper around the async method). | ||
""" | ||
return asyncio.run(self.text_to_video_async(*args, **kwargs)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix potential "Event loop is closed" error in synchronous wrapper.
The current implementation using asyncio.run()
creates and closes a new event loop for each call. This can cause issues if called from a context where an event loop is already running.
def text_to_video(self, *args, **kwargs):
- return asyncio.run(self.text_to_video_async(*args, **kwargs))
+ loop = asyncio.get_event_loop()
+ if loop.is_closed():
+ return asyncio.run(self.text_to_video_async(*args, **kwargs))
+ return loop.run_until_complete(self.text_to_video_async(*args, **kwargs))
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def text_to_video(self, *args, **kwargs): | |
""" | |
Blocking call to generate video (synchronous wrapper around the async method). | |
""" | |
return asyncio.run(self.text_to_video_async(*args, **kwargs)) | |
def text_to_video(self, *args, **kwargs): | |
""" | |
Blocking call to generate video (synchronous wrapper around the async method). | |
""" | |
loop = asyncio.get_event_loop() | |
if loop.is_closed(): | |
return asyncio.run(self.text_to_video_async(*args, **kwargs)) | |
return loop.run_until_complete(self.text_to_video_async(*args, **kwargs)) |
try: | ||
print("At Checkpoint #1", asyncio.get_event_loop().is_closed()) | ||
except Exception as e: | ||
print("No event loop available", e) | ||
asyncio.run(self.run_tasks(video_generation_comparison)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Remove debug prints and implement proper error handling.
Debug prints should be replaced with proper logging and error handling.
- try:
- print("At Checkpoint #1", asyncio.get_event_loop().is_closed())
- except Exception as e:
- print("No event loop available", e)
+ logger.debug("Initializing video generation tasks")
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
backend/director/core/session.py (1)
82-82
: Document the purpose and usage of the newerror
field.This
error: Optional[str] = None
addition is valuable for capturing errors in theVideoData
model. Consider adding a short docstring or comment to clarify what types of errors should be stored and how they are expected to be handled down the line.backend/director/agents/video_generation.py (2)
120-127
: Maintain uniform state updates.Before and after appending
video_content
toself.output_message
, confirm that partial failures do not leavevideo_content
in an inconsistent state. Consider wrapping these steps in a small try/except block or context manager to clarify the flow.
167-168
: Use a consistent output/interaction pattern.Multiple
push_update()
calls scattered around may introduce race conditions in certain concurrency scenarios if other tasks also modify the message’s state. Consider grouping status updates or queuing them in a single flow so that the message state is updated predictably.Also applies to: 180-180, 204-205, 211-212
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
backend/director/agents/comparison.py
(3 hunks)backend/director/agents/video_generation.py
(7 hunks)backend/director/core/session.py
(1 hunks)
🔇 Additional comments (5)
backend/director/agents/comparison.py (4)
51-54
: Validate error-handling logic in_run_video_generation
.The introduction of an asynchronous call to
video_gen_agent.run_async
is excellent for concurrency. However, ensure that:
- Exceptions in
run_async
that aren’t handled indone_callback
do not get lost.- You can consider a structured error-handling approach (try/except) inside
_run_video_generation
to return consistent error states.
57-63
: Confirm thatdone_callback
handles exceptions gracefully.Any unhandled exception in the async task will surface through
task.result()
. If the exception is not captured, it can crash the process. Ensure that the rest of the system is protected from partial state updates if an exception occurs mid-run.Also applies to: 67-67
111-111
: Avoid usingasyncio.run()
within an existing event loop.Direct usage of
asyncio.run(self.run_tasks(...))
can lead to “Event loop is closed” errors, especially if this method is called from another async context or a running event loop. A safer alternative is:- asyncio.run(self.run_tasks(video_generation_comparison)) + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.run_tasks(video_generation_comparison))
116-118
: Good practice: Log failed videos for clarity.Appending the failed video info to the
agent_response_message
is a great start. You might also want to log these failures with more detail (e.g., stack trace) to facilitate debugging or auditing.backend/director/agents/video_generation.py (1)
94-94
: Commendation for adding an asyncrun_async
method.This asynchronous approach significantly enhances concurrency. Great step!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me.
Summary by CodeRabbit
New Features
text_to_video_async
.Bug Fixes
Documentation
Chores
aiohttp==3.11.11
and removingfal-client==0.5.6
.