chore(retry): introduce RetryAlgorithmManager #1029

Merged
merged 1 commit into from
Sep 27, 2021
@@ -16,7 +16,6 @@

package com.google.cloud.storage;

import static com.google.cloud.RetryHelper.runWithRetries;
import static com.google.cloud.storage.Blob.BlobSourceOption.toGetOptions;
import static com.google.cloud.storage.Blob.BlobSourceOption.toSourceOptions;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -25,17 +24,15 @@
import com.google.api.services.storage.model.StorageObject;
import com.google.auth.ServiceAccountSigner;
import com.google.auth.ServiceAccountSigner.SigningException;
import com.google.cloud.ExceptionHandler;
import com.google.cloud.ReadChannel;
import com.google.cloud.RetryHelper;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.Storage.SignUrlOption;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.Function;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
@@ -50,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
* An object in Google Cloud Storage. A {@code Blob} object includes the {@code BlobId} instance,
@@ -78,18 +76,9 @@ public class Blob extends BlobInfo {
private static final long serialVersionUID = -6806832496717441434L;

private final StorageOptions options;
private final RetryAlgorithmManager retryAlgorithmManager;
private transient Storage storage;

static final Function<Tuple<Storage, StorageObject>, Blob> BLOB_FROM_PB_FUNCTION =
new Function<Tuple<Storage, StorageObject>, Blob>() {
@Override
public Blob apply(Tuple<Storage, StorageObject> pb) {
return Blob.fromPb(pb.x(), pb.y());
}
};

private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;

/** Class for specifying blob source options when {@code Blob} methods are used. */
public static class BlobSourceOption extends Option {

@@ -258,26 +247,18 @@ public void downloadTo(Path path, BlobSourceOption... options) {
public void downloadTo(OutputStream outputStream, BlobSourceOption... options) {
final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
final StorageRpc storageRpc = this.options.getStorageRpcV1();
StorageObject pb = getBlobId().toPb();
final Map<StorageRpc.Option, ?> requestOptions = StorageImpl.optionMap(getBlobId(), options);
try {
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
storageRpc.read(
getBlobId().toPb(),
requestOptions,
countingOutputStream.getCount(),
countingOutputStream);
}
}),
this.options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
this.options.getClock());
} catch (RetryHelper.RetryHelperException e) {
StorageException.translateAndThrow(e);
}
ExceptionHandler exceptionHandler = retryAlgorithmManager.getForObjectsGet(pb, requestOptions);
Contributor

Can you explain why we need a separate exception handler getter for each API method? Naively, I would think it would be possible to do something like what Python has (providing one retryer for always-idempotent ops and one for each type of conditional retry; see https://googleapis.dev/python/storage/latest/retry_timeout.html#module-google.cloud.storage.retry ). Is it possible to abstract something at a similar level here?

Collaborator Author

Unlike Python, we don't have the pattern of passing the retry algorithm as a method argument; this approach allows per-RPC-method handling. In our default implementation we'll most likely end up with common ExceptionHandlers, which can simply be returned when applicable.

This API is also partly a prototype of what we might want the public API to look like. In java-core there is a pattern of configuring retry settings per RPC method, since the idempotency buckets are not necessarily available universally.

Contributor

Sounds good. Do you think we'll be able to make the public API more concise, or probably not?

Collaborator Author

Possibly. We'll have to weigh it against how gax-java allows configuration of retry settings, as I know it has some ability for per-RPC config in its public API.
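
For illustration, the two designs discussed in this thread can coexist, as in the minimal Java sketch below: a manager exposes one getter per RPC method, while its default implementation hands back a small set of shared handlers. Every name here (`Handler`, `Manager`, `IDEMPOTENT`, `NEVER`, `TransientException`, the `hasPrecondition` flag) is a hypothetical stand-in and not the library's API; in the diff, `getForObjectsGet` takes the `StorageObject` and the request options instead.

```java
import java.util.function.Predicate;

/** Sketch only: stand-in types, not the google-cloud-storage API. */
final class RetryManagerSketch {

  /** Hypothetical handler: returns true when a failure may be retried. */
  interface Handler extends Predicate<Throwable> {}

  /** Hypothetical manager with one getter per RPC method. */
  interface Manager {
    Handler getForObjectsGet();

    Handler getForObjectsUpdate(boolean hasPrecondition);
  }

  /** Hypothetical marker for a transient failure. */
  static final class TransientException extends RuntimeException {}

  // Shared handlers, playing the role of the "idempotency buckets".
  static final Handler IDEMPOTENT = t -> t instanceof TransientException;
  static final Handler NEVER = t -> false;

  /** Default manager: per-RPC getters that simply return a shared handler. */
  static final Manager DEFAULT =
      new Manager() {
        @Override
        public Handler getForObjectsGet() {
          return IDEMPOTENT; // reads are assumed to be always idempotent
        }

        @Override
        public Handler getForObjectsUpdate(boolean hasPrecondition) {
          // Assumption: an update is only safe to retry when guarded by a precondition.
          return hasPrecondition ? IDEMPOTENT : NEVER;
        }
      };

  public static void main(String[] args) {
    Throwable failure = new TransientException();
    System.out.println(DEFAULT.getForObjectsGet().test(failure));
    System.out.println(DEFAULT.getForObjectsUpdate(false).test(failure));
  }
}
```

A different `Manager` could return a distinct handler for a single RPC without changing any call site, which is the flexibility the per-method getters are meant to buy.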

Retrying.run(
this.options,
exceptionHandler,
callable(
() -> {
storageRpc.read(
pb, requestOptions, countingOutputStream.getCount(), countingOutputStream);
}),
Function.identity());
}
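
The diff shows only the call site of `Retrying.run`, not its implementation. As a rough, stdlib-only sketch of what a helper of that shape might do, the code below assumes a fixed attempt budget in place of `RetrySettings`, a plain `Predicate` in place of `ExceptionHandler`, and no backoff between attempts. The class `RetryingSketch` and its `run` signature are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;

/** Sketch only: a simplified stand-in for a retry helper, not the library's Retrying class. */
final class RetryingSketch {

  /** Calls {@code c} until it succeeds, then maps the result with {@code f}. */
  static <T, U> U run(
      int maxAttempts, Predicate<Exception> shouldRetry, Callable<T> c, Function<T, U> f) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    for (int attempt = 1; ; attempt++) {
      T result;
      try {
        result = c.call();
      } catch (Exception e) {
        // Give up when the budget is spent or the handler rejects the failure.
        if (attempt >= maxAttempts || !shouldRetry.test(e)) {
          throw new IllegalStateException("giving up after " + attempt + " attempt(s)", e);
        }
        continue;
      }
      // The mapping function runs outside the try block, so it is never retried.
      return f.apply(result);
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // A void operation that fails twice before succeeding.
    Runnable flaky =
        () -> {
          if (++calls[0] < 3) {
            throw new IllegalStateException("transient");
          }
        };
    // As in the diff, a void operation is adapted with Executors.callable and the
    // (null) result is passed through Function.identity().
    run(5, e -> e instanceof IllegalStateException, Executors.callable(flaky), Function.identity());
    System.out.println("calls made: " + calls[0]);
  }
}
```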

/**
@@ -506,6 +487,7 @@ public Blob build() {
super(infoBuilder);
this.storage = checkNotNull(storage);
this.options = storage.getOptions();
this.retryAlgorithmManager = storage.getOptions().getRetryAlgorithmManager();
}

/**
@@ -19,6 +19,7 @@
import static com.google.cloud.RetryHelper.runWithRetries;

import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.ExceptionHandler;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
@@ -31,7 +32,6 @@
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;

/** Default implementation for ReadChannel. */
class BlobReadChannel implements ReadChannel {
@@ -41,6 +41,7 @@ class BlobReadChannel implements ReadChannel {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private final RetryAlgorithmManager retryAlgorithmManager;
private String lastEtag;
private long position;
private boolean isOpen;
@@ -57,6 +58,7 @@ class BlobReadChannel implements ReadChannel {
this.serviceOptions = serviceOptions;
this.blob = blob;
this.requestOptions = requestOptions;
this.retryAlgorithmManager = serviceOptions.getRetryAlgorithmManager();
isOpen = true;
storageRpc = serviceOptions.getStorageRpcV1();
storageObject = blob.toPb();
@@ -119,24 +121,21 @@ public int read(ByteBuffer byteBuffer) throws IOException {
}
final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
try {
ExceptionHandler exceptionHandler =
retryAlgorithmManager.getForObjectsGet(storageObject, requestOptions);
Tuple<String, byte[]> result =
runWithRetries(
new Callable<Tuple<String, byte[]>>() {
@Override
public Tuple<String, byte[]> call() {
return storageRpc.read(storageObject, requestOptions, position, toRead);
}
},
() -> storageRpc.read(storageObject, requestOptions, position, toRead),
serviceOptions.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
exceptionHandler,
serviceOptions.getClock());
if (result.y().length > 0 && lastEtag != null && !Objects.equals(result.x(), lastEtag)) {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("Blob ").append(blob).append(" was updated while reading");
throw new IOException(messageBuilder.toString());
String etag = result.x();
byte[] bytes = result.y();
if (bytes.length > 0 && lastEtag != null && !Objects.equals(etag, lastEtag)) {
throw new IOException("Blob " + blob + " was updated while reading");
}
lastEtag = result.x();
buffer = result.y();
lastEtag = etag;
buffer = bytes;
} catch (RetryHelper.RetryHelperException e) {
throw new IOException(e);
}
@@ -17,47 +17,41 @@
package com.google.cloud.storage;

import static com.google.cloud.RetryHelper.runWithRetries;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.BaseWriteChannel;
import com.google.cloud.ExceptionHandler;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;
import java.math.BigInteger;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.NonNull;

/** Write channel implementation to upload Google Cloud Storage blobs. */
class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {

BlobWriteChannel(StorageOptions options, BlobInfo blob, Map<StorageRpc.Option, ?> optionsMap) {
this(options, blob, open(options, blob, optionsMap));
}

BlobWriteChannel(StorageOptions options, URL signedURL) {
this(options, open(signedURL, options));
}

BlobWriteChannel(StorageOptions options, BlobInfo blobInfo, String uploadId) {
super(options, blobInfo, uploadId);
}

BlobWriteChannel(StorageOptions options, String uploadId) {
super(options, null, uploadId);
}

// Contains metadata of the updated object or null if upload is not completed.
private StorageObject storageObject;

private final ExceptionHandler exceptionHandlerForWrite;
// Detect if flushBuffer() is being retried or not.
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
// occurring.
private boolean retrying = false;
private boolean checkingForLastChunk = false;

// Contains metadata of the updated object or null if upload is not completed.
private StorageObject storageObject;

BlobWriteChannel(
StorageOptions storageOptions,
BlobInfo blobInfo,
String uploadId,
ExceptionHandler exceptionHandlerForWrite) {
super(storageOptions, blobInfo, uploadId);
this.exceptionHandlerForWrite = exceptionHandlerForWrite;
}

boolean isRetrying() {
return retrying;
}
@@ -275,91 +269,81 @@ public void run() {
}
}),
getOptions().getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
exceptionHandlerForWrite,
getOptions().getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
}

protected StateImpl.Builder stateBuilder() {
return StateImpl.builder(getOptions(), getEntity(), getUploadId());
return StateImpl.builder(getOptions(), getEntity(), getUploadId())
.setResultRetryAlgorithm(exceptionHandlerForWrite);
}

private static String open(
final StorageOptions options,
final BlobInfo blob,
final Map<StorageRpc.Option, ?> optionsMap) {
try {
return runWithRetries(
new Callable<String>() {
@Override
public String call() {
return options.getStorageRpcV1().open(blob.toPb(), optionsMap);
}
},
options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
options.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
static Builder newBuilder() {
return new Builder();
}

private static String open(final URL signedURL, final StorageOptions options) {
try {
return runWithRetries(
new Callable<String>() {
@Override
public String call() {
if (!isValidSignedURL(signedURL.getQuery())) {
throw new StorageException(2, "invalid signedURL");
}
return options.getStorageRpcV1().open(signedURL.toString());
}
},
options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
options.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
static final class Builder {
private StorageOptions storageOptions;
private BlobInfo blobInfo;
private Supplier<@NonNull String> uploadIdSupplier;
private ExceptionHandler putExceptionHandler;

public Builder setStorageOptions(StorageOptions storageOptions) {
this.storageOptions = storageOptions;
return this;
}
}

private static boolean isValidSignedURL(String signedURLQuery) {
boolean isValid = true;
if (signedURLQuery.startsWith("X-Goog-Algorithm=")) {
if (!signedURLQuery.contains("&X-Goog-Credential=")
|| !signedURLQuery.contains("&X-Goog-Date=")
|| !signedURLQuery.contains("&X-Goog-Expires=")
|| !signedURLQuery.contains("&X-Goog-SignedHeaders=")
|| !signedURLQuery.contains("&X-Goog-Signature=")) {
isValid = false;
}
} else if (signedURLQuery.startsWith("GoogleAccessId=")) {
if (!signedURLQuery.contains("&Expires=") || !signedURLQuery.contains("&Signature=")) {
isValid = false;
}
} else {
isValid = false;
public Builder setBlobInfo(BlobInfo blobInfo) {
this.blobInfo = blobInfo;
return this;
}

public Builder setUploadIdSupplier(Supplier<String> uploadIdSupplier) {
this.uploadIdSupplier = uploadIdSupplier;
return this;
}

public Builder setPutExceptionHandler(ExceptionHandler putExceptionHandler) {
this.putExceptionHandler = putExceptionHandler;
return this;
}

BlobWriteChannel build() {
String uploadId = requireNonNull(uploadIdSupplier, "uploadId must be non null").get();
return new BlobWriteChannel(
requireNonNull(storageOptions, "storageOptions must be non null"),
blobInfo,
requireNonNull(uploadId, "uploadId must be non null"),
requireNonNull(putExceptionHandler, "putExceptionHandler must be non null"));
}
return isValid;
}

static class StateImpl extends BaseWriteChannel.BaseState<StorageOptions, BlobInfo> {

private static final long serialVersionUID = -9028324143780151286L;

private final ExceptionHandler exceptionHandler;

StateImpl(Builder builder) {
super(builder);
this.exceptionHandler = builder.exceptionHandler;
}

static class Builder extends BaseWriteChannel.BaseState.Builder<StorageOptions, BlobInfo> {
private ExceptionHandler exceptionHandler;

private Builder(StorageOptions options, BlobInfo blobInfo, String uploadId) {
super(options, blobInfo, uploadId);
}

public Builder setResultRetryAlgorithm(ExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
return this;
}

@Override
public RestorableState<WriteChannel> build() {
return new StateImpl(this);
@@ -372,9 +356,19 @@ static Builder builder(StorageOptions options, BlobInfo blobInfo, String uploadI

@Override
public WriteChannel restore() {
BlobWriteChannel channel = new BlobWriteChannel(serviceOptions, entity, uploadId);
channel.restore(this);
return channel;
try {
BlobWriteChannel channel =
BlobWriteChannel.newBuilder()
.setStorageOptions(serviceOptions)
.setBlobInfo(entity)
.setUploadIdSupplier(() -> uploadId)
.setPutExceptionHandler(exceptionHandler)
.build();
channel.restore(this);
return channel;
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}
}
}
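
The builder introduced in this file takes the upload ID as a `Supplier` and validates its inputs in `build()`. The following stdlib-only sketch shows that pattern in isolation, under the assumption that the point of the supplier is to defer obtaining the upload ID until the channel is actually built. `ChannelBuilderSketch`, `Channel`, and the string-valued `handlerName` are hypothetical stand-ins for the real types.

```java
import static java.util.Objects.requireNonNull;

import java.util.function.Supplier;

/** Sketch only: stand-in types illustrating a builder with a deferred upload ID. */
final class ChannelBuilderSketch {

  /** Hypothetical channel holding the resolved upload ID and a handler label. */
  static final class Channel {
    final String uploadId;
    final String handlerName;

    Channel(String uploadId, String handlerName) {
      this.uploadId = uploadId;
      this.handlerName = handlerName;
    }
  }

  static final class Builder {
    private Supplier<String> uploadIdSupplier;
    private String handlerName;

    Builder setUploadIdSupplier(Supplier<String> uploadIdSupplier) {
      this.uploadIdSupplier = uploadIdSupplier;
      return this;
    }

    Builder setHandlerName(String handlerName) {
      this.handlerName = handlerName;
      return this;
    }

    Channel build() {
      // The supplier is only invoked here, so any work needed to obtain the
      // upload ID is deferred until the channel is actually constructed.
      String uploadId =
          requireNonNull(uploadIdSupplier, "uploadIdSupplier must be non null").get();
      return new Channel(
          requireNonNull(uploadId, "uploadId must be non null"),
          requireNonNull(handlerName, "handlerName must be non null"));
    }
  }

  public static void main(String[] args) {
    Channel channel =
        new Builder().setUploadIdSupplier(() -> "upload-1").setHandlerName("put").build();
    System.out.println(channel.uploadId + " / " + channel.handlerName);
  }
}
```

When restoring from saved state the ID is already known, so a constant supplier such as `() -> uploadId` is enough, which matches how `restore()` uses the builder in the diff.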