diff --git a/.github/sync-repo-settings.yaml b/.github/sync-repo-settings.yaml index 3a3a5fd9ef..34c710a335 100644 --- a/.github/sync-repo-settings.yaml +++ b/.github/sync-repo-settings.yaml @@ -18,6 +18,7 @@ branchProtectionRules: - OwlBot Post Processor - 'Kokoro - Test: Java GraalVM Native Image' - 'Kokoro - Test: Java 17 GraalVM Native Image' + - javadoc - pattern: 1.113.14-sp isAdminEnforced: true requiredApprovingReviewCount: 1 diff --git a/CHANGELOG.md b/CHANGELOG.md index be85db326f..df49892625 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,35 @@ # Changelog +## [2.27.0](https://github.com/googleapis/java-storage/compare/v2.26.1...v2.27.0) (2023-09-12) + + +### Features + +* Add new JournalingBlobWriteSessionConfig usable with gRPC transport ([#2194](https://github.com/googleapis/java-storage/issues/2194)) ([8880d94](https://github.com/googleapis/java-storage/commit/8880d94c3d1a737dd4492cf66a16ba5e08633a70)) +* Follow-up CLI Improvements ([#2184](https://github.com/googleapis/java-storage/issues/2184)) ([d985976](https://github.com/googleapis/java-storage/commit/d9859768081ea6f872097851d3e318b5bad384d9)) +* Initial CLI for SSB integration and Workload 1 ([#2166](https://github.com/googleapis/java-storage/issues/2166)) ([a349735](https://github.com/googleapis/java-storage/commit/a349735e7fe108e623a330afec0c8cd608ebeef9)) + + +### Bug Fixes + +* A resumable session without a Range header should be interpreted as 0 length ([#2182](https://github.com/googleapis/java-storage/issues/2182)) ([5302201](https://github.com/googleapis/java-storage/commit/53022011d83e6a8515a5ba008fc45fc2dae39cea)) +* Update User-Agent handling for resumable uploads ([#2168](https://github.com/googleapis/java-storage/issues/2168)) ([665b714](https://github.com/googleapis/java-storage/commit/665b714f421d3c13b557d0ff71460c328c010856)) +* Update version resolution logic to be more resilient ([#2169](https://github.com/googleapis/java-storage/issues/2169)) ([c89d275](https://github.com/googleapis/java-storage/commit/c89d27508039a014ea5a6dd8d4889f63d07db73f)) + + +### Dependencies + +* Update actions/checkout action to v4 ([#2188](https://github.com/googleapis/java-storage/issues/2188)) ([c10267e](https://github.com/googleapis/java-storage/commit/c10267e176bda21cd5755dfb0e96d0504fbc1d54)) +* Update actions/checkout action to v4 ([#2189](https://github.com/googleapis/java-storage/issues/2189)) ([5c048c4](https://github.com/googleapis/java-storage/commit/5c048c499eef224dade8f4409dfae732cb5a7017)) +* Update actions/checkout action to v4 ([#2190](https://github.com/googleapis/java-storage/issues/2190)) ([45e66e8](https://github.com/googleapis/java-storage/commit/45e66e89373ef016eff9b7deb30dbdfa818770d2)) +* Update dependency com.google.apis:google-api-services-storage to v1-rev20230710-2.0.0 ([#2162](https://github.com/googleapis/java-storage/issues/2162)) ([73a9f75](https://github.com/googleapis/java-storage/commit/73a9f75d000d2a59cd680fd383a9f9e1b91570cf)) +* Update dependency com.google.apis:google-api-services-storage to v1-rev20230907-2.0.0 ([#2200](https://github.com/googleapis/java-storage/issues/2200)) ([1fa49db](https://github.com/googleapis/java-storage/commit/1fa49db2810f6ffbd46755b4eb1f5efdcf980edb)) +* Update dependency com.google.cloud:google-cloud-shared-dependencies to v3.15.0 ([#2197](https://github.com/googleapis/java-storage/issues/2197)) ([26552f4](https://github.com/googleapis/java-storage/commit/26552f4b78f77d90df4e3dfb829c3f9c092fc817)) +* Update dependency info.picocli:picocli to v4.7.4 ([#2177](https://github.com/googleapis/java-storage/issues/2177)) ([0c90814](https://github.com/googleapis/java-storage/commit/0c908147375fe58ac280179f5fba10bdd3886003)) +* Update dependency info.picocli:picocli to v4.7.5 ([#2183](https://github.com/googleapis/java-storage/issues/2183)) ([f244861](https://github.com/googleapis/java-storage/commit/f2448615ded6d9f43344bf1b9cda7ae3b191223b)) +* Update dependency net.jqwik:jqwik to v1.8.0 ([#2187](https://github.com/googleapis/java-storage/issues/2187)) ([aedbd6a](https://github.com/googleapis/java-storage/commit/aedbd6a811c4fcfedff68d7d46bb68e93bf9eeee)) +* Update dependency org.graalvm.buildtools:native-maven-plugin to v0.9.26 ([#2196](https://github.com/googleapis/java-storage/issues/2196)) ([4f8bb65](https://github.com/googleapis/java-storage/commit/4f8bb658e9ff3cba5e745acae13ec4094a1a48d5)) + ## [2.26.1](https://github.com/googleapis/java-storage/compare/v2.26.0...v2.26.1) (2023-08-14) diff --git a/README.md b/README.md index 93d9ee636c..80765220c3 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ If you are using Maven without the BOM, add this to your dependencies: com.google.cloud google-cloud-storage - 2.26.0 + 2.26.1 ``` @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-storage' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-storage:2.26.0' +implementation 'com.google.cloud:google-cloud-storage:2.26.1' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.26.0" +libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.26.1" ``` @@ -428,7 +428,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-storage/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-storage.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.26.0 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.26.1 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/gapic-google-cloud-storage-v2/pom.xml b/gapic-google-cloud-storage-v2/pom.xml index dc6d5f5aba..b2c784d40d 100644 --- a/gapic-google-cloud-storage-v2/pom.xml +++ b/gapic-google-cloud-storage-v2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc gapic-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha gapic-google-cloud-storage-v2 GRPC library for gapic-google-cloud-storage-v2 com.google.cloud google-cloud-storage-parent - 2.26.1 + 2.27.0 diff --git a/google-cloud-storage-bom/pom.xml b/google-cloud-storage-bom/pom.xml index bc33d4137b..795f2f38c7 100644 --- a/google-cloud-storage-bom/pom.xml +++ b/google-cloud-storage-bom/pom.xml @@ -19,7 +19,7 @@ 4.0.0 com.google.cloud google-cloud-storage-bom - 2.26.1 + 2.27.0 pom com.google.cloud @@ -69,22 +69,22 @@ com.google.cloud google-cloud-storage - 2.26.1 + 2.27.0 com.google.api.grpc gapic-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha com.google.api.grpc grpc-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha com.google.api.grpc proto-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha diff --git a/google-cloud-storage/pom.xml b/google-cloud-storage/pom.xml index 8b37a32581..b18bce0b4d 100644 --- a/google-cloud-storage/pom.xml +++ b/google-cloud-storage/pom.xml @@ -2,7 +2,7 @@ 4.0.0 google-cloud-storage - 2.26.1 + 2.27.0 jar Google Cloud Storage https://github.com/googleapis/java-storage @@ -12,11 +12,11 @@ com.google.cloud google-cloud-storage-parent - 2.26.1 + 2.27.0 google-cloud-storage - 1.106.0 + 1.106.2 5.10.0 @@ -173,13 +173,13 @@ com.google.api.grpc proto-google-cloud-kms-v1 - 0.116.0 + 0.117.0 test com.google.cloud google-cloud-kms - 2.25.0 + 2.26.0 test @@ -248,7 +248,7 @@ net.jqwik jqwik - 1.7.4 + 1.8.0 test diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java index efaf569a87..76296e9b7d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java @@ -21,6 +21,7 @@ import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.common.collect.ImmutableList; import java.io.IOException; +import java.nio.channels.WritableByteChannel; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; @@ -28,6 +29,107 @@ /** * Factory class to select and construct {@link BlobWriteSessionConfig}s. * + *

