diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 20374358317..875313bf3e3 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; @@ -155,22 +156,26 @@ public void send( RequestBody requestBody = new GrpcRequestBody(messageWriter, compressor); requestBuilder.post(requestBody); - InstrumentationUtil.suppressInstrumentation( - () -> - client - .newCall(requestBuilder.build()) - .enqueue( - new Callback() { - @Override - public void onFailure(Call call, IOException e) { - onError.accept(e); - } - - @Override - public void onResponse(Call call, Response response) { - handleResponse(response, onResponse); - } - })); + try { + InstrumentationUtil.suppressInstrumentation( + () -> + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + onError.accept(e); + } + + @Override + public void onResponse(Call call, Response response) { + handleResponse(response, onResponse); + } + })); + } catch (RejectedExecutionException e) { + onError.accept(e); + } } private void handleResponse(Response response, Consumer onResponse) { diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index ad35eae4a60..b9f59c7b9c2 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; @@ -138,22 +139,26 @@ public void send( requestBuilder.addHeader("Accept-Encoding", "gzip, identity"); requestBuilder.post(new RequestBodyImpl(messageWriter, compressor, mediaType)); - InstrumentationUtil.suppressInstrumentation( - () -> - client - .newCall(requestBuilder.build()) - .enqueue( - new Callback() { - @Override - public void onFailure(Call call, IOException e) { - onError.accept(e); - } - - @Override - public void onResponse(Call call, Response response) { - handleResponse(response, onResponse, onError); - } - })); + try { + InstrumentationUtil.suppressInstrumentation( + () -> + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + onError.accept(e); + } + + @Override + public void onResponse(Call call, Response response) { + handleResponse(response, onResponse, onError); + } + })); + } catch (RejectedExecutionException e) { + onError.accept(e); + } } private void handleResponse( diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java index 4448c717e15..93909bd3255 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java @@ -20,6 +20,8 @@ * at any time. */ public final class OkHttpUtil { + private static final int DEFAULT_MAX_REQUESTS_PER_HOST = 5; + @SuppressWarnings("NonFinalStaticField") private static boolean propagateContextForTestingInDispatcher = false; @@ -30,14 +32,19 @@ public static void setPropagateContextForTestingInDispatcher( /** Returns a {@link Dispatcher} using daemon threads, otherwise matching the OkHttp default. */ public static Dispatcher newDispatcher() { - return new Dispatcher( - new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, - 60, - TimeUnit.SECONDS, - new SynchronousQueue<>(), - createThreadFactory("okhttp-dispatch"))); + int maxRequests = Math.max(Runtime.getRuntime().availableProcessors(), 5); + Dispatcher dispatcher = + new Dispatcher( + new ThreadPoolExecutor( + 0, + maxRequests, + 60, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + createThreadFactory("okhttp-dispatch"))); + dispatcher.setMaxRequests(maxRequests); + dispatcher.setMaxRequestsPerHost(DEFAULT_MAX_REQUESTS_PER_HOST); + return dispatcher; } private static DaemonThreadFactory createThreadFactory(String namePrefix) { diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java index a7f314c93b5..d1172f2209a 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java @@ -5,11 +5,13 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.GrpcResponse; import io.opentelemetry.sdk.common.export.GrpcStatusCode; import io.opentelemetry.sdk.common.export.MessageWriter; import java.io.IOException; @@ -21,7 +23,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import okhttp3.MediaType; import okhttp3.Protocol; import okhttp3.Request; @@ -57,6 +62,34 @@ void isRetryable_NonRetryableGrpcStatus() { assertFalse(isRetryable); } + @Test + void send_rejectedExecution_callsOnError() { + ThreadPoolExecutor executor = + new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); + executor.shutdown(); + + OkHttpGrpcSender sender = + new OkHttpGrpcSender( + "http://localhost", + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + executor, + Long.MAX_VALUE); + + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + sender.send(new TestMessageWriter(), responseRef::set, errorRef::set); + + assertThat(errorRef.get()).isNotNull(); + assertThat(responseRef.get()).isNull(); + } + private static Response createResponse(int httpCode, String grpcStatus, String message) { return new Response.Builder() .request(new Request.Builder().url("http://localhost/").build()) diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java new file mode 100644 index 00000000000..a74695df9af --- /dev/null +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.sdk.common.export.HttpResponse; +import io.opentelemetry.sdk.common.export.MessageWriter; +import java.io.OutputStream; +import java.net.URI; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +class OkHttpHttpSenderTest { + + @Test + void send_rejectedExecution_callsOnError() { + ThreadPoolExecutor executor = + new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); + executor.shutdown(); + + OkHttpHttpSender sender = + new OkHttpHttpSender( + URI.create("http://localhost"), + "text/plain", + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + null, + executor, + Long.MAX_VALUE); + + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + sender.send(new NoOpRequestBodyWriter(), responseRef::set, errorRef::set); + + assertThat(errorRef.get()).isNotNull(); + assertThat(responseRef.get()).isNull(); + } + + private static class NoOpRequestBodyWriter implements MessageWriter { + @Override + public void writeMessage(OutputStream output) {} + + @Override + public int getContentLength() { + return 0; + } + } +} diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtilTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtilTest.java new file mode 100644 index 00000000000..aeee5899bda --- /dev/null +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtilTest.java @@ -0,0 +1,38 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.type; + +import java.util.concurrent.ThreadPoolExecutor; +import okhttp3.Dispatcher; +import org.junit.jupiter.api.Test; + +class OkHttpUtilTest { + private static final int EXPECTED_MAX_REQUESTS = + Math.max(Runtime.getRuntime().availableProcessors(), 5); + + @Test + void newDispatcher_isBounded() { + Dispatcher dispatcher = OkHttpUtil.newDispatcher(); + + try { + assertThat(dispatcher.getMaxRequests()).isEqualTo(EXPECTED_MAX_REQUESTS); + assertThat(dispatcher.getMaxRequestsPerHost()).isEqualTo(5); + assertThat(dispatcher.executorService()) + .asInstanceOf(type(ThreadPoolExecutor.class)) + .satisfies( + executor -> { + assertThat(executor.getMaximumPoolSize()).isEqualTo(EXPECTED_MAX_REQUESTS); + assertThat(executor.getRejectedExecutionHandler()) + .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class); + }); + } finally { + dispatcher.executorService().shutdownNow(); + } + } +}