Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Handling up_for_retry task instance states #45070

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

amoghrajesh
Copy link
Contributor


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Comment on lines +375 to +387
def _is_eligible_to_retry(task_instance, task_retries: int):
"""
Is task instance is eligible for retry.
:param task_instance: the task instance
:meta private:
"""
if task_instance.state == State.RESTARTING:
# If a task is RESTARTING state it is always eligible for retry
return True

return task_retries and task_instance.try_number <= task_instance.max_tries
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly a port over of

def _is_eligible_to_retry(*, task_instance: TaskInstance):
"""
Is task instance is eligible for retry.
:param task_instance: the task instance
:meta private:
"""
if task_instance.state == TaskInstanceState.RESTARTING:
# If a task is cleared when running, it goes into RESTARTING state and is always
# eligible for retry
return True
if not getattr(task_instance, "task", None):
# Couldn't load the task, don't know number of retries, guess:
return task_instance.try_number <= task_instance.max_tries
if TYPE_CHECKING:
assert task_instance.task
return task_instance.task.retries and task_instance.try_number <= task_instance.max_tries
because we do not have "task_instance" table entries in SDK anymore.

Tried splitting it too because we do not have "task_instance.task" here

Comment on lines +301 to +314
except (AirflowTaskTimeout, AirflowException):
# Couldn't load the task, don't know number of retries, guess
if not getattr(ti, "task", None):
# Let us set the task_retries to default = 0
msg = RetryTask(
end_date=datetime.now(tz=timezone.utc),
task_retries=0,
)
else:
msg = RetryTask(
end_date=datetime.now(tz=timezone.utc),
# is `or 0` needed?
task_retries=ti.task.retries or 0,
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the API and this PR, all we should care about is whether we should retry or not. The task ran, complained that it needs to retry, so we send a retry API call. The core logic of how retry works should be out of the scope of this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant