diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index e3f01dd85295..03733463e5c4 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -25,6 +25,8 @@ import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.BucketGetOption; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; @@ -186,9 +188,19 @@ public Page listBlobs( } public SeekableByteChannel open(GcsPath path) throws IOException { + if (delegateV2 != null) { + return delegateV2.open(path); + } return delegate.open(path); } + public SeekableByteChannel openV2(GcsPath path, BlobSourceOption... options) throws IOException { + if (delegateV2 != null) { + return delegateV2.open(path, options); + } + throw new IOException("GcsUtil V2 not initialized."); + } + /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */ @Deprecated public WritableByteChannel create(GcsPath path, String type) throws IOException { @@ -254,9 +266,20 @@ public CreateOptions build() { } public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException { + if (delegateV2 != null) { + delegateV2.create(path, options.delegate); + } return delegate.create(path, options.delegate); } + public WritableByteChannel create( + GcsPath path, CreateOptions options, BlobWriteOption... writeOptions) throws IOException { + if (delegateV2 != null) { + return delegateV2.create(path, options.delegate, writeOptions); + } + throw new IOException("GcsUtil V2 not initialized."); + } + public void verifyBucketAccessible(GcsPath path) throws IOException { if (delegateV2 != null) { delegateV2.verifyBucketAccessible(path); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index b00b7ce0d728..5a422001c572 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -23,6 +23,8 @@ import com.google.api.gax.paging.Page; import com.google.auto.value.AutoValue; +import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; @@ -33,6 +35,8 @@ import com.google.cloud.storage.Storage.BlobField; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.BucketField; import com.google.cloud.storage.Storage.BucketGetOption; import com.google.cloud.storage.Storage.CopyRequest; @@ -42,12 +46,17 @@ import com.google.cloud.storage.StorageOptions; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; @@ -70,6 +79,8 @@ public GcsUtilV2 create(PipelineOptions options) { private Storage storage; + private final @Nullable Integer uploadBufferSizeBytes; + /** Maximum number of items to retrieve per Objects.List request. */ private static final long MAX_LIST_BLOBS_PER_CALL = 1024; @@ -85,13 +96,14 @@ public GcsUtilV2 create(PipelineOptions options) { GcsUtilV2(PipelineOptions options) { String projectId = options.as(GcpOptions.class).getProject(); storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); + uploadBufferSizeBytes = options.as(GcsOptions.class).getGcsUploadBufferSizeBytes(); } @SuppressWarnings({ "nullness" // For Creating AccessDeniedException FileNotFoundException, and // FileAlreadyExistsException with null. }) - private IOException translateStorageException(GcsPath gcsPath, StorageException e) { + private static IOException translateStorageException(GcsPath gcsPath, StorageException e) { switch (e.getCode()) { case 403: return new AccessDeniedException(gcsPath.toString(), null, e.getMessage()); @@ -481,4 +493,141 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException { throw translateStorageException(bucketInfo.getName(), null, e); } } + + /** A bridge that allows a GCS ReadChannel to behave as a SeekableByteChannel. */ + private static class GcsSeekableByteChannel implements SeekableByteChannel { + private final ReadChannel reader; + private final long size; + private long position = 0; + + GcsSeekableByteChannel(ReadChannel reader, long size) { + this.reader = reader; + this.size = size; + this.position = 0; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + int count = reader.read(dst); + if (count > 0) { + this.position += count; + } + return count; + } + + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + checkArgument(newPosition >= 0, "Position must be non-negative: %s", newPosition); + reader.seek(newPosition); + this.position = newPosition; + return this; + } + + @Override + public long position() throws IOException { + return this.position; + } + + @Override + public long size() throws IOException { + return size; + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new UnsupportedOperationException( + "GcsSeekableByteChannels are read-only and cannot be truncated."); + } + + @Override + public int write(ByteBuffer src) throws IOException { + throw new UnsupportedOperationException( + "GcsSeekableByteChannel are read-only and does not support writing."); + } + + @Override + public boolean isOpen() { + return reader.isOpen(); + } + + @Override + public void close() throws IOException { + if (isOpen()) { + reader.close(); + } + } + } + + public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions) + throws IOException { + Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE)); + return new GcsSeekableByteChannel( + blob.getStorage().reader(blob.getBlobId(), sourceOptions), blob.getSize()); + } + + /** A bridge that allows a GCS WriteChannel to behave as a WritableByteChannel. */ + private static class GcsWritableByteChannel implements WritableByteChannel { + private final WriteChannel writer; + private final GcsPath gcsPath; + + GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) { + this.writer = writer; + this.gcsPath = gcsPath; + } + + @Override + public int write(ByteBuffer src) throws IOException { + try { + return writer.write(src); + } catch (StorageException e) { + throw translateStorageException(gcsPath, e); + } + } + + @Override + public boolean isOpen() { + return writer.isOpen(); + } + + @Override + public void close() throws IOException { + writer.close(); + } + } + + public WritableByteChannel create( + GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption... writeOptions) + throws IOException { + try { + // Define the metadata for the new object + BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(), path.getObject()); + String type = options.getContentType(); + if (type != null) { + builder.setContentType(type); + } + + BlobInfo blobInfo = builder.build(); + + List writeOptionList = new ArrayList<>(Arrays.asList(writeOptions)); + if (options.getExpectFileToNotExist()) { + writeOptionList.add(BlobWriteOption.doesNotExist()); + } + // Open a WriteChannel from the storage service + WriteChannel writer = + storage.writer(blobInfo, writeOptionList.toArray(new BlobWriteOption[0])); + Integer uploadBufferSizeBytes = + options.getUploadBufferSizeBytes() != null + ? options.getUploadBufferSizeBytes() + : this.uploadBufferSizeBytes; + if (uploadBufferSizeBytes != null) { + writer.setChunkSize(uploadBufferSizeBytes); + } + + // Return the bridge wrapper + return new GcsWritableByteChannel(writer, path); + + } catch (StorageException e) { + throw translateStorageException(path, e); + } + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index 80ffd72924fa..7816ce603001 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -30,13 +30,21 @@ import com.google.cloud.storage.BucketInfo; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; @@ -301,7 +309,8 @@ public void testCreateAndRemoveBucket() throws IOException { } } - private List createTestBucketHelper(String bucketName) throws IOException { + private List createTestBucketHelper(String bucketName, boolean copyData) + throws IOException { final List originPaths = Arrays.asList( GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"), @@ -316,16 +325,24 @@ private List createTestBucketHelper(String bucketName) throws IOExcepti if (experiment.equals("use_gcsutil_v2")) { gcsUtil.createBucket(BucketInfo.of(bucketName)); - gcsUtil.copyV2(originPaths, testPaths); + if (copyData) { + gcsUtil.copyV2(originPaths, testPaths); + } else { + return Collections.emptyList(); + } } else { GcsOptions gcsOptions = options.as(GcsOptions.class); gcsUtil.createBucket(gcsOptions.getProject(), new Bucket().setName(bucketName)); - final List originList = - originPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); - final List testList = - testPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); - gcsUtil.copy(originList, testList); + if (copyData) { + final List originList = + originPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List testList = + testPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + gcsUtil.copy(originList, testList); + } else { + return Collections.emptyList(); + } } return testPaths; @@ -355,7 +372,7 @@ public void testCopy() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List dstPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + ".bak")) @@ -423,7 +440,7 @@ public void testRemove() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List errPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) @@ -485,7 +502,7 @@ public void testRename() throws IOException { final String nonExistentBucket = "my-random-test-bucket-12345"; try { - final List srcPaths = createTestBucketHelper(existingBucket); + final List srcPaths = createTestBucketHelper(existingBucket, true); final List tmpPaths = srcPaths.stream() .map(o -> GcsPath.fromComponents(existingBucket, "tmp/" + o.getObject())) @@ -587,4 +604,82 @@ private void assertNotExists(GcsPath path) throws IOException { assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path)); } } + + @Test + public void testRead() throws IOException, NoSuchAlgorithmException { + final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt"); + final String expectedHash = "674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8"; + final long expectedSize = 157283L; + + try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) { + // Verify Size + assertEquals(expectedSize, channel.size()); + assertEquals(0, channel.position()); + + // Read content into ByteBuffer + ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize); + int bytesRead = 0; + while (buffer.hasRemaining()) { + int read = channel.read(buffer); + if (read == -1) { + break; + } + bytesRead += read; + } + + // Verify total bytes read and position + assertEquals(expectedSize, bytesRead); + assertEquals(expectedSize, channel.position()); + + // Flip the buffer to prepare it for reading (sets limit to current position, position to 0) + buffer.flip(); + + // Verify hash + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + digest.update(buffer); + byte[] hashBytes = digest.digest(); + + // Convert bytes to Hex String + StringBuilder sb = new StringBuilder(); + for (byte b : hashBytes) { + sb.append(String.format("%02x", b)); + } + String actualHash = sb.toString(); + + assertEquals("Content hash should match", expectedHash, actualHash); + } + } + + @Test + public void testWriteAndRead() throws IOException { + final String bucketName = "apache-beam-temp-bucket-12345"; + final GcsPath targetPath = GcsPath.fromComponents(bucketName, "test-object.txt"); + final String content = "Hello, GCS!"; + + try { + createTestBucketHelper(bucketName, false); + + // Write content to a GCS file + CreateOptions options = CreateOptions.builder().setExpectFileToNotExist(true).build(); + try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) { + writer.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + } + + // Read content into a buffer + StringBuilder readContent = new StringBuilder(); + try (ReadableByteChannel reader = gcsUtil.open(targetPath)) { + ByteBuffer buffer = ByteBuffer.allocate(1024); + while (reader.read(buffer) != -1) { + buffer.flip(); + readContent.append(StandardCharsets.UTF_8.decode(buffer)); + buffer.clear(); + } + } + + // Verify content + assertEquals(content, readContent.toString()); + } finally { + tearDownTestBucketHelper(bucketName); + } + } }