fix: Workflow cannot respond to errors #1899

Merged
merged 1 commit into from
Dec 24, 2024

shaohuzhang1
Contributor

fix: Workflow cannot respond to errors

f2c-ci-robot bot commented Dec 24, 2024

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

f2c-ci-robot bot commented Dec 24, 2024

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@shaohuzhang1 shaohuzhang1 merged commit 4550f72 into main Dec 24, 2024
4 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@main@fix_workflow_respond branch December 24, 2024 08:39
@@ -524,6 +524,7 @@ def hand_event_node_result(self, current_node, node_result_future):
            node_chunk.end(chunk)
            current_node.get_write_error_context(e)
            self.status = 500
            return None

    def run_node_async(self, node):
        future = executor.submit(self.run_node, node)
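
The error path in the hunk above records the error on the node, marks the workflow as failed with status 500, and returns None. A minimal sketch of that pattern follows; DemoNode, DemoWorkflow, and failing_action are hypothetical stand-ins for illustration, not the project's classes, and only get_write_error_context, status, and the return value mirror names from the diff.

```python
class DemoNode:
    """Hypothetical stand-in for a workflow node."""

    def __init__(self, action):
        self.action = action
        self.err_message = None

    def get_write_error_context(self, e):
        # Keep the error text so it can be reported to the caller later
        self.err_message = str(e)


class DemoWorkflow:
    """Hypothetical stand-in for the workflow manager."""

    def __init__(self):
        self.status = 200

    def hand_event_node_result(self, node):
        try:
            return node.action()
        except Exception as e:
            # Record the failure on the node and on the workflow
            node.get_write_error_context(e)
            self.status = 500
            return None  # explicit: there is no result on the error path


def failing_action():
    raise ValueError("boom")


workflow = DemoWorkflow()
node = DemoNode(failing_action)
result = workflow.hand_event_node_result(node)
```

A caller can then check workflow.status or node.err_message to build an error response instead of waiting for a result that never arrives.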
Contributor Author

There are a few issues and optimizations to suggest in the provided code:

Issues:

  1. Thread Lock: The threading.Lock is not used consistently throughout the class. A lock belongs around critical sections of code, but it seems unnecessary here because the visible code performs no operations that need synchronization.

  2. Return Statements: Functions return different kinds of values in different places (a list such as [] in some, None in others). This can lead to unexpected behavior in callers that expect a single, consistent type.

  3. Exception Logging: traceback.print_exc() should be called only inside an except block, where there is an active exception to print, and the handler should then recover or re-raise deliberately so that one failing node does not crash the program.

  4. Error Handling in Event Nodes: Event nodes have some error handling in place, but it is not comprehensive. The response handling can be improved to return more meaningful information to the caller.

  5. Node Management: Methods such as append_node, base_to_response, and get_write_error_context are called, but their implementations are not visible here, which could cause runtime errors if they are missing or incomplete.
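
As an illustration of the logging point, here is a small sketch that captures the traceback inside an except block and hands it back to the caller instead of letting the error propagate. The run_step helper is a hypothetical name for this example; traceback.format_exc() is the standard-library call that returns the traceback of the exception currently being handled.

```python
import traceback


def run_step(step):
    """Run a callable and return (value, error_text).

    Ordinary exceptions are caught and reported as text rather than raised.
    """
    try:
        return step(), None
    except Exception:
        # format_exc() returns the traceback of the exception being handled
        return None, traceback.format_exc()


value, error_text = run_step(lambda: 1 / 0)
ok_value, ok_error = run_step(lambda: 42)
```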

Optimization Suggestions:

  1. Consistent Return Values: Decide on one clear structure for all return values instead of returning different types (e.g., a list in one place and None in another).

  2. Handle Exceptions Properly: Log exceptions inside except blocks and handle them there, so that a single failure does not crash the entire pipeline.

  3. Implement Missing Methods: Provide implementations for any methods that are still missing, such as append_node, base_to_response, and get_write_error_context.
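
One way to get consistent return values is to wrap every outcome in a single result type. The sketch below is only an assumption about how that could look; NodeResult and this run_node function are hypothetical and do not come from the project.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class NodeResult:
    """Hypothetical uniform result: every call returns this, never a bare list or None."""

    value: Any = None
    error: Optional[str] = None
    next_nodes: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return self.error is None


def run_node(name, fail=False):
    # Success and failure share one shape, so callers need a single check
    if fail:
        return NodeResult(error=f"{name} failed")
    return NodeResult(value=f"{name} done", next_nodes=["end"])
```

Callers can then branch on result.ok and always iterate result.next_nodes, which is an empty list on failure.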

Here's an updated version with these considerations incorporated:

import traceback
from concurrent.futures import ThreadPoolExecutor

class ChainManager:
    def __init__(self, flow: Flow, params, work_flow_post_handler: WorkFlowPostHandler):
        self.audio_list = []  # assumption: no audio_list argument is passed in, so start empty
        self.params = params
        self.flow = flow
        self.context = {}
        self.node_chunk_manage = NodeChunkManage(self)
        self.work_flow_post_handler = work_flow_post_handler
        self.executor = ThreadPoolExecutor()

    def run_chain_manage(self, current_node, node_result_future):
        start_node = self.get_start_node()
        while current_node != start_node:
            current_node = get_node(current_node.type)(current_node, self.params, self)
        result = self.run_chain(current_node)
        if result is None:
            return
        
        if isinstance(result, Exception):
            self.hand_event_node_result(current_node, result)
            return
    
        node_list = self.get_next_node_list(current_node, result)
        if len(node_list) == 1:
            self.run_chain_manage(node_list[0], None)

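    def run_chain(self, current_node, node_result_future=None):
        result = None
        try:
            # Assuming run_node takes care of producing the result
            result = self.run_node(current_node)
            if node_result_future is not None:
                # Publish the result only after the node has actually run
                node_result_future.set_result(result)
        except Exception as e:
            traceback.print_exc()
            if node_result_future is not None and not node_result_future.done():
                node_result_future.set_exception(e)
            # Return the exception so run_chain_manage can route it to the error handler
            result = e

        return result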

    def hand_node_result(self, current_node, node_result_future):
        try:
            # Implement actual logic to handle node results here
            pass
        except Exception as e:
            traceback.print_exc()
            # Log or otherwise report the error
            raise
    
    def hand_event_node_result(self, current_node, node_result_future):
        try:
            # Implement actual logic to handle event node results here
            pass
        except Exception as e:
            traceback.print_exc()
            # Record the error on the node and mark the workflow as failed,
            # matching the error path shown in the diff
            current_node.get_write_error_context(e)
            self.status = 500
            return None

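    def run_node_async(self, node):
        # Return the future without blocking; calling future.result() here
        # would make this method synchronous. Exceptions raised by run_node
        # are stored on the future and surface when the caller reads it.
        return self.executor.submit(self.run_node, node)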

This version removes the unused lock and moves exception handling into try-except blocks. The handler bodies are placeholders that still need to be filled in with the actual logic.
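
For the thread-pool part, it helps to remember that ThreadPoolExecutor.submit returns a Future and that an exception raised in the worker is stored on that Future rather than raised in the submitting thread. A minimal sketch, with a hypothetical run_node function standing in for the real one:

```python
from concurrent.futures import ThreadPoolExecutor


def run_node(name):
    # Hypothetical node body: one node succeeds, the other raises
    if name == "bad":
        raise ValueError("node failed")
    return f"{name} finished"


with ThreadPoolExecutor(max_workers=2) as executor:
    good_future = executor.submit(run_node, "good")
    bad_future = executor.submit(run_node, "bad")

    good_result = good_future.result()   # blocks until the node is done
    bad_error = bad_future.exception()   # blocks, then returns the stored exception
```

Calling bad_future.result() instead would re-raise the ValueError in the caller, which is where a workflow can translate it into an error response.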
