Skip to content

Commit

Permalink
DigestMD5 Token Issue Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vikramahuja1001 committed Jun 14, 2024
1 parent 3eb56ec commit 81faa78
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.minikdc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class TestAuthWithDigestMD5 {

private static final Logger LOG = LoggerFactory.getLogger(TestAuthWithDigestMD5.class.getName());
protected static HiveMetaStoreClient client;
public static final String HS2TOKEN = "HiveServer2ImpersonationToken";

protected static String correctUser = "correct_user";
protected static String correctPassword = "correct_passwd";

protected static Configuration conf = null;

private static MiniHiveKdc miniKDC = null;
protected static Configuration clientConf;
protected static String hiveMetastorePrincipal;
protected static String hiveMetastoreKeytab;
private static boolean isServerStarted = false;
protected static int port;

@Before
public void setUp() throws Exception {
miniKDC = new MiniHiveKdc();
hiveMetastorePrincipal =
miniKDC.getFullyQualifiedServicePrincipal(miniKDC.getHiveMetastoreServicePrincipal());
hiveMetastoreKeytab = miniKDC.getKeyTabFile(
miniKDC.getServicePrincipalForUser(miniKDC.getHiveMetastoreServicePrincipal()));

if (null == conf) {
conf = MetastoreConf.newMetastoreConf();
}

MetastoreConf.setBoolVar(conf, ConfVars.EXECUTE_SET_UGI, false);
clientConf = new Configuration(conf);

MetastoreConf.setVar(conf, ConfVars.THRIFT_METASTORE_AUTHENTICATION, "CONFIG");
MetastoreConf.setVar(conf, ConfVars.THRIFT_AUTH_CONFIG_USERNAME, correctUser);
MetastoreConf.setVar(conf, ConfVars.THRIFT_AUTH_CONFIG_PASSWORD, correctPassword);
MetastoreConf.setBoolVar(conf, ConfVars.USE_THRIFT_SASL, true);
MetastoreConf.setVar(conf, ConfVars.KERBEROS_PRINCIPAL, hiveMetastorePrincipal);
MetastoreConf.setVar(conf, ConfVars.KERBEROS_KEYTAB_FILE, hiveMetastoreKeytab);

// set some values to use for getting conf. vars
MetastoreConf.setBoolVar(conf, ConfVars.METRICS_ENABLED, true);
conf.set("datanucleus.autoCreateTables", "false");
conf.set("hive.in.test", "true");
MetastoreConf.setVar(conf, ConfVars.METASTORE_METADATA_TRANSFORMER_CLASS, " ");

MetaStoreTestUtils.setConfForStandloneMode(conf);
MetastoreConf.setLongVar(conf, ConfVars.BATCH_RETRIEVE_MAX, 2);
MetastoreConf.setVar(conf, ConfVars.STORAGE_SCHEMA_READER_IMPL, "no.such.class");
MetastoreConf.setBoolVar(conf, ConfVars.INTEGER_JDO_PUSHDOWN, true);
MetastoreConf.setTimeVar(conf, ConfVars.DELEGATION_TOKEN_RENEW_INTERVAL,10000, TimeUnit.MILLISECONDS);
MetastoreConf.setTimeVar(conf, ConfVars.DELEGATION_TOKEN_GC_INTERVAL,3000, TimeUnit.MILLISECONDS);

if (isServerStarted) {
Assert.assertNotNull("Unable to connect to the MetaStore server", client);
return;
}
start();
}

protected void start() throws Exception {
port = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
conf);
System.out.println("Starting MetaStore Server on port " + port);
isServerStarted = true;

MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" + port);
MetastoreConf.setBoolVar(conf, ConfVars.EXECUTE_SET_UGI, false);

client = createClient();
}

protected HiveMetaStoreClient createClient() throws Exception {
MetastoreConf.setVar(clientConf, ConfVars.KERBEROS_PRINCIPAL, hiveMetastorePrincipal);
MetastoreConf.setVar(clientConf, ConfVars.KERBEROS_KEYTAB_FILE, hiveMetastoreKeytab);
return new HiveMetaStoreClient(conf);
}

