HIVE-28872 Implemented try-with-resources for Closeables #5740

Status: Closed · 1 commit
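
For reference, the pattern this change applies throughout is Java's try-with-resources: a resource declared in the try header is closed automatically when the block exits, whether normally or through an exception, so a close() call can no longer be forgotten or skipped (the change relies on the wrapped JobClient, Job, and ForkJoinPool types being AutoCloseable). Below is a minimal sketch of the pattern; it is illustrative only and not part of this PR, and SampleResource is a hypothetical stand-in for the Closeables touched in the diff.

import java.io.Closeable;

public class TryWithResourcesSketch {

  // Hypothetical resource standing in for JobClient, Job, or ForkJoinPool.
  static class SampleResource implements Closeable {
    String read() {
      return "data";
    }

    @Override
    public void close() {
      // Runs automatically when the try block exits, even if read() throws.
      System.out.println("closed");
    }
  }

  public static void main(String[] args) {
    // The resource is scoped to the try block and is always closed;
    // without the try header, an early return or exception could leak it.
    try (SampleResource resource = new SampleResource()) {
      System.out.println(resource.read());
    }
  }
}

Each hunk below applies the same mechanical change: the existing body moves inside a try (...) header so the resource is reliably closed. Aside from re-indentation and a couple of small cleanups in the third hunk (a redundant cast dropped and an "== false" comparison rewritten), the logic is unchanged.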

@@ -344,14 +344,11 @@ public static boolean validateExecuteBitPresentIfReadOrWrite(FsAction perms) {

   public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> getJobTrackerDelegationToken(
       Configuration conf, String userName) throws Exception {
-    // LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
-    JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
-    Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl
-      .getDelegationToken(new Text(userName));
-    // LOG.info("got "+t);
-    return t;
-
-    // return null;
+    try (JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class))) {
+      Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl
+        .getDelegationToken(new Text(userName));
+      return t;
+    }
   }

   public static Token<? extends AbstractDelegationTokenIdentifier> extractThriftToken(

@@ -130,24 +130,25 @@ public void setLocation(String location, Job job) throws IOException {
         job.getCredentials().addAll(crd);
       }
     } else {
-      Job clone = new Job(job.getConfiguration());
-      HCatInputFormat.setInput(job, dbName, tableName, getPartitionFilterString());
-
-      InputJobInfo inputJobInfo = HCatUtil.getLastInputJobInfosFromConf(job.getConfiguration());
-
-      SpecialCases.addSpecialCasesParametersForHCatLoader(job.getConfiguration(),
-        inputJobInfo.getTableInfo());
-
-      // We will store all the new /changed properties in the job in the
-      // udf context, so the the HCatInputFormat.setInput method need not
-      //be called many times.
-      for (Entry<String, String> keyValue : job.getConfiguration()) {
-        String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
-        if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
-          udfProps.put(keyValue.getKey(), keyValue.getValue());
-        }
-      }
+      try (Job clone = new Job(job.getConfiguration())) {
+        HCatInputFormat.setInput(job, dbName, tableName, getPartitionFilterString());
+
+        InputJobInfo inputJobInfo = HCatUtil.getLastInputJobInfosFromConf(job.getConfiguration());
+
+        SpecialCases.addSpecialCasesParametersForHCatLoader(job.getConfiguration(),
+          inputJobInfo.getTableInfo());
+
+        // We will store all the new /changed properties in the job in the
+        // udf context, so the the HCatInputFormat.setInput method need not
+        //be called many times.
+        for (Entry<String, String> keyValue : job.getConfiguration()) {
+          String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
+          if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+            udfProps.put(keyValue.getKey(), keyValue.getValue());
+          }
+        }
+      }
       udfProps.put(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET, true);

       //Store credentials in a private hash map and not the udf context to
       // make sure they are not public.

@@ -163,56 +163,57 @@ public void setStoreLocation(String location, Job job) throws IOException {
         job.getCredentials().addAll(crd);
       }
     } else {
-      Job clone = new Job(job.getConfiguration());
-      OutputJobInfo outputJobInfo;
-      if (userStr.length == 2) {
-        outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
-      } else if (userStr.length == 1) {
-        outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
-      } else {
-        throw new FrontendException("location " + location
-          + " is invalid. It must be of the form [db.]table",
-          PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-      Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
-      if (schema != null) {
-        pigSchema = schema;
-      }
-      if (pigSchema == null) {
-        throw new FrontendException(
-          "Schema for data cannot be determined.",
-          PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-      String externalLocation = (String) udfProps.getProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION);
-      if (externalLocation != null) {
-        outputJobInfo.setLocation(externalLocation);
-      }
-      try {
-        HCatOutputFormat.setOutput(job, outputJobInfo);
-      } catch (HCatException he) {
-        // pass the message to the user - essentially something about
-        // the table
-        // information passed to HCatOutputFormat was not right
-        throw new PigException(he.getMessage(),
-          PigHCatUtil.PIG_EXCEPTION_CODE, he);
-      }
-      HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
-      try {
-        doSchemaValidations(pigSchema, hcatTblSchema);
-      } catch (HCatException he) {
-        throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
-      }
-      computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
-      HCatOutputFormat.setSchema(job, computedSchema);
-      udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(computedSchema));
+      try (Job clone = new Job(job.getConfiguration())) {
+        OutputJobInfo outputJobInfo;
+        if (userStr.length == 2) {
+          outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
+        } else if (userStr.length == 1) {
+          outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
+        } else {
+          throw new FrontendException("location " + location
+            + " is invalid. It must be of the form [db.]table",
+            PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+        Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
+        if (schema != null) {
+          pigSchema = schema;
+        }
+        if (pigSchema == null) {
+          throw new FrontendException(
+            "Schema for data cannot be determined.",
+            PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+        String externalLocation = udfProps.getProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION);
+        if (externalLocation != null) {
+          outputJobInfo.setLocation(externalLocation);
+        }
+        try {
+          HCatOutputFormat.setOutput(job, outputJobInfo);
+        } catch (HCatException he) {
+          // pass the message to the user - essentially something about
+          // the table
+          // information passed to HCatOutputFormat was not right
+          throw new PigException(he.getMessage(),
+            PigHCatUtil.PIG_EXCEPTION_CODE, he);
+        }
+        HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
+        try {
+          doSchemaValidations(pigSchema, hcatTblSchema);
+        } catch (HCatException he) {
+          throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+        }
+        computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
+        HCatOutputFormat.setSchema(job, computedSchema);
+        udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(computedSchema));

-      // We will store all the new /changed properties in the job in the
-      // udf context, so the the HCatOutputFormat.setOutput and setSchema
-      // methods need not be called many times.
-      for (Entry<String, String> keyValue : job.getConfiguration()) {
-        String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
-        if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
-          udfProps.put(keyValue.getKey(), keyValue.getValue());
-        }
-      }
+        // We will store all the new /changed properties in the job in the
+        // udf context, so the HCatOutputFormat.setOutput and setSchema
+        // methods need not be called many times.
+        for (Entry<String, String> keyValue : job.getConfiguration()) {
+          String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
+          if ((oldValue == null) || (!keyValue.getValue().equals(oldValue))) {
+            udfProps.put(keyValue.getKey(), keyValue.getValue());
+          }
+        }
+      }
       //Store credentials in a private hash map and not the udf context to

@@ -140,11 +140,12 @@ public int run(String[] args) throws IOException, InterruptedException, ClassNot
     job.setOutputFormatClass(of.getClass());
     job.setNumReduceTasks(0);

-    JobClient jc = new JobClient(new JobConf(job.getConfiguration()));
-
-    if(UserGroupInformation.isSecurityEnabled()) {
-      Token<DelegationTokenIdentifier> mrdt = jc.getDelegationToken(new Text("mr token"));
-      job.getCredentials().addToken(new Text("mr token"), mrdt);
-    }
+    try (JobClient jc = new JobClient(new JobConf(job.getConfiguration()))) {
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        Token<DelegationTokenIdentifier> mrdt = jc.getDelegationToken(new Text("mr token"));
+        job.getCredentials().addToken(new Text("mr token"), mrdt);
+      }
+    }
     LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));


@@ -599,25 +599,25 @@ private void readControlConfigs(FileSystem fs, Path path) {
   void run() throws Exception {
     LOG.info("Starting with {}", runOptions);

-    ForkJoinPool tablePool = new ForkJoinPool(
+    try (ForkJoinPool tablePool = new ForkJoinPool(
         runOptions.tablePoolSize,
         new NamedForkJoinWorkerThreadFactory("Table-"),
         getUncaughtExceptionHandler(),
-        false);
+        false)) {

-    List<String> databases = null;
+      List<String> databases = null;

-    if (controlConfig == null) {
-      databases = hms.get().getDatabases(runOptions.dbRegex); //TException
-    } else {
-      databases = hms.get().getAllDatabases().stream()
-          .filter(db -> controlConfig.getDatabaseIncludeLists().containsKey(db)).collect(toList());
-    }
-    LOG.info("Found {} databases", databases.size());
-
-    databases.forEach(dbName -> processDatabase(dbName, tablePool));
-    LOG.info("Done processing databases.");
+      if (controlConfig == null) {
+        databases = hms.get().getDatabases(runOptions.dbRegex); //TException
+      } else {
+        databases = hms.get().getAllDatabases().stream()
+            .filter(db -> controlConfig.getDatabaseIncludeLists().containsKey(db)).collect(toList());
+      }
+      LOG.info("Found {} databases", databases.size());
+
+      databases.forEach(dbName -> processDatabase(dbName, tablePool));
+      LOG.info("Done processing databases.");
+    }
     if (failuresEncountered.get()) {
       throw new HiveException("One or more failures encountered during processing.");
     }