Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.Storage.BucketGetOption;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
Expand Down Expand Up @@ -186,9 +188,19 @@ public Page<Blob> listBlobs(
}

/**
 * Opens the object at {@code path} for reading.
 *
 * <p>The call is routed to the V2 delegate when one is set, otherwise to the V1 delegate.
 *
 * @param path location of the object to read
 * @return a channel positioned by the chosen delegate
 * @throws IOException if the chosen delegate fails to open the object
 */
public SeekableByteChannel open(GcsPath path) throws IOException {
  return delegateV2 != null ? delegateV2.open(path) : delegate.open(path);
}

/**
 * Opens the object at {@code path} for reading through the V2 delegate only.
 *
 * @param path location of the object to read
 * @param options source options forwarded unchanged to the V2 delegate
 * @return the channel produced by the V2 delegate
 * @throws IOException if no V2 delegate is set, or if the delegate fails
 */
public SeekableByteChannel openV2(GcsPath path, BlobSourceOption... options) throws IOException {
  // There is no V1 fallback for this overload, so fail fast when V2 is absent.
  if (delegateV2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  return delegateV2.open(path, options);
}

/** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
@Deprecated
public WritableByteChannel create(GcsPath path, String type) throws IOException {
Expand Down Expand Up @@ -254,9 +266,20 @@ public CreateOptions build() {
}

/**
 * Opens a channel for writing the object at {@code path}.
 *
 * <p>The call is routed to the V2 delegate when one is set, otherwise to the V1 delegate. Exactly
 * one delegate is invoked per call.
 *
 * @param path location of the object to write
 * @param options creation options; the wrapped delegate options are forwarded
 * @return the channel produced by the chosen delegate
 * @throws IOException if the chosen delegate fails to create the channel
 */
public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException {
  if (delegateV2 != null) {
    // Return the V2 channel. Without the return, that channel would be dropped unclosed and
    // the call would fall through and open a second writer through the V1 delegate.
    return delegateV2.create(path, options.delegate);
  }
  return delegate.create(path, options.delegate);
}

/**
 * Opens a channel for writing the object at {@code path} through the V2 delegate only.
 *
 * @param path location of the object to write
 * @param options creation options; the wrapped delegate options are forwarded
 * @param writeOptions write options forwarded unchanged to the V2 delegate
 * @return the channel produced by the V2 delegate
 * @throws IOException if no V2 delegate is set, or if the delegate fails
 */
public WritableByteChannel create(
    GcsPath path, CreateOptions options, BlobWriteOption... writeOptions) throws IOException {
  // There is no V1 fallback for this overload, so fail fast when V2 is absent.
  if (delegateV2 == null) {
    throw new IOException("GcsUtil V2 not initialized.");
  }
  return delegateV2.create(path, options.delegate, writeOptions);
}

public void verifyBucketAccessible(GcsPath path) throws IOException {
if (delegateV2 != null) {
delegateV2.verifyBucketAccessible(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import com.google.api.gax.paging.Page;
import com.google.auto.value.AutoValue;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
Expand All @@ -33,6 +35,8 @@
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.Storage.BucketField;
import com.google.cloud.storage.Storage.BucketGetOption;
import com.google.cloud.storage.Storage.CopyRequest;
Expand All @@ -42,12 +46,17 @@
import com.google.cloud.storage.StorageOptions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -70,6 +79,8 @@ public GcsUtilV2 create(PipelineOptions options) {

private Storage storage;

private final @Nullable Integer uploadBufferSizeBytes;

/** Maximum number of items to retrieve per Objects.List request. */
private static final long MAX_LIST_BLOBS_PER_CALL = 1024;

Expand All @@ -85,13 +96,14 @@ public GcsUtilV2 create(PipelineOptions options) {
GcsUtilV2(PipelineOptions options) {
String projectId = options.as(GcpOptions.class).getProject();
storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
uploadBufferSizeBytes = options.as(GcsOptions.class).getGcsUploadBufferSizeBytes();
}

@SuppressWarnings({
"nullness" // For Creating AccessDeniedException FileNotFoundException, and
// FileAlreadyExistsException with null.
})
private IOException translateStorageException(GcsPath gcsPath, StorageException e) {
private static IOException translateStorageException(GcsPath gcsPath, StorageException e) {
switch (e.getCode()) {
case 403:
return new AccessDeniedException(gcsPath.toString(), null, e.getMessage());
Expand Down Expand Up @@ -481,4 +493,141 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException {
throw translateStorageException(bucketInfo.getName(), null, e);
}
}

/**
 * Read-only {@link SeekableByteChannel} adapter over a GCS {@link ReadChannel}.
 *
 * <p>The size is supplied once at construction and reported as-is. The current offset is tracked
 * locally: it advances by the number of bytes each read returns and is overwritten on seek.
 */
private static class GcsSeekableByteChannel implements SeekableByteChannel {
  private final ReadChannel source;
  private final long totalBytes;
  private long offset;

  GcsSeekableByteChannel(ReadChannel reader, long size) {
    this.source = reader;
    this.totalBytes = size;
    this.offset = 0;
  }

  /** Reads from the underlying channel and advances the tracked offset by the bytes read. */
  @Override
  public int read(ByteBuffer dst) throws IOException {
    int bytesRead = source.read(dst);
    // Non-positive results leave the tracked offset untouched.
    if (bytesRead <= 0) {
      return bytesRead;
    }
    offset += bytesRead;
    return bytesRead;
  }

  /** Seeks the underlying channel to {@code newPosition} and records it as the new offset. */
  @Override
  public SeekableByteChannel position(long newPosition) throws IOException {
    checkArgument(newPosition >= 0, "Position must be non-negative: %s", newPosition);
    source.seek(newPosition);
    offset = newPosition;
    return this;
  }

  /** Returns the locally tracked offset. */
  @Override
  public long position() throws IOException {
    return offset;
  }

  /** Returns the size given at construction. */
  @Override
  public long size() throws IOException {
    return totalBytes;
  }

  /** Always throws: this channel is read-only. */
  @Override
  public SeekableByteChannel truncate(long size) throws IOException {
    throw new UnsupportedOperationException(
        "GcsSeekableByteChannels are read-only and cannot be truncated.");
  }

  /** Always throws: this channel is read-only. */
  @Override
  public int write(ByteBuffer src) throws IOException {
    throw new UnsupportedOperationException(
        "GcsSeekableByteChannel are read-only and does not support writing.");
  }

  @Override
  public boolean isOpen() {
    return source.isOpen();
  }

  /** Closes the underlying channel; does nothing when it is already closed. */
  @Override
  public void close() throws IOException {
    if (!isOpen()) {
      return;
    }
    source.close();
  }
}

/**
 * Opens the object at {@code path} for reading.
 *
 * <p>Looks up the blob requesting only its size field, then wraps a reader for that blob in a
 * seekable channel that reports the fetched size.
 *
 * @param path location of the object to read
 * @param sourceOptions options forwarded unchanged when the reader is created
 * @return a read-only seekable channel over the object
 * @throws IOException if the blob lookup fails
 */
public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions)
    throws IOException {
  Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE));
  ReadChannel reader = blob.getStorage().reader(blob.getBlobId(), sourceOptions);
  long objectSize = blob.getSize();
  return new GcsSeekableByteChannel(reader, objectSize);
}

/**
 * {@link WritableByteChannel} adapter over a GCS {@link WriteChannel}.
 *
 * <p>Any {@link StorageException} raised by the underlying channel during {@code write} or
 * {@code close} is converted with {@code translateStorageException}, using the path supplied at
 * construction, so callers only see {@link IOException}s from this channel.
 */
private static class GcsWritableByteChannel implements WritableByteChannel {
  private final WriteChannel writer;
  private final GcsPath gcsPath;

  GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) {
    this.writer = writer;
    this.gcsPath = gcsPath;
  }

  /**
   * Writes {@code src} to the underlying channel.
   *
   * @return the byte count reported by the underlying channel
   * @throws IOException on an I/O failure or a translated storage failure
   */
  @Override
  public int write(ByteBuffer src) throws IOException {
    try {
      return writer.write(src);
    } catch (StorageException e) {
      throw translateStorageException(gcsPath, e);
    }
  }

  @Override
  public boolean isOpen() {
    return writer.isOpen();
  }

  /**
   * Closes the underlying channel.
   *
   * @throws IOException on an I/O failure or a translated storage failure
   */
  @Override
  public void close() throws IOException {
    try {
      writer.close();
    } catch (StorageException e) {
      // Translate here exactly as write() does, so a storage failure raised while closing
      // does not escape as an unchecked exception.
      throw translateStorageException(gcsPath, e);
    }
  }
}

/**
 * Opens a channel for writing the object at {@code path}.
 *
 * <p>The content type is set only when {@code options} provides one. A does-not-exist write
 * option is appended to the caller's options when {@code options} expects the file to be absent.
 * The chunk size comes from {@code options} if set there, otherwise from this instance's upload
 * buffer size; when neither is set the writer's chunk size is left untouched.
 *
 * @param path bucket and object name of the blob to write
 * @param options content type, existence expectation and optional upload buffer size
 * @param writeOptions additional write options passed to the storage client
 * @return a channel that writes to the new object
 * @throws IOException translated from any {@link StorageException} raised while setting up
 */
public WritableByteChannel create(
    GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption... writeOptions)
    throws IOException {
  try {
    // Describe the target object.
    BlobInfo.Builder infoBuilder = BlobInfo.newBuilder(path.getBucket(), path.getObject());
    String contentType = options.getContentType();
    if (contentType != null) {
      infoBuilder.setContentType(contentType);
    }
    BlobInfo target = infoBuilder.build();

    // Copy the caller's options so the precondition can be appended to a mutable list.
    List<BlobWriteOption> effectiveOptions = new ArrayList<>(Arrays.asList(writeOptions));
    if (options.getExpectFileToNotExist()) {
      effectiveOptions.add(BlobWriteOption.doesNotExist());
    }

    WriteChannel channel =
        storage.writer(target, effectiveOptions.toArray(new BlobWriteOption[0]));

    // A per-call buffer size wins over the instance-level setting.
    Integer chunkSize = options.getUploadBufferSizeBytes();
    if (chunkSize == null) {
      chunkSize = this.uploadBufferSizeBytes;
    }
    if (chunkSize != null) {
      channel.setChunkSize(chunkSize);
    }

    return new GcsWritableByteChannel(channel, path);
  } catch (StorageException e) {
    throw translateStorageException(path, e);
  }
}
}
Loading
Loading