Skip to content
109 changes: 96 additions & 13 deletions api/src/main/java/io/minio/BaseS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,19 @@
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -91,7 +93,6 @@ public abstract class BaseS3Client implements AutoCloseable {
"ServerSideEncryptionConfigurationNotFoundError";
// maximum allowed bucket policy size is 20KiB
protected static final int MAX_BUCKET_POLICY_SIZE = 20 * 1024;
protected static final Random RANDOM = new Random(new SecureRandom().nextLong());
protected static final ObjectMapper OBJECT_MAPPER =
JsonMapper.builder()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
Expand All @@ -103,6 +104,15 @@ public abstract class BaseS3Client implements AutoCloseable {
private static final String UPLOAD_ID = "uploadId";
private static final Set<String> TRACE_QUERY_PARAMS =
ImmutableSet.of("retention", "legal-hold", "tagging", UPLOAD_ID, "acl", "attributes");

private static final ScheduledExecutorService RETRY_SCHEDULER =
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread t = new Thread(r, "minio-retry-scheduler");
t.setDaemon(true);
return t;
});
Comment thread
allanrogerr marked this conversation as resolved.

private PrintWriter traceStream;
protected final Map<String, String> regionCache = new ConcurrentHashMap<>();
protected String userAgent = Utils.getDefaultUserAgent();
Expand All @@ -111,6 +121,7 @@ public abstract class BaseS3Client implements AutoCloseable {
protected Provider provider;
protected OkHttpClient httpClient;
protected boolean closeHttpClient;
protected volatile int maxRetries = Retry.MAX_RETRY;

protected BaseS3Client(
Http.BaseUrl baseUrl, Provider provider, OkHttpClient httpClient, boolean closeHttpClient) {
Expand All @@ -125,6 +136,7 @@ protected BaseS3Client(BaseS3Client client) {
this.provider = client.provider;
this.httpClient = client.httpClient;
this.closeHttpClient = client.closeHttpClient;
this.maxRetries = client.maxRetries;
}

/** Closes underneath HTTP client. */
Expand All @@ -136,6 +148,18 @@ public void close() {
}
}

/**
* Sets the maximum number of retry attempts for failed S3 requests. Requests with non-seekable
* bodies are never retried regardless of this value. The default is {@code Retry.MAX_RETRY} (10).
* Pass 1 to disable automatic retries.
*
* @param maxRetries maximum attempts (must be >= 1).
*/
public void setMaxRetries(int maxRetries) {
if (maxRetries < 1) throw new IllegalArgumentException("maxRetries must be >= 1");
this.maxRetries = maxRetries;
}

/**
* Sets HTTP connect, write and read timeouts. A value of 0 means no timeout, otherwise values
* must be between 1 and Integer.MAX_VALUE when converted to milliseconds.
Expand Down Expand Up @@ -270,8 +294,54 @@ private String[] handleRedirectResponse(
return new String[] {code, message};
}

/** Execute HTTP request asynchronously for given parameters. */
/** Execute HTTP request asynchronously for given parameters, with automatic retry. */
protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, String region) {
// Non-seekable bodies (raw okhttp3 RequestBody) cannot be replayed — single attempt only.
Http.Body body = s3request.body();
int maxAttempts = (body != null && body.isHttpRequestBody()) ? 1 : this.maxRetries;
Comment thread
allanrogerr marked this conversation as resolved.
return executeWithRetry(s3request, region, maxAttempts, 0);
}

