6262import java .util .concurrent .CompletableFuture ;
6363import java .util .concurrent .CompletionException ;
6464import java .util .concurrent .ConcurrentHashMap ;
65+ import java .util .concurrent .Executors ;
66+ import java .util .concurrent .ScheduledExecutorService ;
67+ import java .util .concurrent .TimeUnit ;
6568import java .util .function .Supplier ;
6669import java .util .logging .Logger ;
6770import javax .annotation .Nonnull ;
@@ -103,6 +106,15 @@ public abstract class BaseS3Client implements AutoCloseable {
103106 private static final String UPLOAD_ID = "uploadId" ;
104107 private static final Set <String > TRACE_QUERY_PARAMS =
105108 ImmutableSet .of ("retention" , "legal-hold" , "tagging" , UPLOAD_ID , "acl" , "attributes" );
109+
110+ private static final ScheduledExecutorService RETRY_SCHEDULER =
111+ Executors .newSingleThreadScheduledExecutor (
112+ r -> {
113+ Thread t = new Thread (r , "minio-retry-scheduler" );
114+ t .setDaemon (true );
115+ return t ;
116+ });
117+
106118 private PrintWriter traceStream ;
107119 protected final Map <String , String > regionCache = new ConcurrentHashMap <>();
108120 protected String userAgent = Utils .getDefaultUserAgent ();
@@ -111,6 +123,7 @@ public abstract class BaseS3Client implements AutoCloseable {
111123 protected Provider provider ;
112124 protected OkHttpClient httpClient ;
113125 protected boolean closeHttpClient ;
126+ protected volatile int maxRetries = Retry .MAX_RETRY ;
114127
115128 protected BaseS3Client (
116129 Http .BaseUrl baseUrl , Provider provider , OkHttpClient httpClient , boolean closeHttpClient ) {
@@ -125,6 +138,7 @@ protected BaseS3Client(BaseS3Client client) {
125138 this .provider = client .provider ;
126139 this .httpClient = client .httpClient ;
127140 this .closeHttpClient = client .closeHttpClient ;
141+ this .maxRetries = client .maxRetries ;
128142 }
129143
130144 /** Closes underneath HTTP client. */
@@ -136,6 +150,18 @@ public void close() {
136150 }
137151 }
138152
153+ /**
154+ * Sets the maximum number of retry attempts for failed S3 requests. Requests with non-seekable
155+ * bodies are never retried regardless of this value. The default is {@code Retry.MAX_RETRY} (10).
156+ * Pass 1 to disable automatic retries.
157+ *
158+ * @param maxRetries maximum attempts (must be >= 1).
159+ */
160+ public void setMaxRetries (int maxRetries ) {
161+ if (maxRetries < 1 ) throw new IllegalArgumentException ("maxRetries must be >= 1" );
162+ this .maxRetries = maxRetries ;
163+ }
164+
139165 /**
140166 * Sets HTTP connect, write and read timeouts. A value of 0 means no timeout, otherwise values
141167 * must be between 1 and Integer.MAX_VALUE when converted to milliseconds.
@@ -270,8 +296,47 @@ private String[] handleRedirectResponse(
270296 return new String [] {code , message };
271297 }
272298
273- /** Execute HTTP request asynchronously for given parameters. */
299+ /** Execute HTTP request asynchronously for given parameters, with automatic retry . */
274300 protected CompletableFuture <Response > executeAsync (Http .S3Request s3request , String region ) {
301+ // Non-seekable bodies (raw okhttp3 RequestBody) cannot be replayed — single attempt only.
302+ Http .Body body = s3request .body ();
303+ int maxAttempts = (body != null && body .isHttpRequestBody ()) ? 1 : this .maxRetries ;
304+ return executeWithRetry (s3request , region , maxAttempts , 0 );
305+ }
306+
307+ private CompletableFuture <Response > executeWithRetry (
308+ Http .S3Request s3request , String region , int maxAttempts , int attempt ) {
309+ return doExecuteAsync (s3request , region )
310+ .handle (
311+ (response , throwable ) -> {
312+ if (throwable == null ) {
313+ return CompletableFuture .completedFuture (response );
314+ }
315+ Throwable cause =
316+ (throwable instanceof CompletionException ) ? throwable .getCause () : throwable ;
317+ if (cause == null ) cause = throwable ;
318+ if (attempt + 1 >= maxAttempts || !Retry .isRetryable (cause )) {
319+ return Utils .<Response >failedFuture (cause );
320+ }
321+ long delayMs = Retry .computeBackoffMs (attempt + 1 , RANDOM );
322+ CompletableFuture <Response > retryFuture = new CompletableFuture <>();
323+ RETRY_SCHEDULER .schedule (
324+ () ->
325+ executeWithRetry (s3request , region , maxAttempts , attempt + 1 )
326+ .whenComplete (
327+ (r , t ) -> {
328+ if (t != null ) retryFuture .completeExceptionally (t );
329+ else retryFuture .complete (r );
330+ }),
331+ delayMs ,
332+ TimeUnit .MILLISECONDS );
333+ return retryFuture ;
334+ })
335+ .thenCompose (cf -> cf );
336+ }
337+
338+ /** Execute single HTTP request attempt asynchronously for given parameters. */
339+ private CompletableFuture <Response > doExecuteAsync (Http .S3Request s3request , String region ) {
275340 Credentials credentials = (provider == null ) ? null : provider .fetch ();
276341 Http .Request request = null ;
277342 try {
@@ -285,11 +350,6 @@ protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, Str
285350 if (traceStream != null ) traceStream .print (request .httpTraces ());
286351
287352 OkHttpClient httpClient = this .httpClient ;
288- // FIXME: enable retry for all request.
289- // if (!s3request.retryFailure()) {
290- // httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build();
291- // }
292-
293353 okhttp3 .Request httpRequest = request .httpRequest ();
294354 CompletableFuture <Response > completableFuture = newCompleteableFuture ();
295355 httpClient
0 commit comments