Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -39,6 +40,25 @@
@Testcontainers
public class TarantoolCrudClientWithRetryTest {

private static final boolean SHUTDOWN_HOOK_REGISTERED = registerShutdownHook();

/**
 * Registers a JVM shutdown hook that stops the timer referenced by {@code timerService} and
 * cancels every timeout returned by the stop call.
 *
 * <p>The {@code timerService} field is read when the hook runs, not when it is registered.
 *
 * @return always {@code true}, so the call can serve as a static field initializer
 */
private static boolean registerShutdownHook() {
  Runnable stopTimerAndCancelLeftovers =
      () -> {
        Timer timer = timerService;
        if (timer == null) {
          return;
        }
        Set<io.netty.util.Timeout> leftovers = timer.stop();
        if (leftovers == null) {
          return;
        }
        // Iterating an empty set is a no-op, so no separate emptiness check is needed.
        for (io.netty.util.Timeout leftover : leftovers) {
          leftover.cancel();
        }
      };
  Thread hook =
      new Thread(stopTimerAndCancelLeftovers, "tarantool-crud-client-retry-test-timer-shutdown");
  Runtime.getRuntime().addShutdownHook(hook);
  return true;
}

public class OperationsRepeater<T> {

private final Supplier<CompletableFuture<T>> operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
Expand All @@ -32,6 +34,25 @@

public abstract class BaseTest {

private static final boolean SHUTDOWN_HOOK_REGISTERED = registerShutdownHook();

/**
 * Installs a shutdown hook thread named {@code base-test-timer-shutdown}.
 *
 * <p>When the JVM exits, the hook stops the timer currently held in {@code timerService} (if
 * any) and cancels each timeout that the stop call hands back.
 *
 * @return always {@code true}, which lets a static field initializer trigger the registration
 */
private static boolean registerShutdownHook() {
  Thread shutdownThread =
      new Thread(
          () -> {
            // Read the field at shutdown time so the hook sees the latest timer instance.
            Timer current = timerService;
            if (current == null) {
              return;
            }
            Set<Timeout> unprocessed = current.stop();
            if (unprocessed == null) {
              return;
            }
            for (Timeout timeout : unprocessed) {
              timeout.cancel();
            }
          },
          "base-test-timer-shutdown");
  Runtime.getRuntime().addShutdownHook(shutdownThread);
  return true;
}

protected static final Bootstrap bootstrap =
new Bootstrap()
.group(new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory()))
Expand All @@ -58,9 +79,9 @@ public abstract class BaseTest {
}
};

protected Timer timerService = new HashedWheelTimer();
protected static Timer timerService = new HashedWheelTimer();

protected ConnectionFactory factory = new ConnectionFactory(bootstrap, timerService);
protected static ConnectionFactory factory = new ConnectionFactory(bootstrap, timerService);

protected static ArrayValue decodeTuple(IProtoClient client, ArrayValue arrayValue)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import io.tarantool.core.IProtoClient;
import io.tarantool.core.IProtoClientImpl;
import io.tarantool.core.IProtoFeature;
import io.tarantool.core.connection.ConnectionFactory;
import io.tarantool.core.exceptions.BoxError;
import io.tarantool.core.exceptions.BoxErrorStackItem;
import io.tarantool.core.exceptions.ClientException;
Expand Down Expand Up @@ -1515,16 +1516,18 @@ public void testPing() throws Exception {

@Test
public void testTimeoutCancel() throws Exception {
IProtoClient client = createClientAndConnect(address, true);
io.netty.util.Timer localTimer = new HashedWheelTimer();
ConnectionFactory localFactory = new ConnectionFactory(bootstrap, localTimer);
IProtoClient client =
new IProtoClientImpl(localFactory, localTimer, DEFAULT_WATCHER_OPTS, null, null, true);
client.connect(address, 3_000).get();
client.authorize(API_USER, CREDS.get(API_USER)).join();
IProtoMessage message = client.ping().get();
checkMessageHeader(message, IPROTO_OK, 4);
assertEquals(0, message.getBody().map().size());

Set<io.netty.util.Timeout> timers = timerService.stop();
Set<io.netty.util.Timeout> timers = localTimer.stop();
assertTrue(timers.isEmpty());

timerService = new HashedWheelTimer();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ private void testWatcherRecoveryAfterReconnectOnContainer(
private void checkTTVersion(TarantoolContainer<?> tt, String version) throws Exception {
List<?> result = TarantoolContainerClientHelper.executeCommandDecoded(tt, "return _TARANTOOL");
String ttVersion = (String) result.get(0);
assertTrue(ttVersion.startsWith(version.split("-")[0]));
String expectedPrefix = version == null ? ttVersion.split("-")[0] : version.split("-")[0];
assertTrue(ttVersion.startsWith(expectedPrefix));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,40 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;

import io.tarantool.core.connection.ConnectionFactory;

public abstract class BaseTest {

private static final boolean SHUTDOWN_HOOK_REGISTERED = registerShutdownHook();

/**
 * Adds a JVM shutdown hook (thread name {@code jackson-mapping-base-test-timer-shutdown}) that
 * stops the timer stored in {@code timerService} and cancels the timeouts returned by the stop
 * call.
 *
 * @return always {@code true}; the value exists only so a static field can invoke this method
 */
private static boolean registerShutdownHook() {
  Runnable cleanup =
      () -> {
        // The field is dereferenced only when the hook actually runs.
        Timer timerToStop = timerService;
        if (timerToStop == null) {
          return;
        }
        Set<Timeout> remaining = timerToStop.stop();
        if (remaining == null) {
          return;
        }
        for (Timeout task : remaining) {
          task.cancel();
        }
      };
  Runtime runtime = Runtime.getRuntime();
  runtime.addShutdownHook(new Thread(cleanup, "jackson-mapping-base-test-timer-shutdown"));
  return true;
}

protected static final String API_USER = "api_user";

protected static final Map<String, String> CREDS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,11 @@ public void setReconnectAfter(long reconnectAfter) throws IllegalArgumentExcepti

@Override
public void forEach(Consumer<IProtoClient> action) {
for (List<PoolEntry> group : entries.values()) {
for (PoolEntry entry : group) {
action.accept(entry.getClient());
synchronized (connectionPoolLock) {
for (List<PoolEntry> group : entries.values()) {
for (PoolEntry entry : group) {
action.accept(entry.getClient());
}
}
}
}
Expand Down
99 changes: 74 additions & 25 deletions tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -143,27 +144,36 @@ final class PoolEntry {
* <p>It will be returned to all outer clients wanting to obtain this client. It is recreated only
* if client is reconnected.
*/
private CompletableFuture<IProtoClient> connectFuture;
private volatile CompletableFuture<IProtoClient> connectFuture;

/** Last heartbeat state/event. */
private HeartbeatEvent lastHeartbeatEvent;
private volatile HeartbeatEvent lastHeartbeatEvent;

/** Heartbeat timer/task. */
private Timeout heartbeatTask;
private volatile Timeout heartbeatTask;

/** Reconnection task. */
private Timeout reconnectTask;
private volatile Timeout reconnectTask;

/** Flag signaling if heartbeat started or not. */
private boolean isHeartbeatStarted;
private volatile boolean isHeartbeatStarted;

/**
* Flag signaling if connection is available or not.
*
* <p>When connection comes to invalidated state or killed, pool entry is locked and connection
* will not be returned to outer client.
*/
private boolean isLocked;
private volatile boolean isLocked;

/**
* Idempotency flag for {@link #shutdown()}.
*
* <p>Guarantees that the close listener is invoked only once even if both {@link #close()} and
* {@link #shutdown()} are called, or shutdown is invoked multiple times due to overlapping
* connection error events.
*/
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

/** Count of failed pings occurred in invalidated state. */
private int currentDeathPings;
Expand Down Expand Up @@ -305,7 +315,7 @@ public IProtoClient getClient() {
*
* <p>Also increments count of unavailable clients.
*/
public void lock() {
public synchronized void lock() {
if (!isLocked) {
unavailable.incrementAndGet();
isLocked = true;
Expand All @@ -317,7 +327,7 @@ public void lock() {
*
* <p>Also decrements count of unavailable clients and cancels reconnect task.
*/
public void unlock() {
public synchronized void unlock() {
if (isLocked) {
stopReconnectTask();
unavailable.decrementAndGet();
Expand All @@ -340,16 +350,28 @@ public void close() {
shutdown();
}

/** Closes client and stops heartbeat task is started. */
/**
* Closes the underlying client and stops the heartbeat task.
*
* <p>Performs field mutations under the entry monitor, then releases it before calling {@code
* client.close()} (which acquires the {@code ConnectionImpl} monitor) and emitting the close
* event. Holding the entry monitor across either of those calls would create an ABBA deadlock
* with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and
* then re-enters {@link #handleConnectError(Object, Throwable)} on the entry.
*/
public void shutdown() {
connectFuture = null;
stopHeartbeat();
synchronized (this) {
connectFuture = null;
stopHeartbeat();
}
try {
client.close();
} catch (Exception e) {
log.warn("Cannot close client in pool", e);
}
emit(listener -> listener.onConnectionClosed(tag, index));
if (isShutdown.compareAndSet(false, true)) {
emit(listener -> listener.onConnectionClosed(tag, index));
}
}

/**
Expand Down Expand Up @@ -410,15 +432,27 @@ public void stopHeartbeat() {
/**
* Internal method used by reconnect task and public connect.
*
* <p>Returns the local chain {@code cf} rather than the {@link #connectFuture} field. If the
* underlying {@code client.connect()} fails inline (e.g. connection refused), the {@code
* whenComplete} callback fires reentrantly on the calling thread, which causes {@link
* #handleConnectError(Object, Throwable)} to clear the field for reconnect purposes. The local
* variable still references the (failed) future, so callers always receive a non-null result and
* observe the failure via {@code ExecutionException} from {@code .get()}, while the field being
* null guarantees that the next reconnect attempt actually reconnects instead of returning a
* stale failed future.
*
* @return {@link java.util.concurrent.CompletableFuture} with client
*/
private CompletableFuture<IProtoClient> internalConnect() {
private synchronized CompletableFuture<IProtoClient> internalConnect() {
if (connectFuture != null) {
return connectFuture;
}
log.info("connect {}/{}", tag, index);
LongTaskTimer.Sample timer = startTimer(connectTime);
CompletableFuture<?> future =
client.connect(group.getAddress(), connectTimeout, gracefulShutdown);
String user = group.getUser();
connectFuture =
CompletableFuture<IProtoClient> cf =
future
.thenCompose(
greeting -> {
Expand All @@ -428,9 +462,10 @@ private CompletableFuture<IProtoClient> internalConnect() {
}
return client.ping(firstPingOpts);
})
.thenApply(r -> client)
.whenComplete(this::onConnectComplete);
return connectFuture;
.thenApply(r -> client);
connectFuture = cf;
cf.whenComplete(this::onConnectComplete);
return cf;
}

/**
Expand Down Expand Up @@ -462,6 +497,12 @@ private void onConnectComplete(Object r, Throwable exc) {
/**
* Handler for connection close.
*
* <p>Only the {@code connectFuture} reset is performed under the entry monitor. The listener
* emit, {@link #lock()}, {@link #shutdown()} and {@link #connectAfter()} calls are made outside
* the monitor; {@code shutdown()} will itself acquire {@code ConnectionImpl} via {@code
* client.close()}, and the close-event path takes the same monitor — holding the entry monitor
* across them would deadlock.
*
* @param r connection instance
* @param exc exception which led to connection close
*/
Expand All @@ -470,7 +511,9 @@ private void handleConnectError(Object r, Throwable exc) {
return;
}
Throwable failure = exc.getCause() != null ? exc.getCause() : exc;
connectFuture = null;
synchronized (this) {
connectFuture = null;
}
log.error("connect error {}/{}: {}", tag, index, failure.toString());
emit(listener -> listener.onConnectionFailed(tag, index, failure));
lock();
Expand All @@ -480,13 +523,19 @@ private void handleConnectError(Object r, Throwable exc) {

/** Reconnect task scheduler. */
private void connectAfter() {
log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter);
if (reconnectTask == null) {
reconnecting.incrementAndGet();
synchronized (this) {
log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter);
if (reconnectTask != null) {
// existing task is being replaced; the existing increment in `reconnecting` carries over
// to the new task, so no counter change is needed here.
reconnectTask.cancel();
} else {
reconnecting.incrementAndGet();
}
reconnectTask =
timerService.newTimeout(
timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS);
}
reconnectTask =
timerService.newTimeout(
timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS);
emit(listener -> listener.onReconnectScheduled(tag, index, reconnectAfter));
}

Expand Down Expand Up @@ -658,7 +707,7 @@ private void incHeartbeatCounters(int fail) {
}

/** Stops reconnecting task if it is active. */
private void stopReconnectTask() {
private synchronized void stopReconnectTask() {
if (reconnectTask != null) {
reconnecting.decrementAndGet();
reconnectTask.cancel();
Expand Down
Loading