Skip to content

Conversation

@anishgirianish
Copy link
Contributor

@anishgirianish anishgirianish commented Jan 4, 2026

Tasks waiting in Celery queue may have their JWT tokens expire before execution starts. This adds a token refresh endpoint that allows the supervisor to refresh expired tokens before task execution.

Changes:

  • Add /token/refresh endpoint to Execution API
  • Add client-side token refresh logic in supervisor.py
  • Add tests for the new endpoint

Fixes: #53713


Summary

Fixes #59553 - Tasks waiting in Celery queue fail when JWT tokens expire before execution starts.

Implements a two-token mechanism for task execution to prevent token expiration while tasks wait in executor queues.

  • Queue Token: Long-lived token (24h default) sent with task workloads. Can only call /run endpoint.
  • Execution Token: Short-lived token (10min default) issued by /run for subsequent API calls.

Changes

  • Add JWTBearerQueueScope dependency for queue token validation on /run
  • Add TOKEN_SCOPE_QUEUE and generate_queue_token() in tokens.py
  • Update /run endpoint to accept queue tokens and return execution tokens
  • Add jwt_queue_token_expiration_time config option (default: 86400s)
  • Reject queue-scoped tokens on all other endpoints

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:task-sdk labels Jan 4, 2026
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from b183c74 to 9c31417 Compare January 4, 2026 21:05
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch 3 times, most recently from c707ddc to 4ef9dfe Compare January 4, 2026 22:45
@eladkal eladkal added this to the Airflow 3.1.6 milestone Jan 6, 2026
@tirkarthi
Copy link
Contributor

As per my understanding this was removed in #55506 to use a middleware that refreshes token. Are you running an instance with execution api only separately with api-server? Could this middleware approach be extended for task-sdk calls too?

cc: @vincbeck @pierrejeambrun

@anishgirianish
Copy link
Contributor Author

Hi @tirkarthi,
Thanks for pointing out the middleware approach from #55506 - that's helpful context.

I took a stab at extending that pattern in #60197, handling expired tokens transparently in JWTBearer + middleware so no client-side changes are needed. Would love your thoughts on it.

Totally happy to go with whichever approach the team feels is better!

cc: @vincbeck @pierrejeambrun

@vincbeck
Copy link
Contributor

vincbeck commented Jan 7, 2026

Hi @tirkarthi, Thanks for pointing out the middleware approach from #55506 - that's helpful context.

I took a stab at extending that pattern in #60197, handling expired tokens transparently in JWTBearer + middleware so no client-side changes are needed. Would love your thoughts on it.

Totally happy to go with whichever approach the team feels is better!

cc: @vincbeck @pierrejeambrun

Would love to hear @ashb or @amoghrajesh 's opinion on this one

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't do this approach. It lets any Execution API token be resurrected which fundamentally breaks lots of security assumptions -- it amounts to having tokens not expire. That is bad.

Instead what we should do is generate a new token (i.e. ones with extra/different set of JWT claims) that is only valid for the /run endpoint and valid for longer (say 24hours, make it configurable) and this is what gets sent in the workload.

The run endpoint then would set the header to give the running task a "short lived" token (the one we have right now basically) that is usable on the rest of the Execution API. This approach is safer as the existing controls in the /run endpoint already prevent a task being run one than once, which should also prevent against "resurrecting" an expired token and using it to access things like connections etc. And we should validate that the token used on all endpoints but run is explicitly lacking this new claim.

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better approach, and on the right track, thanks.

Some changes though:

  • "queue" is not the right thing to use, as these tokens could be used for executing other workloads soon (for instance we have already talked about wanting Dag level callbacks to be executed on the workers, not in the dag processor, which would be done by having a new type from the ExecuteTaskWorkload).

    so maybe we have "scope": "ExecuteTaskWorkload"?

  • A little bit of refactoring is needed before we are ready to merge this.

