-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Workflow cannot respond to errors #1899
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@@ -524,6 +524,7 @@ def hand_event_node_result(self, current_node, node_result_future): | |||
node_chunk.end(chunk) | |||
current_node.get_write_error_context(e) | |||
self.status = 500 | |||
return None | |||
|
|||
def run_node_async(self, node): | |||
future = executor.submit(self.run_node, node) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a few issues and optimizations to suggest in the provided code:
Issues:
-
Thread Lock: The
threading.Lock
is not used consistently throughout the class. It's better placed at places where critical sections of code need locking, but here it seems unnecessary since there aren't any multi-threaded operations being performed. -
Return Statements: There are multiple instances where lists are returned instead of single values (
[]
andNone
). This might lead to unexpected behavior when calling functions that expect only one value. -
Unnecessary Exception Logging: The exception print statement (
traceback.print_exc()
) should be wrapped within try-except blocks to prevent the program from crashing if an exception occurs. -
Error Handling in Event Nodes: When processing event nodes, there's some error handling in place, but it doesn't seem comprehensive. The response handling can be improved to provide more meaningful information back to the caller.
-
Node Management: Some methods like
append_node
,base_to_response
,get_write_error_context
have no implementation, which could cause runtime errors if called without further refinement.
Optimization Suggestions:
-
Consistent Return Values: Ensure consistent return values. Instead of returning different types (e.g., list, None), decide on a clear structure for all return values.
-
Handle Exceptions Properly: Wrap exception logging in try-except blocks to avoid crashing the entire pipeline.
-
Implement Missing Methods: Fill in the implementations for missing methods such as
append_node
,base_to_response
, andget_write_error_context
.
Here's an updated version with these considerations incorporated:
from concurrent.futures import ThreadPoolExecutor
class ChainManager:
def __init__(self, flow: Flow, params, work_flow_post_handler: WorkFlowPostHandler):
self.audio_list = audio_list
self.params = params
self.flow = flow
self.context = {}
self.node_chunk_manage = NodeChunkManage(self)
self.work_flow_post_handler = work_flow_post_handler
self.executor = ThreadPoolExecutor()
def run_chain_manage(self, current_node, node_result_future):
start_node = self.get_start_node()
while current_node != start_node:
current_node = get_node(current_node.type)(current_node, self.params, self)
result = self.run_chain(current_node)
if result is None:
return
if isinstance(result, Exception):
self.hand_event_node_result(current_node, result)
return
node_list = self.get_next_node_list(current_node, result)
if len(node_list) == 1:
self.run_chain_manage(node_list[0], None)
def run_chain(self, current_node, node_result_future=None):
result = None
try:
if node_result_future is not None:
node_result_future.set_result(result)
# Assuming run_node takes care of setting result appropriately
result = self.run_node(current_node)
except Exception as e:
traceback.print_exc()
# Handle exceptions properly here if necessary
pass
return result
def hand_node_result(self, current_node, node_result_future):
try:
# Implement actual logic to handle node results here
pass
except Exception as e:
traceback.print_exc()
# Log or otherwise report the error
raise
def hand_event_node_result(self, current_node, node_result_future):
try:
# Implement actual logic to handle event node results here
traceback.print_exc()
# Optionally, set specific status codes or messages based on response data
self.status = 500
raise
except Exception as e:
traceback.print_exc()
# Log or otherwise report the error
raise
def run_node_async(self, node):
try:
future = self.executor.submit(self.run_node, node)
return future.result()
except Exception as e:
traceback.print_exc()
This version removes the unused lock and improves exception handling by using try-except blocks. It also provides basic placeholders for the missing methods, ensuring they will behave predictably once implemented.
fix: Workflow cannot respond to errors