private CompletableFuture<Response> executeWithRetry(
Http.S3Request s3request, String region, int maxAttempts, int attempt) {
return doExecuteAsync(s3request, region)
.handle(
(response, throwable) -> {
if (throwable == null) {
return CompletableFuture.completedFuture(response);
}
Throwable cause =
(throwable instanceof CompletionException) ? throwable.getCause() : throwable;
if (cause == null) cause = throwable;
if (attempt + 1 >= maxAttempts || !Retry.isRetryable(cause)) {
return Utils.<Response>failedFuture(cause);
}
long delayMs = Retry.computeBackoffMs(attempt + 1, ThreadLocalRandom.current());
CompletableFuture<Response> retryFuture = new CompletableFuture<>();
RETRY_SCHEDULER.schedule(
Comment thread
allanrogerr marked this conversation as resolved.
() ->
CompletableFuture.runAsync(
() ->
executeWithRetry(s3request, region, maxAttempts, attempt + 1)
.whenComplete(
(r, t) -> {
if (t != null) retryFuture.completeExceptionally(t);
else retryFuture.complete(r);
}))
.exceptionally(
ex -> {
retryFuture.completeExceptionally(ex);
return null;
}),
delayMs,
TimeUnit.MILLISECONDS);
Comment thread
allanrogerr marked this conversation as resolved.
return retryFuture;
})
.thenCompose(cf -> cf);
}

