Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -155,22 +156,26 @@ public void send(
RequestBody requestBody = new GrpcRequestBody(messageWriter, compressor);
requestBuilder.post(requestBody);

InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, Response response) {
handleResponse(response, onResponse);
}
}));
try {
InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, Response response) {
handleResponse(response, onResponse);
}
}));
} catch (RejectedExecutionException e) {
onError.accept(e);
}
}

private void handleResponse(Response response, Consumer<GrpcResponse> onResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
Expand Down Expand Up @@ -138,22 +139,26 @@ public void send(
requestBuilder.addHeader("Accept-Encoding", "gzip, identity");
requestBuilder.post(new RequestBodyImpl(messageWriter, compressor, mediaType));

InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, Response response) {
handleResponse(response, onResponse, onError);
}
}));
try {
InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, Response response) {
handleResponse(response, onResponse, onError);
}
}));
} catch (RejectedExecutionException e) {
onError.accept(e);
}
}

private void handleResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
* at any time.
*/
public final class OkHttpUtil {
private static final int DEFAULT_MAX_REQUESTS_PER_HOST = 5;

@SuppressWarnings("NonFinalStaticField")
private static boolean propagateContextForTestingInDispatcher = false;

Expand All @@ -30,14 +32,19 @@ public static void setPropagateContextForTestingInDispatcher(

/** Returns a {@link Dispatcher} using daemon threads, otherwise matching the OkHttp default. */
public static Dispatcher newDispatcher() {
return new Dispatcher(
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
createThreadFactory("okhttp-dispatch")));
int maxRequests = Math.max(Runtime.getRuntime().availableProcessors(), 5);
Dispatcher dispatcher =
new Dispatcher(
new ThreadPoolExecutor(
0,
maxRequests,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
createThreadFactory("okhttp-dispatch")));
dispatcher.setMaxRequests(maxRequests);
dispatcher.setMaxRequestsPerHost(DEFAULT_MAX_REQUESTS_PER_HOST);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation here given that these are already the defaults for new Dispatcher? Is the goal to just make them explicit and visible?

return dispatcher;
}

private static DaemonThreadFactory createThreadFactory(String namePrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

package io.opentelemetry.exporter.sender.okhttp.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.opentelemetry.exporter.internal.RetryUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.GrpcResponse;
import io.opentelemetry.sdk.common.export.GrpcStatusCode;
import io.opentelemetry.sdk.common.export.MessageWriter;
import java.io.IOException;
Expand All @@ -21,7 +23,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.MediaType;
import okhttp3.Protocol;
import okhttp3.Request;
Expand Down Expand Up @@ -57,6 +62,34 @@ void isRetryable_NonRetryableGrpcStatus() {
assertFalse(isRetryable);
}

@Test
void send_rejectedExecution_callsOnError() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
executor.shutdown();

OkHttpGrpcSender sender =
new OkHttpGrpcSender(
"http://localhost",
null,
Duration.ofSeconds(10),
Duration.ofSeconds(10),
Collections::emptyMap,
null,
null,
null,
executor,
Long.MAX_VALUE);

AtomicReference<GrpcResponse> responseRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

sender.send(new TestMessageWriter(), responseRef::set, errorRef::set);

assertThat(errorRef.get()).isNotNull();
assertThat(responseRef.get()).isNull();
}

private static Response createResponse(int httpCode, String grpcStatus, String message) {
return new Response.Builder()
.request(new Request.Builder().url("http://localhost/").build())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.sender.okhttp.internal;

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.sdk.common.export.HttpResponse;
import io.opentelemetry.sdk.common.export.MessageWriter;
import java.io.OutputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;

class OkHttpHttpSenderTest {

@Test
void send_rejectedExecution_callsOnError() {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
executor.shutdown();

OkHttpHttpSender sender =
new OkHttpHttpSender(
URI.create("http://localhost"),
"text/plain",
null,
Duration.ofSeconds(10),
Duration.ofSeconds(10),
Collections::emptyMap,
null,
null,
null,
null,
executor,
Long.MAX_VALUE);

AtomicReference<HttpResponse> responseRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

sender.send(new NoOpRequestBodyWriter(), responseRef::set, errorRef::set);

assertThat(errorRef.get()).isNotNull();
assertThat(responseRef.get()).isNull();
}

private static class NoOpRequestBodyWriter implements MessageWriter {
@Override
public void writeMessage(OutputStream output) {}

@Override
public int getContentLength() {
return 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.sender.okhttp.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.type;

import java.util.concurrent.ThreadPoolExecutor;
import okhttp3.Dispatcher;
import org.junit.jupiter.api.Test;

class OkHttpUtilTest {
private static final int EXPECTED_MAX_REQUESTS =
Math.max(Runtime.getRuntime().availableProcessors(), 5);

@Test
void newDispatcher_isBounded() {
Dispatcher dispatcher = OkHttpUtil.newDispatcher();

try {
assertThat(dispatcher.getMaxRequests()).isEqualTo(EXPECTED_MAX_REQUESTS);
assertThat(dispatcher.getMaxRequestsPerHost()).isEqualTo(5);
assertThat(dispatcher.executorService())
.asInstanceOf(type(ThreadPoolExecutor.class))
.satisfies(
executor -> {
assertThat(executor.getMaximumPoolSize()).isEqualTo(EXPECTED_MAX_REQUESTS);
assertThat(executor.getRejectedExecutionHandler())
.isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);
});
} finally {
dispatcher.executorService().shutdownNow();
}
}
}
Loading