@Test
public void testPostRenewalTimeThreadKickedIn() throws Exception {
String token = client.getDelegationToken("hive","hive");
MetastoreConf.setVar(conf, ConfVars.TOKEN_SIGNATURE, HS2TOKEN);

client = new HiveMetaStoreClient(conf);

Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>();
delegationToken.decodeFromUrlString(token);
delegationToken.setService(new Text(HS2TOKEN));
UserGroupInformation.getCurrentUser().addToken(delegationToken);

LOG.info("Sleeping for 15 seconds");
Thread.sleep(15000);
LOG.info("Waking Up");

client = new HiveMetaStoreClient(conf);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,22 @@ public void testDelegationTokenSharedStore() throws Exception {
// expected
}

// token expiration
// token Renewal
MyTokenStore.TOKEN_STORE.addToken(d,
new DelegationTokenInformation(0, t.getPassword()));
Assert.assertNotNull(MyTokenStore.TOKEN_STORE.getToken(d));
anotherManager.removeExpiredTokens();
Assert.assertTrue(MyTokenStore.TOKEN_STORE.getToken(d).getRenewDate() > 0);

// test Expiration
DelegationTokenIdentifier e = new DelegationTokenIdentifier();
e.setMaxDate(0);
MyTokenStore.TOKEN_STORE.addToken(e,
new DelegationTokenInformation(0, t.getPassword()));
Assert.assertNotNull(MyTokenStore.TOKEN_STORE.getToken(d));
anotherManager.removeExpiredTokens();
Assert.assertNull("Expired token not removed",
MyTokenStore.TOKEN_STORE.getToken(d));
MyTokenStore.TOKEN_STORE.getToken(e));

// key expiration - create an already expired key
anotherManager.startThreads(); // generates initial key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,6 +105,10 @@ public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws Inva
if (info == null) {
throw new InvalidToken("token expired or does not exist: " + identifier);
}
renewIfRequired(System.currentTimeMillis(), identifier, info);
// we have to fetch the token again as it has been renewed and info still contains the previous renew time.
info = this.tokenStore.getToken(identifier);

