Merge pull request #1278 from data-integrations/PLUGIN-1647
[PLUGIN-1647] fix the access token provider
itsankit-google authored Aug 21, 2023
2 parents aca754d + b81608c commit dc04dc6
Showing 5 changed files with 145 additions and 10 deletions.
src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
@@ -57,22 +57,26 @@
import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputCommitter;
import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputFormat;
import com.google.cloud.hadoop.util.ConfigurationUtil;
import com.google.cloud.hadoop.util.HadoopConfigurationProperty;
import com.google.cloud.hadoop.util.ResilientOperation;
import com.google.cloud.hadoop.util.RetryDeterminer;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,6 +141,44 @@ public OutputCommitter createCommitter(TaskAttemptContext context) throws IOException {
return new BigQueryOutputCommitter(context, delegateCommitter);
}

/**
* This method is copied from
* {@link ForwardingBigQueryFileOutputFormat#checkOutputSpecs(JobContext)} so that
* {@link BigQueryFactory} can be replaced with {@link BigQueryFactoryWithScopes}.
*/
@Override
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
Configuration conf = job.getConfiguration();

// Validate the output configuration.
BigQueryOutputConfiguration.validateConfiguration(conf);

// Get the output path.
Path outputPath = BigQueryOutputConfiguration.getGcsOutputPath(conf);
LOG.info("Using output path '{}'.", outputPath);

// Error if the output path already exists.
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
if (outputFileSystem.exists(outputPath)) {
throw new IOException("The output path '" + outputPath + "' already exists.");
}

// Error if compression is set as there's mixed support in BigQuery.
if (FileOutputFormat.getCompressOutput(job)) {
throw new IOException("Compression isn't supported for this OutputFormat.");
}

// Error if unable to create a BigQuery helper.
try {
new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
} catch (GeneralSecurityException gse) {
throw new IOException("Failed to create BigQuery client", gse);
}

// Let delegate process its checks.
getDelegate(conf).checkOutputSpecs(job);
}

/**
* BigQuery Output committer.
*/
@@ -158,7 +200,7 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutputCommitter {
BigQueryOutputCommitter(TaskAttemptContext context, OutputCommitter delegate) throws IOException {
super(context, delegate);
try {
- BigQueryFactory bigQueryFactory = new BigQueryFactory();
+ BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
} catch (GeneralSecurityException e) {
throw new IOException("Failed to create Bigquery client.", e);
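The one-line committer change above is the core of the fix: the stock BigQueryFactory resolves credentials without consulting the plugin's access token provider, whereas BigQueryFactoryWithScopes (next file) routes credential creation through it. A minimal sketch of the new wiring (FactorySketch itself is hypothetical; every other name appears in this diff):

import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
import com.google.cloud.hadoop.io.bigquery.BigQueryHelper;
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;

class FactorySketch {
  static BigQueryHelper newHelper(Configuration conf) throws Exception {
    // Old: new BigQueryFactory() could fall back to default credentials instead
    // of the plugin's ServiceAccountAccessTokenProvider.
    BigQueryFactory factory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
    // getBigQueryHelper declares GeneralSecurityException and IOException.
    return factory.getBigQueryHelper(conf);
  }
}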
src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryFactoryWithScopes.java
@@ -19,14 +19,15 @@
import com.google.api.client.auth.oauth2.Credential;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
import com.google.common.collect.ImmutableList;
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.List;

/**
@@ -43,16 +44,27 @@ public BigQueryFactoryWithScopes(List<String> scopes) {
@Override
public Credential createBigQueryCredential(Configuration config) throws GeneralSecurityException, IOException {
Credential credential =
- CredentialFromAccessTokenProviderClassFactory.credential(
- config,
- Collections.singletonList(BigQueryConfiguration.BIGQUERY_CONFIG_PREFIX),
+ CredentialFromAccessTokenProviderClassFactory.credential(getAccessTokenProvider(config),
scopes);
if (credential != null) {
return credential;
}

return HadoopCredentialConfiguration.getCredentialFactory(
config, String.valueOf(ImmutableList.of(BigQueryConfiguration.BIGQUERY_CONFIG_PREFIX)))
- .getCredential(BIGQUERY_OAUTH_SCOPES);
+ .getCredential(scopes);
}

/**
* Returns the {@link AccessTokenProvider} that uses the newer GoogleCredentials
* library to obtain the credentials.
*
* @param config Hadoop {@link Configuration}
* @return {@link ServiceAccountAccessTokenProvider}
*/
private AccessTokenProvider getAccessTokenProvider(Configuration config) {
AccessTokenProvider accessTokenProvider = new ServiceAccountAccessTokenProvider();
accessTokenProvider.setConf(config);
return accessTokenProvider;
}
}
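As rewritten above, createBigQueryCredential is a two-step fallback: it first builds a credential backed by ServiceAccountAccessTokenProvider, and only if that yields null does it defer to the stock HadoopCredentialConfiguration factory. Passing the provider directly, rather than a config-prefix list, is what guarantees the provider is consulted first. A hedged usage sketch (CredentialSketch is hypothetical; the rest comes from this file):

import com.google.api.client.auth.oauth2.Credential;
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;

class CredentialSketch {
  static Credential resolve(Configuration conf) throws Exception {
    // Step 1 inside the call: credential(getAccessTokenProvider(conf), scopes).
    // Step 2, only if step 1 returns null: the HadoopCredentialConfiguration fallback.
    return new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES)
        .createBigQueryCredential(conf);
  }
}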
7 changes: 4 additions & 3 deletions src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
@@ -172,12 +172,13 @@ private static Map<String, String> generateAuthProperties(@Nullable String servi
// AccessTokenProviderClassFromConfigFactory will by default look for
// google.cloud.auth.access.token.provider.impl
// but can be configured to also look for the conf with other prefixes like
- // gs.fs.auth.access.token.provider.impl
+ // fs.gs.auth.access.token.provider.impl
// mapred.bq.auth.access.token.provider.impl
// for use by GCS and BQ.
for (String prefix : prefixes) {
- properties.put(prefix + HadoopCredentialConfiguration.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX,
- ServiceAccountAccessTokenProvider.class.getName());
+ properties.put(
+ prefix + HadoopCredentialConfiguration.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX.getKey(),
+ ServiceAccountAccessTokenProvider.class.getName());
}
return properties;
}
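Concretely, the .getKey() fix above makes the loop emit plain String configuration keys. An illustrative sketch of the resulting map (AuthPropsSketch and the literal prefix strings are assumptions; the real values come from HadoopCredentialConfiguration and the callers):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class AuthPropsSketch {
  static Map<String, String> sketch() {
    Map<String, String> props = new HashMap<>();
    // Assumed prefixes, matching the comment above.
    for (String prefix : Arrays.asList("fs.gs", "mapred.bq")) {
      // With .getKey(), the key is a String such as
      // "fs.gs.auth.access.token.provider.impl" instead of the result of
      // concatenating a HadoopConfigurationProperty object onto the prefix.
      props.put(prefix + ".auth.access.token.provider.impl",
          "io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider");
    }
    return props;
  }
}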
src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
@@ -56,6 +56,12 @@ public void refresh() throws IOException {

private GoogleCredentials getCredentials() throws IOException {
if (credentials == null) {
if (conf == null) {
// {@link CredentialFromAccessTokenProviderClassFactory#credential} does not propagate the
// config to {@link ServiceAccountAccessTokenProvider}, which causes an NPE when
// initializing {@link ForwardingBigQueryFileOutputCommitter} because conf is null.
conf = new Configuration();
}
credentials = GCPUtils.loadCredentialsFromConf(conf);
}
return credentials;
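The null check above exists because the provider can be constructed without ever receiving a configuration. A sketch of the failure mode it guards against (NpeSketch is hypothetical; reflective instantiation is the path CredentialFromAccessTokenProviderClassFactory takes):

import com.google.cloud.hadoop.util.AccessTokenProvider;
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;

class NpeSketch {
  static void reproduce() throws Exception {
    // The factory instantiates the provider reflectively, so setConf() may never run.
    AccessTokenProvider provider = new ServiceAccountAccessTokenProvider();
    // Before the fix: conf was null here and loadCredentialsFromConf(null) threw an NPE.
    // After the fix: conf falls back to new Configuration() and default credentials load.
    provider.getAccessToken();
  }
}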
74 changes: 74 additions & 0 deletions src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java
@@ -0,0 +1,74 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.gcs;

import com.google.api.client.auth.oauth2.Credential;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
import com.google.common.collect.ImmutableList;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;

/**
* Unit Tests for {@link ServiceAccountAccessTokenProvider}.
*/
public class ServiceAccountAccessTokenProviderTest {

@Test
public void testServiceAccountAccessTokenProviderIsUsed() throws IOException {

Map<String, String> authProperties = GCPUtils.generateGCSAuthProperties(null,
"filePath");
Configuration conf = new Configuration();
for (Map.Entry<String, String> prop : authProperties.entrySet()) {
conf.set(prop.getKey(), prop.getValue());
}

AccessTokenProvider accessTokenProvider =
HadoopCredentialConfiguration.getAccessTokenProvider(conf, ImmutableList.of(
GoogleHadoopFileSystemConfiguration.GCS_CONFIG_PREFIX));

Assert.assertTrue(String.format("AccessTokenProvider should be an instance of %s",
ServiceAccountAccessTokenProvider.class.getName()),
accessTokenProvider instanceof ServiceAccountAccessTokenProvider);
}

@Test
public void testServiceAccountAccessTokenProvider() throws IOException {
Map<String, String> authProperties = GCPUtils.generateGCSAuthProperties(null,
"filePath");
Configuration conf = new Configuration();
for (Map.Entry<String, String> prop : authProperties.entrySet()) {
conf.set(prop.getKey(), prop.getValue());
}
// {@link CredentialFromAccessTokenProviderClassFactory#credential} does not propagate the
// config to {@link ServiceAccountAccessTokenProvider}; this must not cause an NPE.
Credential credential = CredentialFromAccessTokenProviderClassFactory.credential(
conf, ImmutableList.of(GoogleHadoopFileSystemConfiguration.GCS_CONFIG_PREFIX),
CredentialFactory.DEFAULT_SCOPES
);
Assert.assertNotNull(credential);
}
}
