Skip to content

Commit

Permalink
KAFKA-17415: align create and expire behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
frankvicky committed Sep 21, 2024
1 parent e34d176 commit c52cb83
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,10 @@ class DelegationTokenManagerZk(config: KafkaConfig,
expireResponseCallback(Errors.NONE, now)
} else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
} else if (now > Long.MaxValue - expireLifeTimeMs) {
expireResponseCallback(Errors.INVALID_TIMESTAMP, tokenInfo.expiryTimestamp())
} else {
//set expiry time stamp
val expiryTimeStamp = Math.min(tokenInfo.maxTimestamp, now + expireLifeTimeMs)
val expiryTimeStamp = if (now > Long.MaxValue - expireLifeTimeMs) Long.MaxValue
else Math.min(tokenInfo.maxTimestamp, now + expireLifeTimeMs)
tokenInfo.setExpiryTimestamp(expiryTimeStamp)

updateToken(token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE}
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, DelegationTokenExpiredException, DelegationTokenNotFoundException, InvalidRequestException, InvalidTimestampException, TimeoutException, TopicAuthorizationException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, DelegationTokenExpiredException, DelegationTokenNotFoundException, InvalidRequestException, TimeoutException, TopicAuthorizationException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
Expand Down Expand Up @@ -684,12 +684,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
def testExpireTokenWithOverflowTimestamp(quorum: String): Unit = {
client = createAdminClient
val token = client.createDelegationToken(new CreateDelegationTokenOptions().maxlifeTimeMs(5000)).delegationToken().get()
TestUtils.retry(1000) {
TestUtils.assertFutureExceptionTypeEquals(
client.expireDelegationToken(token.hmac(), new ExpireDelegationTokenOptions().expiryTimePeriodMs(Long.MaxValue)).expiryTimestamp(),
classOf[InvalidTimestampException]
)
}
TestUtils.retry(1000) { assertTrue(expireTokenOrFailWithAssert(token, Long.MaxValue) == Long.MaxValue) }
}

private def expireTokenOrFailWithAssert(token: DelegationToken, expiryTimePeriodMs: Long): Long = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
import static org.apache.kafka.common.protocol.Errors.INVALID_TIMESTAMP;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;

Expand Down Expand Up @@ -323,11 +322,10 @@ public ControllerResult<ExpireDelegationTokenResponseData> expireDelegationToken
setTokenId(myTokenInformation.tokenId()), (short) 0));
} else if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) {
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code());
} else if (now > Long.MAX_VALUE - requestData.expiryTimePeriodMs()) {
return ControllerResult.atomicOf(records, responseData.setErrorCode(INVALID_TIMESTAMP.code()));
} else {
long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(),
now + requestData.expiryTimePeriodMs());
} else {
long expiryTimestamp = now > Long.MAX_VALUE - requestData.expiryTimePeriodMs()
? Long.MAX_VALUE
: Math.min(myTokenInformation.maxTimestamp(), now + requestData.expiryTimePeriodMs());

responseData
.setErrorCode(NONE.code())
Expand Down

0 comments on commit c52cb83

Please sign in to comment.