Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public Value<MapReduceResult<FilesByShard>> run() {
new HashingSharder(getNumOutputFiles(readers.size())),
GoogleCloudStorageFileOutput.BaseOptions.builder()
.serviceAccountKey(settings.getServiceAccountKey())
.projectId(settings.getProjectId())
.build()
);
output.setContext(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* Sorts a set of keyValues by a lexicographical comparison of the bytes of the key. On beginSlice a
Expand Down Expand Up @@ -132,7 +131,7 @@ private int getStoredSize() {
}

/**
* Re arranges the pointers so that they are ordered according to the order of the corresponding
* Re-arranges the pointers so that they are ordered according to the order of the corresponding
* keys.
*/
private void sortData() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.google.appengine.tools.pipeline.JobInfo;
import com.google.appengine.tools.pipeline.JobInfo.State;

import com.google.appengine.tools.test.CloudStorageExtension;
import lombok.Getter;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand All @@ -24,6 +26,9 @@
*/
public class CustomOutputTest extends EndToEndTestCase {

@Getter
String bucket;

@SuppressWarnings("serial")
static class CustomWriter extends OutputWriter<Long> {
final int id;
Expand Down Expand Up @@ -98,8 +103,9 @@ public void testOutputInOrder() throws Exception {
.setOutput(new CustomOutput())
.setNumReducers(17);
MapReduceSettings mrSettings = new MapReduceSettings.Builder()
.setServiceAccountKey(getStorageTestHelper().getBase64EncodedServiceAccountKey())
.setBucketName(getStorageTestHelper().getBucket())
.setProjectId(CloudStorageExtension.getProjectId())
.setServiceAccountKey(CloudStorageExtension.getBase64EncodedServiceAccountKey())
.setBucketName(getBucket())
.setDatastoreHost(datastore.getOptions().getHost())
.setProjectId(datastore.getOptions().getProjectId())
.setDatabaseId(datastore.getOptions().getDatabaseId())
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import com.google.appengine.tools.mapreduce.reducers.ValueProjectionReducer;
import com.google.appengine.tools.pipeline.JobRunId;
import com.google.appengine.tools.pipeline.JobInfo;
import com.google.appengine.tools.test.CloudStorageExtension;
import com.google.cloud.ReadChannel;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.easymock.EasyMock;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -75,19 +77,21 @@ public class EndToEndTest extends EndToEndTestCase {

private static final Logger log = Logger.getLogger(EndToEndTest.class.getName());


GoogleCloudStorageFileOutput.Options cloudStorageFileOutputOptions;
MapReduceSettings testSettings;

@Getter
String bucket;

@BeforeEach
public void setUp() throws Exception {
super.setUp();
cloudStorageFileOutputOptions = GoogleCloudStorageFileOutput.BaseOptions.defaults()
.withServiceAccountKey(getStorageTestHelper().getBase64EncodedServiceAccountKey())
.withProjectId(getStorageTestHelper().getProjectId()); //prob not really needed ..
.withServiceAccountKey(CloudStorageExtension.getBase64EncodedServiceAccountKey())
.withProjectId(CloudStorageExtension.getProjectId()); //prob not really needed ..
testSettings = new MapReduceSettings.Builder()
.setServiceAccountKey(getStorageTestHelper().getBase64EncodedServiceAccountKey())
.setBucketName(getStorageTestHelper().getBucket())
.setServiceAccountKey(CloudStorageExtension.getBase64EncodedServiceAccountKey())
.setBucketName(bucket)
.build();
}

Expand Down Expand Up @@ -191,7 +195,7 @@ public void testMapOnlyJobWithSizeSegmentedOutput() throws Exception {
String fileNamePattern = "MapOnlySegmentingTestShard-%04d/file-%04d";

SizeSegmentedGoogleCloudStorageFileOutput output =
new SizeSegmentedGoogleCloudStorageFileOutput(getStorageTestHelper().getBucket(), 30, fileNamePattern, mimeType, cloudStorageFileOutputOptions);
new SizeSegmentedGoogleCloudStorageFileOutput(getBucket(), 30, fileNamePattern, mimeType, cloudStorageFileOutputOptions);
MarshallingOutput<String, GoogleCloudStorageFileSet> op =
new MarshallingOutput<>(output, Marshallers.getStringMarshaller());

Expand Down Expand Up @@ -691,7 +695,7 @@ public void testPassThroughToString() throws Exception {
final RandomLongInput input = new RandomLongInput(10, 1);
input.setSeed(0L);
runTest(new MapReduceSpecification.Builder<>(input, new Mod37Mapper(), ValueProjectionReducer
.<String, Long>create(), new StringOutput<>(",", new GoogleCloudStorageFileOutput(getStorageTestHelper().getBucket(), "Foo-%02d", "text/plain", cloudStorageFileOutputOptions)))
.<String, Long>create(), new StringOutput<>(",", new GoogleCloudStorageFileOutput(bucket, "Foo-%02d", "text/plain", cloudStorageFileOutputOptions)))
.setKeyMarshaller(Marshallers.getStringMarshaller())
.setValueMarshaller(Marshallers.getLongMarshaller()).setJobName("TestPassThroughToString")
.build(), new Verifier<GoogleCloudStorageFileSet>() {
Expand All @@ -700,7 +704,7 @@ public void verify(MapReduceResult<GoogleCloudStorageFileSet> result) throws Exc
assertEquals(1, result.getOutputResult().getNumFiles());
assertEquals(10, result.getCounters().getCounter(CounterNames.MAPPER_CALLS).getValue());
GcsFilename file = result.getOutputResult().getFile(0);
ReadChannel channel = getStorageTestHelper().getStorage().reader(file.asBlobId());
ReadChannel channel = storage.reader(file.asBlobId());
BufferedReader reader =
new BufferedReader(Channels.newReader(channel, US_ASCII.newDecoder(), -1));
String line = reader.readLine();
Expand Down Expand Up @@ -737,7 +741,7 @@ private void applyTestPassByteBufferToGcs(boolean sliceRetry) throws Exception {
builder.setKeyMarshaller(Marshallers.getByteBufferMarshaller());
builder.setValueMarshaller(Marshallers.getByteBufferMarshaller());
builder.setReducer(ValueProjectionReducer.<ByteBuffer, ByteBuffer>create());
builder.setOutput(new GoogleCloudStorageFileOutput(getStorageTestHelper().getBucket(), "fileNamePattern-%04d",
builder.setOutput(new GoogleCloudStorageFileOutput(bucket, "fileNamePattern-%04d",
"application/octet-stream", (GoogleCloudStorageFileOutput.Options) cloudStorageFileOutputOptions.withSupportSliceRetries(sliceRetry)));
builder.setNumReducers(2);
runTest(builder.build(), new Verifier<GoogleCloudStorageFileSet>() {
Expand All @@ -748,8 +752,8 @@ public void verify(MapReduceResult<GoogleCloudStorageFileSet> result) throws Exc
ArrayList<Long> results = new ArrayList<>();
ByteBuffer holder = ByteBuffer.allocate(8);
for (GcsFilename file : result.getOutputResult().getFiles()) {
assertEquals("application/octet-stream", getStorageTestHelper().getStorage().get(file.asBlobId()).getContentType());
try (ReadChannel reader = getStorageTestHelper().getStorage().reader(file.asBlobId())) {
assertEquals("application/octet-stream", storage.get(file.asBlobId()).getContentType());
try (ReadChannel reader = storage.reader(file.asBlobId())) {
int read = reader.read(holder);
while (read != -1) {
holder.rewind();
Expand Down Expand Up @@ -1240,7 +1244,7 @@ public void testSideOutput() throws Exception {
final int SHARD_COUNT = 3;

runTest(new MapReduceSpecification.Builder<>(new ConsecutiveLongInput(0, SHARD_COUNT, SHARD_COUNT),
new SideOutputMapper(getStorageTestHelper().getBucket(), cloudStorageFileOutputOptions), KeyProjectionReducer.<GcsFilename, Void>create(),
new SideOutputMapper(bucket, cloudStorageFileOutputOptions), KeyProjectionReducer.<GcsFilename, Void>create(),
new InMemoryOutput<>())
.setKeyMarshaller(Marshallers.getSerializationMarshaller())
.setValueMarshaller(Marshallers.getVoidMarshaller())
Expand All @@ -1260,7 +1264,7 @@ public void verify(MapReduceResult<List<List<GcsFilename>>> result) throws Excep
assertEquals(SHARD_COUNT, files.size());
for (GcsFilename file : files) {
ByteBuffer buf = ByteBuffer.allocate(8);
try (ReadChannel ch = getStorageTestHelper().getStorage().reader(file.asBlobId())) {
try (ReadChannel ch = storage.reader(file.asBlobId())) {
assertEquals(8, ch.read(buf));
assertEquals(-1, ch.read(ByteBuffer.allocate(1)));
}
Expand Down
Loading
Loading