/** Execute single HTTP request attempt asynchronously for given parameters. */
private CompletableFuture<Response> doExecuteAsync(Http.S3Request s3request, String region) {
Credentials credentials = (provider == null) ? null : provider.fetch();
Http.Request request = null;
try {
Expand All @@ -284,12 +354,7 @@ protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, Str
PrintWriter traceStream = this.traceStream;
if (traceStream != null) traceStream.print(request.httpTraces());

OkHttpClient httpClient = this.httpClient;
// FIXME: enable retry for all request.
// if (!s3request.retryFailure()) {
// httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build();
// }

OkHttpClient httpClient = this.httpClient.newBuilder().retryOnConnectionFailure(false).build();
okhttp3.Request httpRequest = request.httpRequest();
CompletableFuture<Response> completableFuture = newCompleteableFuture();
httpClient
Expand Down Expand Up @@ -471,9 +536,10 @@ private void onResponse(final Response response) throws IOException {
response.header("x-amz-id-2"));
}

// invalidate region cache if needed
if (errorResponse.code().equals(NO_SUCH_BUCKET)
|| errorResponse.code().equals(RETRY_HEAD)) {
// invalidate region cache if needed (bucket may be null for e.g. listBuckets)
if (s3request.bucket() != null
&& (errorResponse.code().equals(NO_SUCH_BUCKET)
|| errorResponse.code().equals(RETRY_HEAD))) {
regionCache.remove(s3request.bucket());
}

Expand Down Expand Up @@ -1225,6 +1291,15 @@ private Object[] createBody(PutObjectAPIBaseArgs args, MediaType contentType)
boolean checksumHeader = headers.namePrefixAny("x-amz-checksum-");
String md5Hash = headers.getFirst(Http.Headers.CONTENT_MD5);

long fileStartPos = 0;
if (args.file() != null) {
try {
fileStartPos = args.file().getFilePointer();
} catch (IOException e) {
throw new MinioException(e);
}
}

if (sha256HexString == null && sha256Base64String == null) {
if (!baseUrl.isHttps()) {
Checksum.Hasher hasher = Checksum.Algorithm.SHA256.hasher();
Expand Down Expand Up @@ -1280,6 +1355,14 @@ private Object[] createBody(PutObjectAPIBaseArgs args, MediaType contentType)
}
}

if (args.file() != null) {
try {
args.file().seek(fileStartPos);
} catch (IOException e) {
throw new MinioException(e);
}
}

Http.Body body = null;
if (args.file() != null) {
body = new Http.Body(args.file(), args.length(), contentType, sha256HexString, md5Hash);
Expand Down
20 changes: 19 additions & 1 deletion api/src/main/java/io/minio/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ public static OkHttpClient setTimeout(
public static class Body {
private okhttp3.RequestBody requestBody;
private RandomAccessFile file;
private long fileOffset;
private ByteBuffer buffer;
private byte[] data;
private Long length;
Expand All @@ -713,6 +714,11 @@ public Body(
String md5Hash) {
if (length < 0) throw new IllegalArgumentException("valid length must be provided");
this.file = file;
try {
this.fileOffset = file.getFilePointer();
} catch (IOException e) {
throw new IllegalStateException("failed to read file position", e);
Comment thread
allanrogerr marked this conversation as resolved.
}
set(length, contentType, sha256Hash, md5Hash);
}

Expand Down Expand Up @@ -786,7 +792,14 @@ public Headers headers() {
/** Creates HTTP RequestBody for this body. */
public RequestBody toRequestBody() throws MinioException {
if (requestBody != null) return new RequestBody(requestBody);
if (file != null) return new RequestBody(file, length, contentType);
if (file != null) {
try {
file.seek(fileOffset);
} catch (IOException e) {
throw new MinioException(e);
}
return new RequestBody(file, length, contentType);
}
if (buffer != null) return new RequestBody(buffer, contentType);
return new RequestBody(data, length.intValue(), contentType);
}
Expand Down Expand Up @@ -1503,6 +1516,11 @@ public String object() {
return object;
}

/** Returns the request body, or {@code null} if none was set. */
public Body body() {
return body;
}

private Request toRequest(
BaseUrl baseUrl, String region, Credentials credentials, Integer expiry)
throws MinioException {
Expand Down
19 changes: 17 additions & 2 deletions api/src/main/java/io/minio/MinioAsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
Expand Down Expand Up @@ -156,6 +157,7 @@ public static final class Builder {
private Provider provider;
private OkHttpClient httpClient;
private boolean closeHttpClient;
private int maxRetries = Retry.MAX_RETRY;

public Builder baseUrl(Http.BaseUrl baseUrl) {
if (baseUrl.region() == null) {
Expand Down Expand Up @@ -217,6 +219,16 @@ public Builder httpClient(OkHttpClient httpClient, boolean close) {
return this;
}

/**
* Sets the maximum number of retry attempts per request. Pass 1 to disable automatic retries.
* Defaults to {@code 10}.
*/
public Builder maxRetries(int maxRetries) {
if (maxRetries < 1) throw new IllegalArgumentException("maxRetries must be >= 1");
this.maxRetries = maxRetries;
return this;
}
Comment thread
allanrogerr marked this conversation as resolved.

public MinioAsyncClient build() {
Utils.validateNotNull(baseUrl, "endpoint");

Expand All @@ -232,7 +244,10 @@ public MinioAsyncClient build() {
httpClient = Http.newDefaultClient();
}

return new MinioAsyncClient(baseUrl, provider, httpClient, closeHttpClient);
MinioAsyncClient client =
new MinioAsyncClient(baseUrl, provider, httpClient, closeHttpClient);
client.maxRetries = maxRetries;
return client;
}
}

Expand Down Expand Up @@ -3356,7 +3371,7 @@ public CompletableFuture<PutObjectFanOutResponse> putObjectFanOut(PutObjectFanOu
// Build POST object data
String objectName =
"fan-out-"
+ new BigInteger(32, RANDOM).toString(32)
+ new BigInteger(32, ThreadLocalRandom.current()).toString(32)
+ "-"
+ System.currentTimeMillis();
PostPolicy policy =
Expand Down
18 changes: 18 additions & 0 deletions api/src/main/java/io/minio/MinioClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,15 @@ public void setAwsS3Prefix(String awsS3Prefix) {
asyncClient.setAwsS3Prefix(awsS3Prefix);
}

/**
* Sets the maximum number of retry attempts. Pass 1 to disable automatic retries.
*
* @param maxRetries maximum attempts (must be >= 1).
*/
public void setMaxRetries(int maxRetries) {
asyncClient.setMaxRetries(maxRetries);
}

/** Closes underneath async client. */
@Override
public void close() throws Exception {
Expand Down Expand Up @@ -2043,6 +2052,15 @@ public Builder httpClient(OkHttpClient httpClient, boolean close) {
return this;
}

/**
* Sets the maximum number of retry attempts per request. Pass 1 to disable automatic retries.
* Defaults to {@code 10}.
*/
public Builder maxRetries(int maxRetries) {
asyncClientBuilder.maxRetries(maxRetries);
return this;
}
Comment thread
allanrogerr marked this conversation as resolved.

public MinioClient build() {
MinioAsyncClient asyncClient = asyncClientBuilder.build();
return new MinioClient(asyncClient);
Expand Down
Loading
Loading