// must reuse super as info.getPassword is not accessible
synchronized (this) {
try {
Expand Down Expand Up @@ -163,8 +169,12 @@ public long renewToken(Token<DelegationTokenIdentifier> token, String renewer) t
try {
long res = super.renewToken(token, renewer);
this.tokenStore.removeToken(id);
this.tokenStore.addToken(id, super.currentTokens.get(id));
DelegationTokenInformation updatedToken = super.currentTokens.get(id);
this.tokenStore.addToken(id, updatedToken);
LOGGER.info("Successfully renewed token : " + id + ", Renewal time now is: " +
Time.formatTime(updatedToken.getRenewDate()));
return res;

} finally {
super.currentTokens.remove(id);
}
Expand Down Expand Up @@ -238,20 +248,33 @@ public synchronized void stopThreads() {
*/
protected void removeExpiredTokens() {
long now = System.currentTimeMillis();
Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers()
.iterator();
while (i.hasNext()) {
DelegationTokenIdentifier id = i.next();
for (DelegationTokenIdentifier id : tokenStore.getAllDelegationTokenIdentifiers()) {
if (now > id.getMaxDate()) {
LOGGER.info("Expiry Thread removing expired token: " + id);
this.tokenStore.removeToken(id); // no need to look at token info
} else {
// get token info to check renew date
DelegationTokenInformation tokenInfo = tokenStore.getToken(id);
if (tokenInfo != null) {
if (now > tokenInfo.getRenewDate()) {
this.tokenStore.removeToken(id);
}
renewIfRequired(now, id, tokenStore.getToken(id));
}
}
}

private void renewIfRequired(long currentTime, DelegationTokenIdentifier id, DelegationTokenInformation tokenInfo) {
if (tokenInfo != null) {
if (currentTime > tokenInfo.getRenewDate() && currentTime < id.getMaxDate()) {
// This will be the case when now > tokenInfo.getRenewDate() but less than the token expiration/max time.
LOGGER.info("Trying to renew the token: " + id);
try {
DelegationKey key = getDelegationKey(id.getMasterKeyId());
Token<DelegationTokenIdentifier> t = new Token<>(id.getBytes(), createPassword(id.getBytes(), key.getKey()),
id.getKind(), new Text());
renewToken(t, UserGroupInformation.getCurrentUser().getShortUserName());
} catch (IOException e) {
throw new IllegalStateException("Unable to renew token: " + id);
}
} else if (currentTime >= id.getMaxDate()) {
// In this case expiry time has passed and this token cannot be further renewed.
throw new IllegalStateException("Expiration time passed. Cannot renew the token.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ private TokenStoreDelegationTokenSecretManager createTokenMgr(DelegationTokenSto
tokenRenewInterval, tokenGcInterval, tokenStore);
}

private TokenStoreDelegationTokenSecretManager createTokenMgr(DelegationTokenStore tokenStore,
long renewSecs, long gcTime, long maxLifeTime) {
long secretKeyInterval =
MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.DELEGATION_KEY_UPDATE_INTERVAL,
TimeUnit.MILLISECONDS);
long tokenMaxLifetime = maxLifeTime * 1000;
long tokenRenewInterval = renewSecs * 1000;
long tokenGcInterval = gcTime * 1000;
return new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
tokenRenewInterval, tokenGcInterval, tokenStore);
}


private DelegationTokenIdentifier getID(String tokenStr) throws IOException {
DelegationTokenIdentifier id = new DelegationTokenIdentifier();
Token<DelegationTokenIdentifier> token = new Token<>();
Expand Down Expand Up @@ -99,19 +112,46 @@ private DelegationTokenIdentifier getID(String tokenStr) throws IOException {

@Test public void testExpiry() throws IOException, InterruptedException {
DelegationTokenStore tokenStore = new MemoryTokenStore();
TokenStoreDelegationTokenSecretManager mgr = createTokenMgr(tokenStore, 1);
TokenStoreDelegationTokenSecretManager mgr = createTokenMgr(tokenStore, 1, 8, 2);
try {
mgr.startThreads();
String tokenStr =
mgr.getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(),
UserGroupInformation.getCurrentUser().getShortUserName());
DelegationTokenIdentifier id = getID(tokenStr);
Assert.assertNotNull(mgr.verifyDelegationToken(tokenStr));
// Sleep for the renewal duration
Thread.sleep(1000);
SecretManager.InvalidToken ex = Assert.assertThrows(SecretManager.InvalidToken.class,
// Sleep for the expiration/maxlife duration duration
Thread.sleep(6000);
IllegalStateException ex = Assert.assertThrows(IllegalStateException.class,
() -> mgr.verifyDelegationToken(tokenStr));
Assert.assertTrue(ex.getMessage(), ex.getMessage().contains("has expired"));
Assert.assertTrue(ex.getMessage(), ex.getMessage().contains("Expiration time passed"));
Thread.sleep(6000);
//Expiry thread will remove the token as it has crossed the maxLifeTime.
Assert.assertEquals(tokenStore.getAllDelegationTokenIdentifiers().size(), 0);
} finally {
mgr.stopThreads();
}
}

@Test
public void testExpiryThreadRenewingAndRemovingToken() throws IOException, InterruptedException {
DelegationTokenStore tokenStore = new MemoryTokenStore();
// the idea is to make sure that Expiry thread actually does not remove the token up for renewal
TokenStoreDelegationTokenSecretManager mgr = createTokenMgr(tokenStore, 3, 1, 11);
try {
mgr.startThreads();
String tokenStr =
mgr.getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(),
UserGroupInformation.getCurrentUser().getShortUserName());
Assert.assertNotNull(mgr.verifyDelegationToken(tokenStr));
DelegationTokenIdentifier id = getID(tokenStr);
long initialExpiry = tokenStore.getToken(id).getRenewDate();
Thread.sleep(10000);
// Token should automatically get renewed by the Thread as the current time is > renew time.
Assert.assertTrue(tokenStore.getToken(id).getRenewDate() > initialExpiry);
Thread.sleep(10000);
// Token should be automatically removed by the thread as it is expired now.
Assert.assertEquals(tokenStore.getAllDelegationTokenIdentifiers().size(), 0);
} finally {
mgr.stopThreads();
}
Expand Down

0 comments on commit 81faa78

Please sign in to comment.