Comment on lines 464 to 495
def generate_queue_token(self, sub: str) -> str:
"""
Generate a long-lived queue token for task workloads.
Queue tokens have a special 'scope' claim that restricts them to the /run endpoint only.
They are valid for longer (default 24h) to survive queue wait times.
"""
from airflow.configuration import conf

queue_expiry = conf.getint("execution_api", "jwt_queue_token_expiration_time", fallback=86400)
now = int(datetime.now(tz=timezone.utc).timestamp())

claims = {
"jti": uuid.uuid4().hex,
"iss": self.issuer,
"aud": self.audience,
"nbf": now,
"exp": now + queue_expiry,
"iat": now,
"sub": sub,
"scope": TOKEN_SCOPE_QUEUE,
}

if claims["iss"] is None:
del claims["iss"]
if claims["aud"] is None:
del claims["aud"]

headers = {"alg": self.algorithm}
if self._private_key:
headers["kid"] = self.kid
return jwt.encode(claims, self.signing_arg, algorithm=self.algorithm, headers=headers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a new whole function for this -- the existing generate() could work already by doing:

generator.generate({"sub": sub, "exp": now + queue_expiry})

If you think it's worth "packaging" that up, then make it call self.generate -- don't essentially duplicate it.

claims = await validator.avalidated_claims(creds.credentials, validators)

# Reject queue-scoped tokens - they can only be used on /run endpoint
# Only check if scope claim is present (allows backwards compatibility with tests)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this back compat is just for tests then we don't need it.

Comment on lines 314 to 316
# Set a dummy JWT secret so the lifespan can create JWT services without failing.
if not conf.get("api_auth", "jwt_secret", fallback=None):
conf.set("api_auth", "jwt_secret", "in-process-test-secret-key")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never ever ever do this in production/runtime code. The risk of it being picked up and every install in the world having a signing key of "in-process-test-secret-key" is too large.

I knot this is just the InProcess class, but I'm still worried about doing this, doubly so as config is process global.

Comment on lines 332 to 352
# Create a mock container that provides mock JWT services
mock_jwt_generator = MagicMock(spec=JWTGenerator)
mock_jwt_generator.generate.return_value = "mock-execution-token"

mock_jwt_validator = AsyncMock(spec=JWTValidator)
mock_jwt_validator.avalidated_claims.return_value = {"sub": "test", "exp": 9999999999}

class MockContainer:
"""A mock svcs container that returns mock services."""

async def aget(self, svc_type):
if svc_type is JWTGenerator:
return mock_jwt_generator
if svc_type is JWTValidator:
return mock_jwt_validator
raise ValueError(f"Unknown service type: {svc_type}")

async def mock_container_dep():
return MockContainer()

self._app.dependency_overrides[DepContainer.dependency] = mock_container_dep
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this doing here? This looks like test-only code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a mock svcs container? Why not just use a real svcs container?

Comment on lines 375 to 346
# Wait for lifespan to complete before returning the transport
lifespan_started.wait(timeout=5.0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather we wrote the JWTBearer and the JWTBearerQueueScope queue in a layered approach (deps can depend on each other, or subclassing) - that way we only have to writ much of the validation once and only tweak the behaviour.

I think the layer approach would be best, so we have a base JWTBearer dep that does the basic validation, but nothing of the presence/absence of the queue scope, and then two deps that consume that returned TIToken to do the next layer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we needed to change this order?

HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for the state transition"},
},
response_model_exclude_unset=True,
dependencies=[JWTBearerQueueDep],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is the one and only place we use this dep (and it's also the only place we ever want to use this dep) I think it would be better if we moved JWTBearerQueueDep in to this file.

the /run endpoint, which then issues a short-lived execution token.
This should be set long enough to cover the maximum expected queue wait time.
version_added: 3.1.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
version_added: 3.1.0
version_added: 3.1.7

@ashb ashb self-requested a review January 9, 2026 12:09
@anishgirianish anishgirianish force-pushed the fix/token-expiration-worker branch from e7e3ae1 to e879863 Compare January 9, 2026 23:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API area:task-sdk

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ExecuteTask activity token can expire before the task starts running AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT configuration ignored

6 participants