There are several strategies which can be used to upload a {@link Blob} to Google Cloud + * Storage. This class provides factories which allow you to select the appropriate strategy for + * your workload. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Comparison of Strategies
StrategyFactory Method(s)DescriptionRetry SupportTransports SupportedCloud Storage API usedConsiderations
Default (Chunk based upload){@link #getDefault()} + * Buffer up to a configurable amount of bytes in memory, write to Cloud Storage when + * full or close. Buffer size is configurable via + * {@link DefaultBlobWriteSessionConfig#withChunkSize(int)} + * + * Each chunk is retried up to the limitations specified in + * {@link StorageOptions#getRetrySettings()} + * gRPCResumable UploadThe network will only be used for the following operations: + *
    + *
  1. Creating the Resumable Upload Session
  2. + *
  3. Transmitting zero or more incremental chunks
  4. + *
  5. Transmitting the final chunk and finalizing the Resumable Upload Session
  6. + *
  7. + * If any of the above are interrupted with a retryable error, the Resumable Upload Session + * will be queried to reconcile client side state with Cloud Storage + *
  8. + *
+ *
Buffer to disk then upload + *
    + *
  • {@link #bufferToDiskThenUpload(Path)}
  • + *
  • {@link #bufferToDiskThenUpload(Collection) bufferToDiskThenUpload(Collection<Path>)}
  • + *
  • {@link #bufferToTempDirThenUpload()}
  • + *
+ *
+ * Buffer bytes to a temporary file on disk. On {@link WritableByteChannel#close() close()} + * upload the entire files contents to Cloud Storage. Delete the temporary file. + * + * Upload the file in the fewest number of RPC possible retrying within the limitations + * specified in {@link StorageOptions#getRetrySettings()} + * gRPCResumable Upload + *
    + *
  1. A Resumable Upload Session will be used to upload the file on disk.
  2. + *
  3. + * If the upload is interrupted with a retryable error, the Resumable Upload Session will + * be queried to restart the upload from Cloud Storage's last received byte + *
  4. + *
+ *
Journal to disk while uploading{@link #journaling(Collection) journaling(Collection<Path>)} + * Create a Resumable Upload Session, before transmitting bytes to Cloud Storage write + * to a recovery file on disk. If the stream to Cloud Storage is interrupted with a + * retryable error query the offset of the Resumable Upload Session, then open the recovery + * file from the offset and transmit the bytes to Cloud Storage. + * gRPCResumable Upload + *
    + *
  1. + * The stream to Cloud Storage will be held open until a) the write is complete + * b) the stream is interrupted + *
  2. + *
  3. + * Because the bytes are journaled to disk, the upload to Cloud Storage can only + * be as fast as the disk. + *
  4. + *
  5. + * The use of Compute + * Engine Local NVMe SSD is strongly encouraged compared to Compute Engine Persistent + * Disk. + *
  6. + *
+ *
+ * * @see BlobWriteSessionConfig * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) @@ -56,11 +158,11 @@ public static DefaultBlobWriteSessionConfig getDefault() { * Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object * to a temporary file under {@code java.io.tmpdir}. * - *

Once the file on disk is closed, the entire file will then be uploaded to Google Cloud - * Storage. + *

Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage. * * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException { @@ -72,11 +174,11 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept * Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object * to a temporary file under the specified {@code path}. * - *

Once the file on disk is closed, the entire file will then be uploaded to Google Cloud - * Storage. + *

Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage. * * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException { @@ -87,18 +189,34 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO * Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object * to a temporary file under one of the specified {@code paths}. * - *

Once the file on disk is closed, the entire file will then be uploaded to Google Cloud - * Storage. + *

Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage. * *

The specifics of how the work is spread across multiple paths is undefined and subject to * change. * * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection paths) throws IOException { return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false); } + + /** + * Create a new {@link BlobWriteSessionConfig} which will journal writes to a temporary file under + * one of the specified {@code paths} before transmitting the bytes to Cloud Storage. + * + *

The specifics of how the work is spread across multiple paths is undefined and subject to + * change. + * + * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) + * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @since 2.27.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public static JournalingBlobWriteSessionConfig journaling(Collection paths) { + return new JournalingBlobWriteSessionConfig(ImmutableList.copyOf(paths), false); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java index fb20747a8c..430f30bcd8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java @@ -22,6 +22,7 @@ import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.RecoveryFileManager.RecoveryVolumeSinkFactory; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; @@ -96,8 +97,7 @@ WriterFactory createFactory(Clock clock) throws IOException { return new Factory(recoveryFileManager, clock, gcs); } - private RecoveryFileManager.RecoverVolumeSinkFactory getRecoverVolumeSinkFactory( - Clock clock, Duration window) { + private RecoveryVolumeSinkFactory getRecoverVolumeSinkFactory(Clock clock, Duration window) { return path -> { ThroughputSink windowed = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock); if (includeLoggingSink) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java index a828523cab..e793bcfee3 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java @@ -16,8 +16,10 @@ package com.google.cloud.storage; +import java.io.IOException; import java.nio.Buffer; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.util.function.Consumer; /** @@ -137,4 +139,19 @@ static int alignSize(int size, int alignmentMultiple) { } // else size is already aligned return alignedSize; } + + static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException { + int total = 0; + while (buf.hasRemaining()) { + int read = c.read(buf); + if (read != -1) { + total += read; + } else if (total == 0) { + return -1; + } else { + break; + } + } + return total; + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ChunkSegmenter.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ChunkSegmenter.java index b891bb3bc6..50e3d7e6fa 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ChunkSegmenter.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ChunkSegmenter.java @@ -50,6 +50,10 @@ Hasher getHasher() { return hasher; } + ChunkSegment[] segmentBuffer(ByteBuffer bb) { + return segmentBuffers(new ByteBuffer[] {bb}, 0, 1); + } + /** * Given {@code bbs}, yield N segments, where each segment is at most {@code maxSegmentSize} * bytes. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java index 9b4a39834f..79e3759eaa 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java @@ -79,7 +79,8 @@ public int getChunkSize() { * *

Default: {@code 16777216 (16 MiB)} * - * @param chunkSize The number of bytes each chunk should be. Must be >= {@code 262144 (256 KiB)} + * @param chunkSize The number of bytes each chunk should be. Must be >= {@code 262144 (256 + * KiB)} * @return The new instance * @see #getChunkSize() * @since 2.26.0 This new api is in preview and is subject to breaking changes. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java index f1b14d063b..cdd964f819 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java @@ -145,7 +145,8 @@ public int write(ByteBuffer src) throws IOException { ByteBuffer slice = src.slice(); Buffers.limit(slice, bufferRemaining); int write = channel.write(slice); - Buffers.position(src, srcPosition + write); + int newPosition = srcPosition + write; + Buffers.position(src, newPosition); bytesConsumed += write; } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index 80bca1c572..5aa47eb6f2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -24,12 +24,15 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.storage.ChannelSession.BufferedWriteSession; import com.google.cloud.storage.ChannelSession.UnbufferedWriteSession; import com.google.cloud.storage.Retrying.RetryingDependencies; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory; import com.google.cloud.storage.WriteFlushStrategy.FlusherFactory; +import com.google.storage.v2.QueryWriteStatusRequest; +import com.google.storage.v2.QueryWriteStatusResponse; import com.google.storage.v2.ServiceConstants.Values; import com.google.storage.v2.WriteObjectRequest; import com.google.storage.v2.WriteObjectResponse; @@ -106,6 +109,10 @@ ResumableUploadBuilder resumable() { return new ResumableUploadBuilder(); } + JournalingResumableUploadBuilder journaling() { + return new JournalingResumableUploadBuilder(); + } + /** * When constructing any of our channel sessions, there is always a {@link * GapicUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction which @@ -332,4 +339,102 @@ BufferedWritableByteChannelSession build() { } } } + + final class JournalingResumableUploadBuilder { + + private RetryingDependencies deps; + private ResultRetryAlgorithm alg; + private BufferHandle bufferHandle; + private BufferHandle recoveryBuffer; + private RecoveryFile recoveryFile; + private UnaryCallable query; + + JournalingResumableUploadBuilder() { + this.deps = RetryingDependencies.attemptOnce(); + this.alg = Retrying.neverRetry(); + } + + JournalingResumableUploadBuilder withRetryConfig( + RetryingDependencies deps, + ResultRetryAlgorithm alg, + UnaryCallable query) { + this.deps = requireNonNull(deps, "deps must be non null"); + this.alg = requireNonNull(alg, "alg must be non null"); + this.query = requireNonNull(query, "query must be non null"); + return this; + } + + JournalingResumableUploadBuilder withBuffer(BufferHandle bufferHandle) { + this.bufferHandle = requireNonNull(bufferHandle, "bufferHandle must be non null"); + return this; + } + + JournalingResumableUploadBuilder withRecoveryBuffer(BufferHandle bufferHandle) { + this.recoveryBuffer = requireNonNull(bufferHandle, "bufferHandle must be non null"); + return this; + } + + JournalingResumableUploadBuilder withRecoveryFile(RecoveryFile recoveryFile) { + this.recoveryFile = requireNonNull(recoveryFile, "recoveryFile must be non null"); + return this; + } + + /** + * Set the Future which will contain the ResumableWrite information necessary to open the Write + * stream. + */ + BuildableJournalingResumableUploadBuilder setStartAsync( + ApiFuture> start) { + requireNonNull(start, "start must be non null"); + return new BuildableJournalingResumableUploadBuilder(start); + } + + final class BuildableJournalingResumableUploadBuilder { + private final ApiFuture> start; + + private BuildableJournalingResumableUploadBuilder(ApiFuture> start) { + this.start = start; + } + + BufferedWritableByteChannelSession build() { + return new BufferedWriteSession<>( + requireNonNull(start, "start must be non null"), + bindFunction() + .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) + .andThen(StorageByteChannels.writable()::createSynchronized)); + } + + private BiFunction< + WriteCtx, + SettableApiFuture, + UnbufferedWritableByteChannel> + bindFunction() { + // it is theoretically possible that the setter methods for the following variables could + // be called again between when this method is invoked and the resulting function is + // invoked. + // To ensure we are using the specified values at the point in time they are bound to the + // function read them into local variables which will be closed over rather than the class + // fields. + RetryingDependencies deps = JournalingResumableUploadBuilder.this.deps; + ResultRetryAlgorithm alg = JournalingResumableUploadBuilder.this.alg; + BufferHandle recoveryBuffer = JournalingResumableUploadBuilder.this.recoveryBuffer; + RecoveryFile recoveryFile = JournalingResumableUploadBuilder.this.recoveryFile; + UnaryCallable query = + JournalingResumableUploadBuilder.this.query; + ByteStringStrategy boundStrategy = byteStringStrategy; + Hasher boundHasher = hasher; + return (writeCtx, resultFuture) -> + new SyncAndUploadUnbufferedWritableByteChannel( + write, + query, + resultFuture, + new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE), + deps, + alg, + writeCtx, + recoveryFile, + recoveryBuffer); + } + } + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 95ddad874f..c97369924a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -1768,7 +1768,7 @@ ReadObjectRequest getReadObjectRequest(BlobId blob, Opts opts) return opts.readObjectRequest().apply(builder).build(); } - private WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts opts) { + WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts opts) { Object object = codecs.blobInfo().encode(info); Object.Builder objectBuilder = object diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index a6aa331350..09dfaf5dc0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -23,7 +23,6 @@ import com.google.api.core.InternalApi; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; -import com.google.api.gax.core.GaxProperties; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GrpcInterceptorProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; @@ -210,9 +209,7 @@ private Tuple> resolveSettingsAndOpts() throw HeaderProvider internalHeaderProvider = StorageSettings.defaultApiClientHeaderProviderBuilder() - .setClientLibToken( - ServiceOptions.getGoogApiClientLibName(), - GaxProperties.getLibraryVersion(this.getClass())) + .setClientLibToken(ServiceOptions.getGoogApiClientLibName(), getLibraryVersion()) .build(); StorageSettings.Builder builder = diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java new file mode 100644 index 0000000000..c8cc0e3dab --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java @@ -0,0 +1,257 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiExceptions; +import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.RecoveryFileManager.RecoveryVolumeSinkFactory; +import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.ThroughputSink.Record; +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; +import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.storage.v2.ServiceConstants.Values; +import com.google.storage.v2.WriteObjectResponse; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.stream.Collector; +import javax.annotation.concurrent.Immutable; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +/** + * There are scenarios in which disk space is more plentiful than memory space. This new {@link + * BlobWriteSessionConfig} allows augmenting an instance of storage to produce {@link + * BlobWriteSession}s which will buffer to disk rather than holding things in memory. + * + *

If we have disk available we can checkpoint the contents of an object to disk before + * transmitting to GCS. The checkpointed data on disk allows arbitrary rewind in the case of failure + * but allows the upload to happen as soon as the checkpoint ack is complete. + * + *

Due to the details of how Resumable Upload Sessions are implemented in the GCS gRPC API this + * is possible. However, this approach will not work with the HTTP transports Resumable Upload + * Session spec. + * + * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) + * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + */ +@Immutable +@BetaApi +@TransportCompatibility(Transport.GRPC) +public final class JournalingBlobWriteSessionConfig extends BlobWriteSessionConfig { + private static final long serialVersionUID = 9059242302276891867L; + + /** + * non-final because of {@link java.io.Serializable}, however this field is effectively final as + * it is immutable and there is not reference mutator method. + */ + @MonotonicNonNull private transient ImmutableList paths; + + private final boolean includeLoggingSink; + + /** Used for {@link java.io.Serializable} */ + @MonotonicNonNull private volatile ArrayList absolutePaths; + + @InternalApi + JournalingBlobWriteSessionConfig(ImmutableList paths, boolean includeLoggingSink) { + this.paths = paths; + this.includeLoggingSink = includeLoggingSink; + } + + @VisibleForTesting + @InternalApi + JournalingBlobWriteSessionConfig withIncludeLoggingSink() { + return new JournalingBlobWriteSessionConfig(paths, true); + } + + @InternalApi + @Override + WriterFactory createFactory(Clock clock) throws IOException { + Duration window = Duration.ofMinutes(10); + RecoveryFileManager recoveryFileManager = + RecoveryFileManager.of(paths, getRecoverVolumeSinkFactory(clock, window)); + ThroughputSink gcs = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock); + gcs = includeLoggingSink ? ThroughputSink.tee(ThroughputSink.logged("gcs", clock), gcs) : gcs; + return new Factory(recoveryFileManager, clock, gcs); + } + + private RecoveryVolumeSinkFactory getRecoverVolumeSinkFactory(Clock clock, Duration window) { + return path -> { + ThroughputSink windowed = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock); + if (includeLoggingSink) { + return ThroughputSink.tee( + ThroughputSink.logged(path.toAbsolutePath().toString(), clock), windowed); + } else { + return windowed; + } + }; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + if (absolutePaths == null) { + synchronized (this) { + if (absolutePaths == null) { + absolutePaths = + paths.stream() + .map(Path::toAbsolutePath) + .map(Path::toString) + .collect( + Collector.of( + ArrayList::new, + ArrayList::add, + (left, right) -> { + left.addAll(right); + return left; + })); + } + } + } + out.defaultWriteObject(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.paths = absolutePaths.stream().map(Paths::get).collect(ImmutableList.toImmutableList()); + } + + private static final class Factory implements WriterFactory { + + private final RecoveryFileManager recoveryFileManager; + private final Clock clock; + private final ThroughputSink gcs; + + private Factory(RecoveryFileManager recoveryFileManager, Clock clock, ThroughputSink gcs) { + this.recoveryFileManager = recoveryFileManager; + this.clock = clock; + this.gcs = gcs; + } + + @InternalApi + @Override + public WritableByteChannelSession writeSession( + StorageInternal storage, + BlobInfo info, + Opts opts, + Decoder d) { + if (storage instanceof GrpcStorageImpl) { + GrpcStorageImpl grpcStorage = (GrpcStorageImpl) storage; + RecoveryFile recoveryFile = recoveryFileManager.newRecoveryFile(info); + ApiFuture f = + grpcStorage.startResumableWrite( + GrpcCallContext.createDefault(), grpcStorage.getWriteObjectRequest(info, opts)); + ApiFuture> start = + ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor()); + + BufferedWritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel(grpcStorage.storageClient.writeObjectCallable()) + .setHasher(Hasher.noop()) + .setByteStringStrategy(ByteStringStrategy.copy()) + .journaling() + .withRetryConfig( + grpcStorage.getOptions(), + grpcStorage.retryAlgorithmManager.idempotent(), + grpcStorage.storageClient.queryWriteStatusCallable()) + .withBuffer(BufferHandle.allocate(Values.MAX_WRITE_CHUNK_BYTES_VALUE)) + .withRecoveryBuffer(BufferHandle.allocate(Values.MAX_WRITE_CHUNK_BYTES_VALUE)) + .withRecoveryFile(recoveryFile) + .setStartAsync(start) + .build(); + + return new JournalingUpload<>(session, start, d); + } else { + return CrossTransportUtils.throwHttpJsonOnly(BlobWriteSessionConfigs.class, "journaling"); + } + } + + private final class JournalingUpload + implements WritableByteChannelSession { + + private final WritableByteChannelSession session; + private final ApiFuture> start; + private final Decoder decoder; + + public JournalingUpload( + WritableByteChannelSession session, + ApiFuture> start, + Decoder decoder) { + this.session = session; + this.start = start; + this.decoder = decoder; + } + + @Override + public ApiFuture openAsync() { + // register a callback on the result future to record our throughput estimate + Instant begin = clock.instant(); + ApiFutures.addCallback( + session.getResult(), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable t) { + Instant end = clock.instant(); + // start MUST have completed in order for result to resolve, use the utility method + // to take care of the checked exception handling + WriteCtx writeCtx = + ApiExceptions.callAndTranslateApiException(start); + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + gcs.recordThroughput(Record.of(totalSentBytes, begin, end, true)); + } + + @Override + public void onSuccess(WriteObjectResponse result) { + Instant end = clock.instant(); + long totalSentBytes = -1; + if (result.hasResource()) { + totalSentBytes = result.getResource().getSize(); + } else if (result.hasPersistedSize()) { + totalSentBytes = result.getPersistedSize(); + } + if (totalSentBytes > -1) { + gcs.recordThroughput(Record.of(totalSentBytes, begin, end, false)); + } + } + }, + MoreExecutors.directExecutor()); + return session.openAsync(); + } + + @Override + public ApiFuture getResult() { + return ApiFutures.transform( + session.getResult(), decoder::decode, MoreExecutors.directExecutor()); + } + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionFailureScenario.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionFailureScenario.java index 2b6e8d569c..0b6249ca8c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionFailureScenario.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionFailureScenario.java @@ -65,8 +65,7 @@ enum JsonResumableSessionFailureScenario { BaseServiceException.UNKNOWN_CODE, "dataLoss", "Client side data loss detected. Bytes acked is more than client sent."), - SCENARIO_9(503, "backendNotConnected", "Ack less than bytes sent"), - QUERY_SCENARIO_1(503, "", "Missing Range header in response"); + SCENARIO_9(503, "backendNotConnected", "Ack less than bytes sent"); private static final String PREFIX_I = "\t|< "; private static final String PREFIX_O = "\t|> "; @@ -79,6 +78,7 @@ enum JsonResumableSessionFailureScenario { .or(matches("Content-Type")) .or(matches("Range")) .or(startsWith("X-Goog-Stored-")) + .or(matches("X-Goog-GCS-Idempotency-Token")) .or(matches("X-GUploader-UploadID")); private static final Predicate> includeHeader = diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java index b37d2396d3..5ce0de6fe3 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java @@ -97,8 +97,11 @@ final class JsonResumableSessionQueryTask long endOffset = range.endOffset(); return ResumableOperationResult.incremental(endOffset); } else { - throw JsonResumableSessionFailureScenario.QUERY_SCENARIO_1.toStorageException( - uploadId, response); + // According to + // https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check a 308 + // response that does not contain a Range header should be interpreted as GCS having + // received no data. + return ResumableOperationResult.incremental(0); } } else { HttpResponseException cause = new HttpResponseException(response); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFile.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFile.java index d399ea9300..75884657f7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFile.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFile.java @@ -19,6 +19,8 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; @@ -33,18 +35,27 @@ * upload recovery in the case an upload is interrupted. */ final class RecoveryFile implements AutoCloseable { - private static final Set writeOps = - ImmutableSet.of(StandardOpenOption.CREATE, StandardOpenOption.WRITE); + private static final Set writeOpsNew = + ImmutableSet.of( + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING); + private static final Set writeOpsExisting = + ImmutableSet.of( + StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND); private static final Set readOps = ImmutableSet.of(StandardOpenOption.READ); private final Path path; private final ThroughputSink throughputSink; private final Runnable onCloseCallback; + private boolean newFile; + RecoveryFile(Path path, ThroughputSink throughputSink, Runnable onCloseCallback) { this.path = path; this.throughputSink = throughputSink; this.onCloseCallback = onCloseCallback; + this.newFile = true; } public Path getPath() { @@ -52,6 +63,7 @@ public Path getPath() { } public Path touch() throws IOException { + newFile = false; return Files.createFile(path); } @@ -60,12 +72,26 @@ public SeekableByteChannel reader() throws IOException { } public WritableByteChannel writer() throws IOException { - return throughputSink.decorate(Files.newByteChannel(path, writeOps)); + try { + return throughputSink.decorate( + Files.newByteChannel(path, newFile ? writeOpsNew : writeOpsExisting)); + } finally { + newFile = false; + } + } + + public GatheringByteChannel syncingChannel() throws IOException { + try { + return throughputSink.decorate( + new SyncingFileChannel(FileChannel.open(path, newFile ? writeOpsNew : writeOpsExisting))); + } finally { + newFile = false; + } } @Override public void close() throws IOException { - Files.delete(path); + Files.deleteIfExists(path); onCloseCallback.run(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFileManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFileManager.java index 25303b5fa4..4926c899af 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFileManager.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFileManager.java @@ -66,7 +66,7 @@ static RecoveryFileManager of(List volumes) throws IOException { return of(volumes, p -> ThroughputSink.nullSink()); } - static RecoveryFileManager of(List volumes, RecoverVolumeSinkFactory factory) + static RecoveryFileManager of(List volumes, RecoveryVolumeSinkFactory factory) throws IOException { checkArgument(!volumes.isEmpty(), "At least one volume must be specified"); checkArgument( @@ -86,7 +86,7 @@ static RecoveryFileManager of(List volumes, RecoverVolumeSinkFactory facto } @FunctionalInterface - interface RecoverVolumeSinkFactory { + interface RecoveryVolumeSinkFactory { ThroughputSink apply(Path p); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java index 86f2f92b07..74a6ffb3df 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java @@ -17,6 +17,7 @@ package com.google.cloud.storage; import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceDefaults; import com.google.cloud.ServiceOptions; @@ -26,10 +27,38 @@ import com.google.cloud.storage.HttpStorageOptions.HttpStorageRpcFactory; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.spi.StorageRpcFactory; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; public abstract class StorageOptions extends ServiceOptions { private static final long serialVersionUID = -7295846567928013233L; + private static final String VERSION; + + static { + String tmp = "unresolved"; + final Properties props = new Properties(); + try { + String resourcePath = + String.format( + "/META-INF/maven/%s/%s/pom.properties", "com.google.cloud", "google-cloud-storage"); + InputStream resourceAsStream = StorageOptions.class.getResourceAsStream(resourcePath); + if (resourceAsStream == null) { + // some classloaders don't like a leading slash + resourceAsStream = StorageOptions.class.getResourceAsStream(resourcePath.substring(1)); + } + if (resourceAsStream != null) { + props.load(resourceAsStream); + resourceAsStream.close(); + + tmp = props.getProperty("version", "unknown-version"); + } + } catch (IOException ignore) { + // ignored + } + VERSION = tmp; + } /** @deprecated Use {@link HttpStorageFactory} */ @Deprecated @@ -86,6 +115,17 @@ protected boolean projectIdRequired() { return false; } + @Override + public String getLibraryVersion() { + return VERSION; + } + + /* This can break at any time, the value produce is intended to be informative not authoritative */ + @InternalApi + public static String version() { + return VERSION; + } + @SuppressWarnings("unchecked") @Override public abstract StorageOptions.Builder toBuilder(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java new file mode 100644 index 0000000000..4169b8410a --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java @@ -0,0 +1,468 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.WriteFlushStrategy.contextWithBucketName; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ApiExceptionFactory; +import com.google.api.gax.rpc.ApiExceptions; +import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; +import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.QueryWriteStatusRequest; +import com.google.storage.v2.QueryWriteStatusResponse; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import io.grpc.Status.Code; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.util.Arrays; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +final class SyncAndUploadUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel { + + private final ClientStreamingCallable write; + private final UnaryCallable query; + private final SettableApiFuture resultFuture; + private final ChunkSegmenter chunkSegmenter; + private final WriteCtx writeCtx; + private final RetryingDependencies deps; + private final ResultRetryAlgorithm alg; + private final RecoveryFile rf; + + private final String uploadId; + private final BufferHandle copyBuffer; + + /* --- running state --- */ + private final RequestStream stream; + + private boolean open; + private @Nullable GatheringByteChannel sync; + private boolean first; + private boolean finished; + + SyncAndUploadUnbufferedWritableByteChannel( + ClientStreamingCallable write, + UnaryCallable query, + SettableApiFuture resultFuture, + ChunkSegmenter chunkSegmenter, + RetryingDependencies deps, + ResultRetryAlgorithm alg, + WriteCtx writeCtx, + RecoveryFile rf, + BufferHandle copyBuffer) { + this.write = + write.withDefaultCallContext( + contextWithBucketName( + writeCtx.getRequestFactory().bucketName(), GrpcCallContext.createDefault())); + this.query = query; + this.resultFuture = resultFuture; + this.chunkSegmenter = chunkSegmenter; + this.writeCtx = writeCtx; + this.deps = deps; + this.alg = new Alg(alg, resultFuture); + this.rf = rf; + this.uploadId = writeCtx.newRequestBuilder().getUploadId(); + this.copyBuffer = copyBuffer; + this.stream = new RequestStream(this.write, resultFuture); + this.open = true; + this.first = true; + this.finished = false; + } + + @SuppressWarnings("resource") + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + if (!open) { + throw new ClosedChannelException(); + } + ByteBuffer[] duplicates = + Arrays.stream(srcs, offset, offset + length) + .map(ByteBuffer::duplicate) + .toArray(ByteBuffer[]::new); + long prevWritten = writeCtx.getTotalSentBytes().get(); + long syncWritten = openSync().write(duplicates); + long goalSize = Math.addExact(prevWritten, syncWritten); + ChunkSegment[] segments = chunkSegmenter.segmentBuffers(srcs, offset, length); + doUpload(false, segments, goalSize); + return syncWritten; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + if (!open) { + return; + } + try { + doUpload(true, new ChunkSegment[0], writeCtx.getTotalSentBytes().get()); + rf.close(); + } finally { + open = false; + } + } + + private GatheringByteChannel openSync() throws IOException { + if (sync == null) { + sync = rf.syncingChannel(); + } + return sync; + } + + private WriteObjectRequest processSegment(ChunkSegment segment) { + WriteObjectRequest.Builder builder = writeCtx.newRequestBuilder(); + if (!first) { + builder.clearUploadId().clearWriteObjectSpec().clearObjectChecksums(); + } else { + first = false; + } + + Crc32cLengthKnown crc32c = segment.getCrc32c(); + ByteString b = segment.getB(); + int contentSize = b.size(); + + // update ctx state that tracks overall progress + writeCtx + .getCumulativeCrc32c() + .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); + // resolve current offset and set next + long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); + + ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); + if (crc32c != null) { + checksummedData.setCrc32C(crc32c.getValue()); + } + builder.setWriteOffset(offset).setChecksummedData(checksummedData.build()); + + if (!segment.isOnlyFullBlocks()) { + finishMessage(builder); + finished = true; + } + + WriteObjectRequest build = builder.build(); + return build; + } + + @NonNull + private WriteObjectRequest finishMessage() { + long offset = writeCtx.getTotalSentBytes().get(); + + WriteObjectRequest.Builder b = writeCtx.newRequestBuilder().setWriteOffset(offset); + + WriteObjectRequest message = finishMessage(b).build(); + return message; + } + + private WriteObjectRequest.Builder finishMessage(WriteObjectRequest.Builder b) { + Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get(); + b.setFinishWrite(true); + if (crc32cValue != null) { + b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); + } + return b; + } + + private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) { + AtomicBoolean recover = new AtomicBoolean(false); + Retrying.run( + deps, + alg, + () -> { + if (closing && sync != null) { + sync.close(); + } + boolean shouldRecover = recover.getAndSet(true); + if (!shouldRecover) { + for (ChunkSegment segment : segments) { + WriteObjectRequest writeObjectRequest = processSegment(segment); + stream.onNext(writeObjectRequest); + } + + if (closing && !finished) { + WriteObjectRequest message = finishMessage(); + stream.onNext(message); + finished = true; + } + + if (closing) { + stream.onCompleted(); + } + } else { + if (sync != null) { + sync.close(); + sync = null; + } + stream.reset(); + + QueryWriteStatusRequest req = + QueryWriteStatusRequest.newBuilder().setUploadId(uploadId).build(); + QueryWriteStatusResponse resp = query.call(req); + // if the response has a resource the session completed, no need to re-upload + if (!resp.hasResource()) { + long persistedSize = resp.getPersistedSize(); + + if (persistedSize != goalSize) { + + // rewind our context + finished = false; + first = true; + writeCtx.getTotalSentBytes().set(persistedSize); + writeCtx.getConfirmedBytes().set(persistedSize); + writeCtx.getCumulativeCrc32c().set(null); // todo: can we rewind checksum? + + try (SeekableByteChannel reader = rf.reader()) { + reader.position(persistedSize); + ByteBuffer buf = copyBuffer.get(); + while (Buffers.fillFrom(buf, reader) != -1) { + buf.flip(); + while (buf.hasRemaining()) { + ChunkSegment[] recoverySegments = chunkSegmenter.segmentBuffer(buf); + for (ChunkSegment segment : recoverySegments) { + WriteObjectRequest writeObjectRequest = processSegment(segment); + stream.onNext(writeObjectRequest); + } + } + buf.clear(); + } + } + if (closing && !finished) { + WriteObjectRequest message = finishMessage(); + stream.onNext(message); + finished = true; + } + recover.compareAndSet(true, false); + if (closing || finished) { + stream.onCompleted(); + } + } + } else { + Object resource = resp.getResource(); + resultFuture.set(WriteObjectResponse.newBuilder().setResource(resource).build()); + } + } + long newWritten = writeCtx.getTotalSentBytes().get(); + Preconditions.checkState(newWritten == goalSize, "%s == %s", newWritten, goalSize); + return null; + }, + Decoder.identity()); + } + + @VisibleForTesting + static final class RequestStream implements ApiStreamObserver { + private static final ApiException CLIENT_RESET_ERROR = + ApiExceptionFactory.createException(null, GrpcStatusCode.of(Code.ABORTED), false); + + private final ClientStreamingCallable write; + private final SettableApiFuture resultFuture; + + private volatile StreamPair streamPair; + + private RequestStream( + ClientStreamingCallable write, + SettableApiFuture resultFuture) { + this.write = write; + this.resultFuture = resultFuture; + } + + @Override + public void onNext(WriteObjectRequest value) { + StreamPair pair = ensureOpen(); + Throwable err = pair.getResponseStream().error; + if (err != null) { + reset(); + throw StorageException.coalesce(err); + } + + halfClosedToUnavailable(() -> pair.getRequestStream().onNext(value)); + } + + @Override + public void onError(Throwable t) { + try { + halfClosedToUnavailable(() -> ensureOpen().getRequestStream().onError(t)); + } finally { + streamPair = null; + } + } + + @Override + public void onCompleted() { + StreamPair pair = ensureOpen(); + Throwable err = pair.getResponseStream().error; + if (err != null) { + reset(); + throw StorageException.coalesce(err); + } + + halfClosedToUnavailable(pair.getRequestStream()::onCompleted); + pair.getResponseStream().await(); + } + + private StreamPair ensureOpen() { + if (streamPair == null) { + ResponseStream responseStream = new ResponseStream(resultFuture); + ApiStreamObserver requestStream = + write.clientStreamingCall(responseStream); + streamPair = new StreamPair(requestStream, responseStream); + } + return streamPair; + } + + private void reset() { + if (streamPair != null && streamPair.getRequestStream() != null) { + streamPair.getRequestStream().onError(CLIENT_RESET_ERROR); + } + streamPair = null; + } + + /** + * If the stream is in the process of closing (usually due to error) and we call a method on it + * we will receive an IllegalStateException. A stream being half closed is not a terminal state + * for our upload operation. Attempt to detect and translate it into an UNAVAILABLE error we can + * retry. + */ + static void halfClosedToUnavailable(Runnable r) { + try { + r.run(); + } catch (IllegalStateException ise) { + String message = ise.getMessage(); + if (message != null && message.contains("half-closed")) { + throw ApiExceptionFactory.createException(ise, GrpcStatusCode.of(Code.UNAVAILABLE), true); + } else { + throw ise; + } + } + } + } + + @VisibleForTesting + static final class ResponseStream implements ApiStreamObserver { + private final SettableApiFuture invocationHandle; + private final SettableApiFuture resultFuture; + + private volatile WriteObjectResponse last; + private volatile Throwable error; + + @VisibleForTesting + ResponseStream(SettableApiFuture resultFuture) { + this.resultFuture = resultFuture; + this.invocationHandle = SettableApiFuture.create(); + } + + @Override + public void onNext(WriteObjectResponse value) { + last = value; + error = null; + } + + @Override + public void onError(Throwable t) { + error = t; + invocationHandle.setException(t); + } + + @Override + public void onCompleted() { + if (last != null && last.hasResource()) { + resultFuture.set(last); + } + invocationHandle.set(null); + } + + void await() { + ApiExceptions.callAndTranslateApiException(invocationHandle); + } + } + + @VisibleForTesting + static final class Alg implements ResultRetryAlgorithm { + + private final ResultRetryAlgorithm delegate; + private final SettableApiFuture resultFuture; + + @VisibleForTesting + @SuppressWarnings("unchecked") + Alg(ResultRetryAlgorithm delegate, SettableApiFuture resultFuture) { + this.delegate = (ResultRetryAlgorithm) delegate; + this.resultFuture = resultFuture; + } + + @Override + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, + WriteObjectResponse prevResponse, + TimedAttemptSettings prevSettings) { + return delegate.createNextAttempt(prevThrowable, prevResponse, prevSettings); + } + + @Override + public boolean shouldRetry(Throwable prevThrowable, WriteObjectResponse prevResponse) + throws CancellationException { + boolean shouldRetry = delegate.shouldRetry(prevThrowable, prevResponse); + if (!shouldRetry && prevThrowable != null) { + resultFuture.setException(prevThrowable); + } + return shouldRetry; + } + } + + private static final class StreamPair { + private final ApiStreamObserver requestStream; + private final ResponseStream responseStream; + + private StreamPair( + ApiStreamObserver requestStream, ResponseStream responseStream) { + this.requestStream = requestStream; + this.responseStream = responseStream; + } + + public ApiStreamObserver getRequestStream() { + return requestStream; + } + + public ResponseStream getResponseStream() { + return responseStream; + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncingFileChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncingFileChannel.java new file mode 100644 index 0000000000..74c76ecac0 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncingFileChannel.java @@ -0,0 +1,51 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +/** A FileChannel decorator that will fsync after every {@link #write(ByteBuffer)} */ +final class SyncingFileChannel implements UnbufferedWritableByteChannel { + + private final FileChannel fc; + + SyncingFileChannel(FileChannel fc) { + this.fc = fc; + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + long written = fc.write(srcs, offset, length); + // metadata in this case are things like mtime, atime etc. Those are not important to our needs + // simply force the file contents to by synced. + fc.force(/*includeMetaData = */ false); + return written; + } + + @Override + public boolean isOpen() { + return fc.isOpen(); + } + + @Override + public void close() throws IOException { + fc.close(); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java index 776629cf34..bf9d523b35 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java @@ -18,11 +18,14 @@ import com.google.common.base.MoreObjects; import java.io.IOException; +import java.nio.Buffer; import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; import java.nio.channels.WritableByteChannel; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.Objects; import java.util.logging.Logger; @@ -36,6 +39,8 @@ interface ThroughputSink { WritableByteChannel decorate(WritableByteChannel wbc); + GatheringByteChannel decorate(GatheringByteChannel gbc); + static void computeThroughput(Clock clock, ThroughputSink sink, long numBytes, IO io) throws IOException { boolean exception = false; @@ -171,6 +176,11 @@ public WritableByteChannel decorate(WritableByteChannel wbc) { return new ThroughputRecordingWritableByteChannel(wbc, this, clock); } + @Override + public GatheringByteChannel decorate(GatheringByteChannel gbc) { + return new ThroughputRecordingGatheringByteChannel(gbc, this, clock); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("prefix", prefix).add("clock", clock).toString(); @@ -191,6 +201,30 @@ private ThroughputRecordingWritableByteChannel( @Override public int write(ByteBuffer src) throws IOException { + return ThroughputRecordingWritableByteChannel.write(src, clock, delegate, sink); + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("delegate", delegate) + .add("sink", sink) + .add("clock", clock) + .toString(); + } + + static int write(ByteBuffer src, Clock clock, WritableByteChannel delegate, ThroughputSink sink) + throws IOException { boolean exception = false; int remaining = src.remaining(); Instant begin = clock.instant(); @@ -205,6 +239,60 @@ public int write(ByteBuffer src) throws IOException { sink.recordThroughput(record); } } + } + + final class ThroughputRecordingGatheringByteChannel implements GatheringByteChannel { + private final GatheringByteChannel delegate; + private final ThroughputSink sink; + private final Clock clock; + + private ThroughputRecordingGatheringByteChannel( + GatheringByteChannel delegate, ThroughputSink sink, Clock clock) { + this.delegate = delegate; + this.sink = sink; + this.clock = clock; + } + + @Override + public int write(ByteBuffer src) throws IOException { + return ThroughputRecordingWritableByteChannel.write(src, clock, delegate, sink); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + boolean exception = false; + long available = Arrays.stream(srcs).mapToLong(Buffer::remaining).sum(); + Instant begin = clock.instant(); + try { + return delegate.write(srcs, offset, length); + } catch (IOException e) { + exception = true; + throw e; + } finally { + Instant end = clock.instant(); + long remaining = Arrays.stream(srcs).mapToLong(Buffer::remaining).sum(); + Record record = Record.of(available - remaining, begin, end, exception); + sink.recordThroughput(record); + } + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException { + boolean exception = false; + long available = Arrays.stream(srcs).mapToLong(Buffer::remaining).sum(); + Instant begin = clock.instant(); + try { + return delegate.write(srcs); + } catch (IOException e) { + exception = true; + throw e; + } finally { + Instant end = clock.instant(); + long remaining = Arrays.stream(srcs).mapToLong(Buffer::remaining).sum(); + Record record = Record.of(available - remaining, begin, end, exception); + sink.recordThroughput(record); + } + } @Override public boolean isOpen() { @@ -246,6 +334,11 @@ public WritableByteChannel decorate(WritableByteChannel wbc) { return b.decorate(a.decorate(wbc)); } + @Override + public GatheringByteChannel decorate(GatheringByteChannel gbc) { + return b.decorate(a.decorate(gbc)); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("a", a).add("b", b).toString(); @@ -271,6 +364,11 @@ public WritableByteChannel decorate(WritableByteChannel wbc) { return new ThroughputRecordingWritableByteChannel(wbc, this, clock); } + @Override + public GatheringByteChannel decorate(GatheringByteChannel gbc) { + return new ThroughputRecordingGatheringByteChannel(gbc, this, clock); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("w", w).add("clock", clock).toString(); @@ -289,5 +387,10 @@ public void recordThroughput(Record r) {} public WritableByteChannel decorate(WritableByteChannel wbc) { return wbc; } + + @Override + public GatheringByteChannel decorate(GatheringByteChannel gbc) { + return gbc; + } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java index 2488cfcb34..654811f411 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java @@ -40,6 +40,10 @@ final class WriteCtx { this.cumulativeCrc32c = new AtomicReference<>(); } + public RequestFactoryT getRequestFactory() { + return requestFactory; + } + public WriteObjectRequest.Builder newRequestBuilder() { return requestFactory.newBuilder(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java index b77128d89e..806c2b42c0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java @@ -86,8 +86,7 @@ static FlusherFactory fsyncOnClose( new FsyncOnClose(write, bucketName, committedTotalBytesCallback, onSuccessCallback); } - private static GrpcCallContext contextWithBucketName( - String bucketName, GrpcCallContext baseContext) { + static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext baseContext) { if (bucketName != null && !bucketName.isEmpty()) { return baseContext.withExtraHeaders( ImmutableMap.of( @@ -257,7 +256,7 @@ private void ensureOpen() { } } - private static class Observer implements ApiStreamObserver { + static class Observer implements ApiStreamObserver { private final LongConsumer sizeCallback; private final Consumer completeCallback; @@ -265,7 +264,7 @@ private static class Observer implements ApiStreamObserver private final SettableApiFuture invocationHandle; private volatile WriteObjectResponse last; - private Observer(LongConsumer sizeCallback, Consumer completeCallback) { + Observer(LongConsumer sizeCallback, Consumer completeCallback) { this.sizeCallback = sizeCallback; this.completeCallback = completeCallback; this.invocationHandle = SettableApiFuture.create(); @@ -304,7 +303,7 @@ public void onCompleted() { invocationHandle.set(null); } - private void await() { + void await() { try { invocationHandle.get(); } catch (InterruptedException | ExecutionException e) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index 98d9476f89..eb5b8e1ba6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -123,14 +123,15 @@ public HttpStorageRpc(StorageOptions options, JsonFactory jsonFactory) { this.options = options; // Open Census initialization + String applicationName = options.getApplicationName(); CensusHttpModule censusHttpModule = new CensusHttpModule(tracer, IS_RECORD_EVENTS); initializer = censusHttpModule.getHttpRequestInitializer(initializer); - initializer = new InvocationIdInitializer(initializer); + initializer = new InvocationIdInitializer(initializer, applicationName); batchRequestInitializer = censusHttpModule.getHttpRequestInitializer(null); storage = new Storage.Builder(transport, jsonFactory, initializer) .setRootUrl(options.getHost()) - .setApplicationName(options.getApplicationName()) + .setApplicationName(applicationName) .build(); } @@ -140,9 +141,12 @@ public Storage getStorage() { private static final class InvocationIdInitializer implements HttpRequestInitializer { @Nullable HttpRequestInitializer initializer; + @Nullable private final String applicationName; - private InvocationIdInitializer(@Nullable HttpRequestInitializer initializer) { + private InvocationIdInitializer( + @Nullable HttpRequestInitializer initializer, @Nullable String applicationName) { this.initializer = initializer; + this.applicationName = applicationName; } @Override @@ -151,15 +155,19 @@ public void initialize(HttpRequest request) throws IOException { if (this.initializer != null) { this.initializer.initialize(request); } - request.setInterceptor(new InvocationIdInterceptor(request.getInterceptor())); + request.setInterceptor( + new InvocationIdInterceptor(request.getInterceptor(), applicationName)); } } private static final class InvocationIdInterceptor implements HttpExecuteInterceptor { - @Nullable HttpExecuteInterceptor interceptor; + @Nullable private final HttpExecuteInterceptor interceptor; + @Nullable private final String applicationName; - private InvocationIdInterceptor(@Nullable HttpExecuteInterceptor interceptor) { + private InvocationIdInterceptor( + @Nullable HttpExecuteInterceptor interceptor, @Nullable String applicationName) { this.interceptor = interceptor; + this.applicationName = applicationName; } @Override @@ -183,6 +191,13 @@ public void intercept(HttpRequest request) throws IOException { } headers.set("x-goog-api-client", newValue); headers.set("x-goog-gcs-idempotency-token", invocationId); + + String userAgent = headers.getUserAgent(); + if ((userAgent == null + || userAgent.isEmpty() + || (applicationName != null && !userAgent.contains(applicationName)))) { + headers.setUserAgent(applicationName); + } } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java index 3d5845cf55..2807d7923f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java @@ -20,7 +20,6 @@ import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import com.google.api.core.ListenableFutureToApiFuture; -import com.google.api.gax.core.GaxProperties; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; @@ -45,8 +44,7 @@ final class TransferManagerImpl implements TransferManager { private static final String USER_AGENT_ENTRY = "gcloud-tm/"; - private static final String LIBRARY_VERSION = - GaxProperties.getLibraryVersion(TransferManagerConfig.class); + private static final String LIBRARY_VERSION = StorageOptions.version(); private final TransferManagerConfig transferManagerConfig; private final ListeningExecutorService executor; private final Qos qos; diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionQueryTaskTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionQueryTaskTest.java index 07a04ed61e..d11cc340ec 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionQueryTaskTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionQueryTaskTest.java @@ -141,10 +141,6 @@ public void incompleteSession() throws Exception { } } - /** - * This is a hard failure from the perspective of GCS as a range header is a required header to be - * included in the response to a query upload request. - */ @Test public void incompleteSession_missingRangeHeader() throws Exception { HttpRequestHandler handler = @@ -156,9 +152,9 @@ public void incompleteSession_missingRangeHeader() throws Exception { JsonResumableSessionQueryTask task = new JsonResumableSessionQueryTask(httpClientContext, uploadUrl); - StorageException se = assertThrows(StorageException.class, task::call); - assertThat(se.getCode()).isEqualTo(503); - assertThat(se).hasMessageThat().contains("Range"); + ResumableOperationResult<@Nullable StorageObject> result = task.call(); + assertThat(result.getPersistedSize()).isEqualTo(0); + assertThat(result.getObject()).isNull(); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncingFileChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncingFileChannelTest.java new file mode 100644 index 0000000000..5fcd6429d8 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncingFileChannelTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.RewindableContentPropertyTest.byteBuffers; +import static com.google.cloud.storage.TestUtils.xxd; +import static com.google.common.truth.Truth.assertThat; +import static java.nio.file.Files.readAllBytes; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import net.jqwik.api.Arbitraries; +import net.jqwik.api.Arbitrary; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; +import net.jqwik.api.Provide; + +public final class ITSyncingFileChannelTest { + + /** + * Run a series of generated scenarios where each write is performed against a {@link + * SyncingFileChannel} after {@link SyncingFileChannel#write(ByteBuffer)} returns verify the full + * contents of the file match the expected cumulative value. + */ + @Property + void shouldHandleAnySizeWriteGt0(@ForAll("WriteScenario") WriteScenario writeScenario) + throws IOException { + // use try-with-resource to approximate @TearDown and cleanup the file + try (WriteScenario ws = writeScenario) { + Path path = ws.getPath(); + try (FileChannel fc = FileChannel.open(path, ws.getOpenOptions()); + SyncingFileChannel syncing = new SyncingFileChannel(fc)) { + assertThat(syncing.isOpen()).isTrue(); + ByteBuffer[] writes = ws.writes(); + for (int i = 0; i < writes.length; i++) { + ByteBuffer buf = writes[i]; + syncing.write(buf); + assertThat(xxd(readAllBytes(path))).isEqualTo(ws.expected(i)); + } + } + assertThat(xxd(readAllBytes(path))).isEqualTo(ws.all()); + } + } + + @Provide("WriteScenario") + static Arbitrary writeScenario() { + return Arbitraries.lazyOf( + () -> + Arbitraries.oneOf( + byteBuffers(1, 10), + byteBuffers(10, 100), + byteBuffers(100, 1_000), + byteBuffers(1_000, 10_000), + byteBuffers(10_000, 100_000), + byteBuffers(100_000, 1_000_000))) + .map( + buffers -> + Arrays.stream(buffers).filter(Buffer::hasRemaining).toArray(ByteBuffer[]::new)) + .filter( + buffers -> { + long totalAvailable = Arrays.stream(buffers).mapToLong(ByteBuffer::remaining).sum(); + return totalAvailable > 0; + }) + .map(WriteScenario::of); + } + + static final class WriteScenario implements AutoCloseable { + private static final Path TMP_DIR = Paths.get(System.getProperty("java.io.tmpdir")); + private static final Collector DEBUG_JOINER = + Collectors.joining(",\n\t", "[\n\t", "\n]"); + + private final Path path; + private final ByteBuffer[] writes; + private final ByteString[] expectedCumulativeContents; + private final EnumSet openOptions; + + private WriteScenario(Path path, ByteBuffer[] writes, ByteString[] expectedCumulativeContents) { + this.path = path; + this.writes = writes; + this.expectedCumulativeContents = expectedCumulativeContents; + this.openOptions = EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.WRITE); + } + + public Path getPath() { + return path; + } + + public EnumSet getOpenOptions() { + return openOptions; + } + + ByteBuffer[] writes() { + return Arrays.stream(writes).map(ByteBuffer::duplicate).toArray(ByteBuffer[]::new); + } + + String expected(int idx) { + Preconditions.checkArgument( + 0 <= idx && idx < expectedCumulativeContents.length, + "index out of bounds: (0 <= %s && %s < %s)", + idx, + idx, + expectedCumulativeContents.length); + return xxd(false, expectedCumulativeContents[idx].asReadOnlyByteBuffer()); + } + + String all() { + return xxd( + false, + expectedCumulativeContents[expectedCumulativeContents.length - 1].asReadOnlyByteBuffer()); + } + + @Override + public void close() throws IOException { + Files.deleteIfExists(path); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("\npath", path) + .add( + "\nwrites", + Arrays.stream(writes) + .map(b -> String.format("%s \n %s", b.toString(), xxd(false, b.duplicate()))) + .collect(DEBUG_JOINER)) + .add( + "\nexpectedCumulativeContents", + Arrays.stream(expectedCumulativeContents) + .map(ByteString::toString) + .collect(DEBUG_JOINER)) + .toString(); + } + + public static WriteScenario of(ByteBuffer[] byteBuffers) { + try { + Path path = Files.createTempFile(TMP_DIR, WriteScenario.class.getName() + "-", ".bin"); + + List byteStrings = new ArrayList<>(); + for (int i = 0; i < byteBuffers.length; i++) { + ByteString bs = ByteString.empty(); + for (int j = 0; j <= i; j++) { + ByteBuffer byteBuffer = byteBuffers[j].duplicate(); + bs = bs.concat(ByteStringStrategy.noCopy().apply(byteBuffer)); + } + byteStrings.add(bs); + } + + return new WriteScenario(path, byteBuffers, byteStrings.toArray(new ByteString[0])); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/JsonResumableSessionFailureScenarioTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/JsonResumableSessionFailureScenarioTest.java index 7f5c7c7ac7..f40cf27f47 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/JsonResumableSessionFailureScenarioTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/JsonResumableSessionFailureScenarioTest.java @@ -139,6 +139,25 @@ public void xGoogStoredHeadersIncludedIfPresent() throws IOException { assertThat(storageException).hasMessageThat().contains("|< x-goog-stored-something: blah"); } + @Test + public void xGoogGcsIdempotencyTokenHeadersIncludedIfPresent() throws IOException { + HttpRequest req = + new MockHttpTransport() + .createRequestFactory() + .buildPutRequest(new GenericUrl("http://localhost:80980"), new EmptyContent()); + req.getHeaders().setContentLength(0L); + + HttpResponse resp = req.execute(); + resp.getHeaders().set("X-Goog-Gcs-Idempotency-Token", "5").setContentLength(0L); + + StorageException storageException = + JsonResumableSessionFailureScenario.SCENARIO_0.toStorageException( + "uploadId", resp, null, () -> null); + + assertThat(storageException.getCode()).isEqualTo(0); + assertThat(storageException).hasMessageThat().contains("|< x-goog-gcs-idempotency-token: 5"); + } + private static final class Cause extends RuntimeException { private Cause() { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java index 48d29bc8c8..79453af559 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java @@ -143,7 +143,7 @@ private static Arbitrary bytes(int minFileSize, int maxFileSize) { } @NonNull - private static Arbitrary byteBuffers(int perBufferMinSize, int perBufferMaxSize) { + static Arbitrary byteBuffers(int perBufferMinSize, int perBufferMaxSize) { return byteBuffer(perBufferMinSize, perBufferMaxSize) .array(ByteBuffer[].class) .ofMinSize(1) @@ -155,7 +155,7 @@ private static Arbitrary byteBuffers(int perBufferMinSize, int per * limit */ @NonNull - private static Arbitrary byteBuffer(int minSize, int maxSize) { + static Arbitrary byteBuffer(int minSize, int maxSize) { return Arbitraries.integers() .between(minSize, maxSize) .withDistribution(RandomDistribution.uniform()) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java index af92080a7d..fb14f7110e 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java @@ -229,6 +229,10 @@ public static String xxd(ByteBuffer bytes) { return xxd(true, bytes); } + public static String xxd(ByteString bytes) { + return xxd(false, bytes.asReadOnlyByteBuffer()); + } + public static String xxd(boolean flip, ByteBuffer bytes) { ByteBuffer dup = bytes.duplicate(); if (flip) dup.flip(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java index a7840f45c7..a007400636 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java @@ -22,6 +22,7 @@ import com.google.cloud.storage.ThroughputSink.Record; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; import java.nio.channels.WritableByteChannel; import java.time.Duration; import java.time.Instant; @@ -258,5 +259,10 @@ public void recordThroughput(Record r) {} public WritableByteChannel decorate(WritableByteChannel wbc) { return null; } + + @Override + public GatheringByteChannel decorate(GatheringByteChannel wbc) { + return null; + } } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITGrpcInterceptorTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITGrpcInterceptorTest.java index f7c9d1f153..737d31dd48 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITGrpcInterceptorTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITGrpcInterceptorTest.java @@ -83,8 +83,6 @@ public void grpcStorageOptions_allowSpecifyingInterceptor() throws Exception { .map(m -> m.get(X_GOOG_REQUEST_PARAMS)) .collect(Collectors.toList()); - System.out.println("requestParams = " + requestParams); - String expected = String.format("project=projects/%s", options.getProjectId()); String expectedEncoded = String.format("project=projects%%2F%s", options.getProjectId()); assertThat(requestParams).containsAnyOf(expected, expectedEncoded); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITJournalingBlobWriteSessionConfigTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITJournalingBlobWriteSessionConfigTest.java new file mode 100644 index 0000000000..4526a09f69 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITJournalingBlobWriteSessionConfigTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.it; + +import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BlobWriteSession; +import com.google.cloud.storage.BlobWriteSessionConfigs; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.DataGenerator; +import com.google.cloud.storage.GrpcStorageOptions; +import com.google.cloud.storage.JournalingBlobWriteSessionConfig; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.annotations.SingleBackend; +import com.google.cloud.storage.it.runner.annotations.StorageFixture; +import com.google.cloud.storage.it.runner.registry.Generator; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@SingleBackend(Backend.PROD) +public final class ITJournalingBlobWriteSessionConfigTest { + private static final int _1MiB = 1024 * 1024; + private static final int _256MiB = 256 * _1MiB; + + @Inject + @StorageFixture(Transport.GRPC) + public Storage storage; + + @Inject public BucketInfo bucket; + @Inject public Generator generator; + + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Storage journalingStorage; + private Path tempDir; + + @Before + public void setUp() throws Exception { + tempDir = temporaryFolder.newFolder(generator.randomObjectName()).toPath(); + JournalingBlobWriteSessionConfig journaling = + BlobWriteSessionConfigs.journaling(ImmutableList.of(tempDir)); + journalingStorage = + ((GrpcStorageOptions.Builder) this.storage.getOptions().toBuilder()) + .setBlobWriteSessionConfig(journaling) + .build() + .getService(); + } + + @After + public void tearDown() throws Exception { + if (journalingStorage != null) { + journalingStorage.close(); + } + } + + @Test + public void allBytesProperlyTransmitted() throws Exception { + + Random rand = new Random(bucket.getName().hashCode()); + int objectSize = _256MiB; + byte[] bytes = DataGenerator.rand(rand).genBytes(objectSize); + + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + + BlobWriteSession blobWriteSession = + journalingStorage.blobWriteSession(info, BlobWriteOption.doesNotExist()); + try (WritableByteChannel w = blobWriteSession.open()) { + w.write(ByteBuffer.wrap(bytes)); + } + + BlobInfo resource = blobWriteSession.getResult().get(1, TimeUnit.SECONDS); + + byte[] actual = storage.readAllBytes(info.getBlobId()); + assertAll( + () -> assertThat(resource.getSize()).isEqualTo(objectSize), + () -> assertThat(actual).isEqualTo(bytes)); + } + + @Test + public void journalFileMustNotBeLeftOnDiskAfterSuccess() throws IOException { + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + byte[] bytes = DataGenerator.base64Characters().genBytes(17); + BlobWriteSession blobWriteSession = + journalingStorage.blobWriteSession(info, BlobWriteOption.doesNotExist()); + try (WritableByteChannel w = blobWriteSession.open()) { + w.write(ByteBuffer.wrap(bytes)); + } + + try (Stream stream = Files.list(tempDir)) { + ImmutableList leftOverFiles = stream.collect(ImmutableList.toImmutableList()); + assertThat(leftOverFiles).isEmpty(); + } + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITUserAgentTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITUserAgentTest.java new file mode 100644 index 0000000000..e408f78081 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITUserAgentTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.it; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequest; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.DataGenerator; +import com.google.cloud.storage.HttpStorageOptions; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.annotations.SingleBackend; +import com.google.cloud.storage.it.runner.annotations.StorageFixture; +import com.google.cloud.storage.it.runner.registry.Generator; +import com.google.common.collect.ImmutableList; +import java.util.Objects; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@SingleBackend(Backend.PROD) +public final class ITUserAgentTest { + + @Inject + @StorageFixture(Transport.HTTP) + public Storage storage; + + @Inject public BucketInfo bucket; + @Inject public Generator generator; + + @Test + public void userAgentIncludesGcloudJava_writer_http() throws Exception { + RequestAuditing requestAuditing = new RequestAuditing(); + HttpStorageOptions options2 = + StorageOptions.http().setTransportOptions(requestAuditing).build(); + try (Storage storage = options2.getService()) { + try (WriteChannel writer = + storage.writer(BlobInfo.newBuilder(bucket, generator.randomObjectName()).build())) { + writer.write(DataGenerator.base64Characters().genByteBuffer(13)); + } + } + + ImmutableList userAgents = + requestAuditing.getRequests().stream() + .map(HttpRequest::getHeaders) + .map(HttpHeaders::getUserAgent) + .filter(Objects::nonNull) + .collect(ImmutableList.toImmutableList()); + + ImmutableList found = + userAgents.stream() + .filter(ua -> ua.contains("gcloud-java/")) + .collect(ImmutableList.toImmutableList()); + assertThat(found).hasSize(2); // one for the create session, and one for the PUT and finalize + } +} diff --git a/grpc-google-cloud-storage-v2/pom.xml b/grpc-google-cloud-storage-v2/pom.xml index b85ea87b07..46a8a9e7e3 100644 --- a/grpc-google-cloud-storage-v2/pom.xml +++ b/grpc-google-cloud-storage-v2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha grpc-google-cloud-storage-v2 GRPC library for grpc-google-cloud-storage-v2 com.google.cloud google-cloud-storage-parent - 2.26.1 + 2.27.0 diff --git a/pom.xml b/pom.xml index ebda713fcf..494484666c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-storage-parent pom - 2.26.1 + 2.27.0 Storage Parent https://github.com/googleapis/java-storage @@ -54,7 +54,7 @@ UTF-8 github google-cloud-storage-parent - 3.14.0 + 3.15.0 @@ -76,17 +76,17 @@ com.google.cloud google-cloud-storage - 2.26.1 + 2.27.0 com.google.apis google-api-services-storage - v1-rev20230617-2.0.0 + v1-rev20230907-2.0.0 com.google.cloud google-cloud-pubsub - 1.124.0 + 1.124.2 test @@ -117,17 +117,17 @@ com.google.api.grpc proto-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha com.google.api.grpc grpc-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha com.google.api.grpc gapic-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha com.google.cloud @@ -199,6 +199,7 @@ proto-google-cloud-storage-v2 gapic-google-cloud-storage-v2 google-cloud-storage-bom + storage-shared-benchmarking diff --git a/proto-google-cloud-storage-v2/pom.xml b/proto-google-cloud-storage-v2/pom.xml index 747b5b96cf..5083d9b9a8 100644 --- a/proto-google-cloud-storage-v2/pom.xml +++ b/proto-google-cloud-storage-v2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-storage-v2 - 2.26.1-alpha + 2.27.0-alpha proto-google-cloud-storage-v2 PROTO library for proto-google-cloud-storage-v2 com.google.cloud google-cloud-storage-parent - 2.26.1 + 2.27.0 diff --git a/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/BucketName.java b/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/BucketName.java index 078819ae86..d2643c96d8 100644 --- a/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/BucketName.java +++ b/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/BucketName.java @@ -137,7 +137,7 @@ public boolean equals(java.lang.Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { BucketName that = ((BucketName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.bucket, that.bucket); } diff --git a/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/CryptoKeyName.java b/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/CryptoKeyName.java index f80912dc89..a9d2ffd79b 100644 --- a/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/CryptoKeyName.java +++ b/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/CryptoKeyName.java @@ -175,7 +175,7 @@ public boolean equals(java.lang.Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { CryptoKeyName that = ((CryptoKeyName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.location, that.location) diff --git a/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/NotificationConfigName.java b/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/NotificationConfigName.java index ecb00bec3c..3a0c0a3638 100644 --- a/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/NotificationConfigName.java +++ b/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/NotificationConfigName.java @@ -159,7 +159,7 @@ public boolean equals(java.lang.Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { NotificationConfigName that = ((NotificationConfigName) o); return Objects.equals(this.project, that.project) && Objects.equals(this.bucket, that.bucket) diff --git a/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/ProjectName.java b/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/ProjectName.java index 154bd94bf8..b8d7aa7b69 100644 --- a/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/ProjectName.java +++ b/proto-google-cloud-storage-v2/src/main/java/com/google/storage/v2/ProjectName.java @@ -127,7 +127,7 @@ public boolean equals(java.lang.Object o) { if (o == this) { return true; } - if (o != null || getClass() == o.getClass()) { + if (o != null && getClass() == o.getClass()) { ProjectName that = ((ProjectName) o); return Objects.equals(this.project, that.project); } diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index b0c91a15b4..ca55630dce 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -30,7 +30,7 @@ com.google.cloud google-cloud-storage - 2.26.0 + 2.26.1 @@ -61,7 +61,7 @@ com.google.cloud google-cloud-pubsub - 1.124.0 + 1.124.2 test diff --git a/samples/native-image-sample/pom.xml b/samples/native-image-sample/pom.xml index 10fe7ae26b..4a70fdd3e1 100644 --- a/samples/native-image-sample/pom.xml +++ b/samples/native-image-sample/pom.xml @@ -61,7 +61,7 @@ com.google.cloud google-cloud-pubsub - 1.124.0 + 1.124.2 test @@ -134,7 +134,7 @@ org.graalvm.buildtools native-maven-plugin - 0.9.24 + 0.9.26 true com.example.storage.NativeImageStorageSample diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index 3168c082f8..73ab3e84a7 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -28,7 +28,7 @@ com.google.cloud google-cloud-storage - 2.26.1 + 2.27.0 @@ -52,7 +52,7 @@ com.google.cloud google-cloud-pubsub - 1.124.0 + 1.124.2 test diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index f24c2bbac9..10e1453ae9 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -72,7 +72,7 @@ com.google.cloud google-cloud-pubsub - 1.124.0 + 1.124.2 test diff --git a/samples/snippets/src/test/java/com/example/storage/QuickstartSampleIT.java b/samples/snippets/src/test/java/com/example/storage/QuickstartSampleIT.java index 0fb5c0581c..877ec89dae 100644 --- a/samples/snippets/src/test/java/com/example/storage/QuickstartSampleIT.java +++ b/samples/snippets/src/test/java/com/example/storage/QuickstartSampleIT.java @@ -45,7 +45,7 @@ private static void deleteBucket(String bucketName) { @Before public void setUp() { - bucketName = "my-new-bucket-" + UUID.randomUUID(); + bucketName = "java-storage-grpc-" + UUID.randomUUID(); } @After diff --git a/storage-shared-benchmarking/pom.xml b/storage-shared-benchmarking/pom.xml new file mode 100644 index 0000000000..cb5682053a --- /dev/null +++ b/storage-shared-benchmarking/pom.xml @@ -0,0 +1,114 @@ + + + 4.0.0 + com.google.cloud + jar + storage-shared-benchmarking + 0.0.1-SNAPSHOT + + com.google.cloud + google-cloud-storage-parent + 2.27.0 + + + + 1.8 + 1.8 + UTF-8 + + + + info.picocli + picocli + 4.7.5 + + + com.google.cloud + google-cloud-storage + + + com.google.cloud + google-cloud-storage + 2.27.0 + tests + + + com.google.api + gax + + + com.google.api + api-common + + + com.google.guava + guava + + + com.google.cloud + google-cloud-core + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + ${uberjar.name} + + + com.google.cloud.storage.benchmarking.StorageSharedBenchmarkingCli + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.sonatype.plugins + nexus-staging-maven-plugin + + true + + + + + + \ No newline at end of file diff --git a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/CloudMonitoringResult.java b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/CloudMonitoringResult.java new file mode 100644 index 0000000000..36bbe09983 --- /dev/null +++ b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/CloudMonitoringResult.java @@ -0,0 +1,308 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.benchmarking; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; +import java.util.Objects; +import org.checkerframework.checker.nullness.qual.NonNull; + +final class CloudMonitoringResult { + @NonNull private final String library; + @NonNull private final String api; + @NonNull private final String op; + + private final int workers; + private final int objectSize; + private final int appBufferSize; + private final int chunksize; + private final boolean crc32CEnabled; + private final boolean md5Enabled; + private final int cpuTimeUs; + @NonNull private final String bucketName; + @NonNull private final String status; + @NonNull private final String transferSize; + @NonNull private final String transferOffset; + @NonNull private final String failureMsg; + private final double throughput; + + CloudMonitoringResult( + String library, + String api, + String op, + int workers, + int objectSize, + int appBufferSize, + int chunksize, + boolean crc32cEnabled, + boolean md5Enabled, + int cpuTimeUs, + String bucketName, + String status, + String transferSize, + String transferOffset, + String failureMsg, + double throughput) { + this.library = library; + this.api = api; + this.op = op; + this.workers = workers; + this.objectSize = objectSize; + this.appBufferSize = appBufferSize; + this.chunksize = chunksize; + this.crc32CEnabled = crc32cEnabled; + this.md5Enabled = md5Enabled; + this.cpuTimeUs = cpuTimeUs; + this.bucketName = bucketName; + this.status = status; + this.transferSize = transferSize; + this.transferOffset = transferOffset; + this.failureMsg = failureMsg; + this.throughput = throughput; + } + + public static Builder newBuilder() { + return new Builder(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("library", library) + .add("api", api) + .add("op", op) + .add("workers", workers) + .add("objectSize", objectSize) + .add("appBufferSize", appBufferSize) + .add("chunksize", chunksize) + .add("crc32CEnabled", crc32CEnabled) + .add("md5Enabled", md5Enabled) + .add("cpuTimeUs", cpuTimeUs) + .add("bucketName", bucketName) + .add("status", status) + .add("transferSize", transferSize) + .add("transferOffset", transferOffset) + .add("failureMsg", failureMsg) + .add("throughput", throughput) + .toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CloudMonitoringResult)) { + return false; + } + CloudMonitoringResult result = (CloudMonitoringResult) o; + return workers == result.workers + && objectSize == result.objectSize + && appBufferSize == result.appBufferSize + && chunksize == result.chunksize + && crc32CEnabled == result.crc32CEnabled + && md5Enabled == result.md5Enabled + && cpuTimeUs == result.cpuTimeUs + && Double.compare(result.throughput, throughput) == 0 + && Objects.equals(library, result.library) + && Objects.equals(api, result.api) + && Objects.equals(op, result.op) + && Objects.equals(bucketName, result.bucketName) + && Objects.equals(status, result.status) + && Objects.equals(transferSize, result.transferSize) + && Objects.equals(transferOffset, result.transferOffset) + && Objects.equals(failureMsg, result.failureMsg); + } + + @Override + public int hashCode() { + return Objects.hash( + library, + api, + op, + workers, + objectSize, + appBufferSize, + chunksize, + crc32CEnabled, + md5Enabled, + cpuTimeUs, + bucketName, + status, + transferSize, + transferOffset, + failureMsg, + throughput); + } + + public String formatAsCustomMetric() { + return String.format( + "throughput{library=%s,api=%s,op=%s,object_size=%d,chunksize=%d,workers=%d,crc32c_enabled=%b,md5_enabled=%b,bucket_name=%s,status=%s,app_buffer_size=%d}%.1f", + library, + api, + op, + objectSize, + chunksize, + workers, + crc32CEnabled, + md5Enabled, + bucketName, + status, + appBufferSize, + throughput); + } + + public static class Builder { + + @NonNull private String library; + @NonNull private String api; + @NonNull private String op; + private int workers; + private int objectSize; + private int appBufferSize; + private int chunksize; + private boolean crc32cEnabled; + private boolean md5Enabled; + private int cpuTimeUs; + @NonNull private String bucketName; + @NonNull private String status; + @NonNull private String transferSize; + @NonNull private String transferOffset; + @NonNull private String failureMsg; + private double throughput; + + Builder() { + library = ""; + api = ""; + op = ""; + bucketName = ""; + status = ""; + transferSize = ""; + transferOffset = ""; + failureMsg = ""; + } + + public Builder setLibrary(String library) { + this.library = library; + return this; + } + + public Builder setApi(String api) { + this.api = api; + return this; + } + + public Builder setOp(String op) { + this.op = op; + return this; + } + + public Builder setWorkers(int workers) { + this.workers = workers; + return this; + } + + public Builder setObjectSize(int objectSize) { + this.objectSize = objectSize; + return this; + } + + public Builder setAppBufferSize(int appBufferSize) { + this.appBufferSize = appBufferSize; + return this; + } + + public Builder setChunksize(int chunksize) { + this.chunksize = chunksize; + return this; + } + + public Builder setCrc32cEnabled(boolean crc32cEnabled) { + this.crc32cEnabled = crc32cEnabled; + return this; + } + + public Builder setMd5Enabled(boolean md5Enabled) { + this.md5Enabled = md5Enabled; + return this; + } + + public Builder setCpuTimeUs(int cpuTimeUs) { + this.cpuTimeUs = cpuTimeUs; + return this; + } + + public Builder setBucketName(String bucketName) { + this.bucketName = bucketName; + return this; + } + + public Builder setStatus(String status) { + this.status = status; + return this; + } + + public Builder setTransferSize(String transferSize) { + this.transferSize = transferSize; + return this; + } + + public Builder setTransferOffset(String transferOffset) { + this.transferOffset = transferOffset; + return this; + } + + public Builder setFailureMsg(String failureMsg) { + this.failureMsg = failureMsg; + return this; + } + + public Builder setThroughput(double throughput) { + this.throughput = throughput; + return this; + } + + public CloudMonitoringResult build() { + checkNotNull(library); + checkNotNull(api); + checkNotNull(op); + checkNotNull(bucketName); + checkNotNull(status); + checkNotNull(transferSize); + checkNotNull(transferOffset); + checkNotNull(failureMsg); + return new CloudMonitoringResult( + library, + api, + op, + workers, + objectSize, + appBufferSize, + chunksize, + crc32cEnabled, + md5Enabled, + cpuTimeUs, + bucketName, + status, + transferSize, + transferOffset, + failureMsg, + throughput); + } + } +} diff --git a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingCli.java b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingCli.java new file mode 100644 index 0000000000..8862b544f8 --- /dev/null +++ b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingCli.java @@ -0,0 +1,161 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.benchmarking; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.ListenableFutureToApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.ApiExceptions; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.io.PrintWriter; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.regex.Pattern; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +@Command(name = "ssb") +public final class StorageSharedBenchmarkingCli implements Runnable { + // TODO: check what input validation is needed for option values. + @Option(names = "-project", description = "GCP Project Identifier", required = true) + String project; + + @Option(names = "-bucket", description = "Name of the bucket to use", required = true) + String bucket; + + @Option(names = "-samples", defaultValue = "8000", description = "Number of samples to report") + int samples; + + @Option( + names = "-workers", + defaultValue = "16", + description = "Number of workers to run in parallel for the workload") + int workers; + + @Option(names = "-api", defaultValue = "JSON", description = "API to use") + String api; + + @Option( + names = "-object_size", + defaultValue = "1048576..1048576", + description = + "any positive integer, or an inclusive range such as min..max where min and max are positive integers") + String objectSize; + + @Option( + names = "-output_type", + defaultValue = "cloud-monitoring", + description = "Output results format") + String outputType; + + @Option( + names = "-test_type", + description = "Specify which workload the cli should run", + required = true) + String testType; + + @Option( + names = "-temp_dir_location", + description = "Specify the path where the temporary directory should be located") + String tempDirLocation; + + public static void main(String[] args) { + CommandLine cmd = new CommandLine(StorageSharedBenchmarkingCli.class); + System.exit(cmd.execute(args)); + } + + @Override + public void run() { + switch (testType) { + case "w1r3": + runWorkload1(); + break; + default: + throw new IllegalStateException("Specify a workload to run"); + } + } + + private void runWorkload1() { + RetrySettings retrySettings = StorageOptions.getDefaultRetrySettings().toBuilder().build(); + + StorageOptions retryStorageOptions = + StorageOptions.newBuilder().setProjectId(project).setRetrySettings(retrySettings).build(); + Storage storageClient = retryStorageOptions.getService(); + Path tempDir = + tempDirLocation != null + ? Paths.get(tempDirLocation) + : Paths.get(System.getProperty("java.io.tmpdir")); + ListeningExecutorService executorService = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workers)); + List> workloadRuns = new ArrayList<>(); + Range objectSizeRange = Range.of(objectSize); + for (int i = 0; i < samples; i++) { + int objectSize = getRandomInt(objectSizeRange.min, objectSizeRange.max); + PrintWriter pw = new PrintWriter(System.out, true); + workloadRuns.add( + convert( + executorService.submit( + new W1R3(storageClient, workers, api, pw, objectSize, tempDir, bucket)))); + } + ApiExceptions.callAndTranslateApiException(ApiFutures.allAsList(workloadRuns)); + } + + public static int getRandomInt(int min, int max) { + if (min == max) return min; + Random random = new Random(); + return random.nextInt((max - min) + 1) + min; + } + + private static ApiFuture convert(ListenableFuture lf) { + return new ListenableFutureToApiFuture<>(lf); + } + + private static final class Range { + private final int min; + private final int max; + + private Range(int min, int max) { + this.min = min; + this.max = max; + } + + public static Range of(int min, int max) { + return new Range(min, max); + } + // Takes an object size range of format min..max and creates a range object + public static Range of(String range) { + Pattern p = Pattern.compile("\\.\\."); + String[] splitRangeVals = p.split(range); + if (splitRangeVals.length == 2) { + String min = splitRangeVals[0]; + String max = splitRangeVals[1]; + return of(Integer.parseInt(min), Integer.parseInt(max)); + } + throw new IllegalStateException("Expected a size range of format min..max, but got " + range); + } + } +} diff --git a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingUtils.java b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingUtils.java new file mode 100644 index 0000000000..0407cf7015 --- /dev/null +++ b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingUtils.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.storage.benchmarking; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; +import java.time.Duration; + +class StorageSharedBenchmarkingUtils { + public static long SSB_SIZE_THRESHOLD_BYTES = 1048576; + public static int DEFAULT_NUMBER_OF_READS = 3; + + public static void cleanupObject(Storage storage, Blob created) { + storage.delete( + created.getBlobId(), Storage.BlobSourceOption.generationMatch(created.getGeneration())); + } + + public static double calculateThroughput(long size, Duration elapsedTime) { + return size >= StorageSharedBenchmarkingUtils.SSB_SIZE_THRESHOLD_BYTES + ? size / 1024 / 1024 / (elapsedTime.toNanos()) + : size / 1024 / (elapsedTime.toNanos()); + } +} diff --git a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/W1R3.java b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/W1R3.java new file mode 100644 index 0000000000..23714b14b8 --- /dev/null +++ b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/W1R3.java @@ -0,0 +1,115 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.benchmarking; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.DataGenerator; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.TmpFile; +import java.io.PrintWriter; +import java.nio.file.Path; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Callable; + +final class W1R3 implements Callable { + + private final Storage storage; + private final int workers; + private final String api; + private final PrintWriter printWriter; + private final int objectSize; + private final Path tempDirectory; + private final String bucketName; + + W1R3( + Storage storage, + int workers, + String api, + PrintWriter printWriter, + int objectSize, + Path tempDirectory, + String bucketName) { + this.storage = storage; + this.workers = workers; + this.api = api; + this.printWriter = printWriter; + this.objectSize = objectSize; + this.tempDirectory = tempDirectory; + this.bucketName = bucketName; + } + + @Override + public String call() throws Exception { + // Create the file to be uploaded and fill it with data + TmpFile file = DataGenerator.base64Characters().tempFile(tempDirectory, objectSize); + BlobInfo blob = BlobInfo.newBuilder(bucketName, file.toString()).build(); + + // Get the start time + Clock clock = Clock.systemDefaultZone(); + Instant startTime = clock.instant(); + Blob created = storage.createFrom(blob, file.getPath()); + Instant endTime = clock.instant(); + Duration elapsedTimeUpload = Duration.between(startTime, endTime); + printWriter.println( + generateCloudMonitoringResult( + "WRITE", + StorageSharedBenchmarkingUtils.calculateThroughput( + created.getSize().longValue(), elapsedTimeUpload), + created) + .formatAsCustomMetric()); + for (int i = 0; i <= StorageSharedBenchmarkingUtils.DEFAULT_NUMBER_OF_READS; i++) { + TmpFile dest = TmpFile.of(tempDirectory, "prefix", "bin"); + startTime = clock.instant(); + storage.downloadTo(created.getBlobId(), dest.getPath()); + endTime = clock.instant(); + Duration elapsedTimeDownload = Duration.between(startTime, endTime); + printWriter.println( + generateCloudMonitoringResult( + "READ[" + i + "]", + StorageSharedBenchmarkingUtils.calculateThroughput( + created.getSize().longValue(), elapsedTimeDownload), + created) + .formatAsCustomMetric()); + } + StorageSharedBenchmarkingUtils.cleanupObject(storage, created); + return "OK"; + } + + private CloudMonitoringResult generateCloudMonitoringResult( + String op, double throughput, Blob created) { + CloudMonitoringResult result = + CloudMonitoringResult.newBuilder() + .setLibrary("java") + .setApi(api) + .setOp(op) + .setWorkers(workers) + .setObjectSize(created.getSize().intValue()) + .setChunksize(created.getSize().intValue()) + .setCrc32cEnabled(false) + .setMd5Enabled(false) + .setCpuTimeUs(-1) + .setBucketName(created.getBucket()) + .setStatus("OK") + .setTransferSize(created.getSize().toString()) + .setThroughput(throughput) + .build(); + return result; + } +} diff --git a/versions.txt b/versions.txt index 17d3d37bb8..2c0ded78b3 100644 --- a/versions.txt +++ b/versions.txt @@ -1,7 +1,7 @@ # Format: # module:released-version:current-version -google-cloud-storage:2.26.1:2.26.1 -gapic-google-cloud-storage-v2:2.26.1-alpha:2.26.1-alpha -grpc-google-cloud-storage-v2:2.26.1-alpha:2.26.1-alpha -proto-google-cloud-storage-v2:2.26.1-alpha:2.26.1-alpha +google-cloud-storage:2.27.0:2.27.0 +gapic-google-cloud-storage-v2:2.27.0-alpha:2.27.0-alpha +grpc-google-cloud-storage-v2:2.27.0-alpha:2.27.0-alpha +proto-google-cloud-storage-v2:2.27.0-alpha:2.27.0-alpha