Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -43,91 +42,97 @@ public class BlockingPoolTest
private CloseableDefaultBlockingPool<Integer> pool;
private CloseableDefaultBlockingPool<Integer> emptyPool;

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Before
@BeforeEach
public void setup()
{
service = Execs.multiThreaded(2, "blocking-pool-test");
pool = new CloseableDefaultBlockingPool<>(Suppliers.ofInstance(1), 10);
emptyPool = new CloseableDefaultBlockingPool<>(Suppliers.ofInstance(1), 0);
}

@After
@AfterEach
public void teardown()
{
pool.close();
emptyPool.close();
service.shutdownNow();
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testParallelInit()
{
DefaultBlockingPool<Integer> parallelPool = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 10, true);
Assert.assertEquals(10, parallelPool.getPoolSize());
Assertions.assertEquals(10, parallelPool.getPoolSize());
final ReferenceCountingResourceHolder<Integer> holder =
Iterables.getOnlyElement(parallelPool.takeBatch(1, 100), null);
Assert.assertNotNull(holder);
Assert.assertEquals(9, parallelPool.getPoolSize());
Assertions.assertNotNull(holder);
Assertions.assertEquals(9, parallelPool.getPoolSize());
holder.close();
Assert.assertEquals(10, parallelPool.getPoolSize());
Assertions.assertEquals(10, parallelPool.getPoolSize());
}

@Test
public void testTakeFromEmptyPool()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
emptyPool.takeBatch(1, 0);
final IllegalStateException e = Assertions.assertThrows(
IllegalStateException.class,
() -> emptyPool.takeBatch(1, 0)
);
Assertions.assertTrue(e.getMessage().contains("Pool was initialized with limit = 0, there are no objects to take."));
}

@Test
public void testDrainFromEmptyPool()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
emptyPool.takeBatch(1, 0);
final IllegalStateException e = Assertions.assertThrows(
IllegalStateException.class,
() -> emptyPool.takeBatch(1, 0)
);
Assertions.assertTrue(e.getMessage().contains("Pool was initialized with limit = 0, there are no objects to take."));
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testTake()
{
final ReferenceCountingResourceHolder<Integer> holder = Iterables.getOnlyElement(pool.takeBatch(1, 100), null);
Assert.assertNotNull(holder);
Assert.assertEquals(9, pool.getPoolSize());
Assertions.assertNotNull(holder);
Assertions.assertEquals(9, pool.getPoolSize());
holder.close();
Assert.assertEquals(10, pool.getPoolSize());
Assertions.assertEquals(10, pool.getPoolSize());
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testTakeTimeout()
{
final List<ReferenceCountingResourceHolder<Integer>> batchHolder = pool.takeBatch(10, 100L);
final ReferenceCountingResourceHolder<Integer> holder = Iterables.getOnlyElement(pool.takeBatch(1, 100), null);
Assert.assertNull(holder);
Assertions.assertNull(holder);
batchHolder.forEach(ReferenceCountingResourceHolder::close);
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testTakeBatch()
{
final List<ReferenceCountingResourceHolder<Integer>> holder = pool.takeBatch(6, 100L);
Assert.assertNotNull(holder);
Assert.assertEquals(6, holder.size());
Assert.assertEquals(4, pool.getPoolSize());
Assertions.assertNotNull(holder);
Assertions.assertEquals(6, holder.size());
Assertions.assertEquals(4, pool.getPoolSize());
holder.forEach(ReferenceCountingResourceHolder::close);
Assert.assertEquals(10, pool.getPoolSize());
Assertions.assertEquals(10, pool.getPoolSize());
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException
{
List<ReferenceCountingResourceHolder<Integer>> batchHolder = pool.takeBatch(10, 10);
Assert.assertNotNull(batchHolder);
Assert.assertEquals(10, batchHolder.size());
Assert.assertEquals(0, pool.getPoolSize());
Assertions.assertNotNull(batchHolder);
Assertions.assertEquals(10, batchHolder.size());
Assertions.assertEquals(0, pool.getPoolSize());

final Future<List<ReferenceCountingResourceHolder<Integer>>> future = service.submit(
() -> pool.takeBatch(8, 100)
Expand All @@ -136,22 +141,24 @@ public void testWaitAndTakeBatch() throws InterruptedException, ExecutionExcepti
batchHolder.forEach(ReferenceCountingResourceHolder::close);

batchHolder = future.get();
Assert.assertNotNull(batchHolder);
Assert.assertEquals(8, batchHolder.size());
Assert.assertEquals(2, pool.getPoolSize());
Assertions.assertNotNull(batchHolder);
Assertions.assertEquals(8, batchHolder.size());
Assertions.assertEquals(2, pool.getPoolSize());

batchHolder.forEach(ReferenceCountingResourceHolder::close);
Assert.assertEquals(10, pool.getPoolSize());
Assertions.assertEquals(10, pool.getPoolSize());
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testTakeBatchTooManyObjects()
{
final List<ReferenceCountingResourceHolder<Integer>> holder = pool.takeBatch(100, 100L);
Assert.assertTrue(holder.isEmpty());
Assertions.assertTrue(holder.isEmpty());
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testConcurrentTake() throws ExecutionException, InterruptedException
{
final int limit1 = pool.maxSize() / 2;
Expand Down Expand Up @@ -179,8 +186,8 @@ public void testConcurrentTake() throws ExecutionException, InterruptedException
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();

Assert.assertEquals(0, pool.getPoolSize());
Assert.assertTrue(r1.contains(null) || r2.contains(null));
Assertions.assertEquals(0, pool.getPoolSize());
Assertions.assertTrue(r1.contains(null) || r2.contains(null));

int nonNullCount = 0;
for (ReferenceCountingResourceHolder<Integer> holder : r1) {
Expand All @@ -194,7 +201,7 @@ public void testConcurrentTake() throws ExecutionException, InterruptedException
nonNullCount++;
}
}
Assert.assertEquals(pool.maxSize(), nonNullCount);
Assertions.assertEquals(pool.maxSize(), nonNullCount);

final Future future1 = service.submit(() -> {
for (ReferenceCountingResourceHolder<Integer> holder : r1) {
Expand All @@ -214,10 +221,11 @@ public void testConcurrentTake() throws ExecutionException, InterruptedException
future1.get();
future2.get();

Assert.assertEquals(pool.maxSize(), pool.getPoolSize());
Assertions.assertEquals(pool.maxSize(), pool.getPoolSize());
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException
{
final int batch1 = pool.maxSize() / 2;
Expand All @@ -233,21 +241,22 @@ public void testConcurrentTakeBatch() throws ExecutionException, InterruptedExce
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();

if (!r1.isEmpty()) {
Assert.assertTrue(r2.isEmpty());
Assert.assertEquals(pool.maxSize() - batch1, pool.getPoolSize());
Assert.assertEquals(batch1, r1.size());
Assertions.assertTrue(r2.isEmpty());
Assertions.assertEquals(pool.maxSize() - batch1, pool.getPoolSize());
Assertions.assertEquals(batch1, r1.size());
r1.forEach(ReferenceCountingResourceHolder::close);
} else {
Assert.assertNotNull(r2);
Assert.assertEquals(pool.maxSize() - batch2, pool.getPoolSize());
Assert.assertEquals(batch2, r2.size());
Assertions.assertNotNull(r2);
Assertions.assertEquals(pool.maxSize() - batch2, pool.getPoolSize());
Assertions.assertEquals(batch2, r2.size());
r2.forEach(ReferenceCountingResourceHolder::close);
}

Assert.assertEquals(pool.maxSize(), pool.getPoolSize());
Assertions.assertEquals(pool.maxSize(), pool.getPoolSize());
}

@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testConcurrentBatchClose() throws ExecutionException, InterruptedException
{
final int batch1 = pool.maxSize() / 2;
Expand All @@ -262,23 +271,24 @@ public void testConcurrentBatchClose() throws ExecutionException, InterruptedExc
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();

Assert.assertNotNull(r1);
Assert.assertNotNull(r2);
Assert.assertEquals(batch1, r1.size());
Assert.assertEquals(batch2, r2.size());
Assert.assertEquals(0, pool.getPoolSize());
Assertions.assertNotNull(r1);
Assertions.assertNotNull(r2);
Assertions.assertEquals(batch1, r1.size());
Assertions.assertEquals(batch2, r2.size());
Assertions.assertEquals(0, pool.getPoolSize());

final Future future1 = service.submit(() -> r1.forEach(ReferenceCountingResourceHolder::close));
final Future future2 = service.submit(() -> r2.forEach(ReferenceCountingResourceHolder::close));

future1.get();
future2.get();

Assert.assertEquals(pool.maxSize(), pool.getPoolSize());
Assertions.assertEquals(pool.maxSize(), pool.getPoolSize());
}

@SuppressWarnings("CatchMayIgnoreException")
@Test(timeout = 60_000L)
@Test
@Timeout(60)
public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException
{
final List<ReferenceCountingResourceHolder<Integer>> r1 = pool.takeBatch(1, 10);
Expand All @@ -298,11 +308,11 @@ public void testConcurrentTakeBatchClose() throws ExecutionException, Interrupte

final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
f1.get();
Assert.assertNotNull(r2);
Assert.assertEquals(10, r2.size());
Assert.assertEquals(0, pool.getPoolSize());
Assertions.assertNotNull(r2);
Assertions.assertEquals(10, r2.size());
Assertions.assertEquals(0, pool.getPoolSize());

r2.forEach(ReferenceCountingResourceHolder::close);
Assert.assertEquals(pool.maxSize(), pool.getPoolSize());
Assertions.assertEquals(pool.maxSize(), pool.getPoolSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,36 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.Parameter;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

@RunWith(Parameterized.class)
@ParameterizedClass
@MethodSource("constructorFeeder")
public class CombiningSequenceTest
{
@Parameterized.Parameters
public static Collection<Object[]> valuesToTry()
public static Stream<Object[]> constructorFeeder()
{
return Arrays.asList(new Object[][]{
return Arrays.stream(new Object[][]{
{1}, {2}, {3}, {4}, {5}, {1000}
});
}

private final int yieldEvery;

public CombiningSequenceTest(int yieldEvery)
{
this.yieldEvery = yieldEvery;
}
@Parameter(0)
public int yieldEvery;

@Test
public void testMerge() throws Exception
Expand Down Expand Up @@ -225,13 +220,13 @@ public Integer accumulate(Integer accumulated, Integer in)
}
}
);
Assert.fail("Expected exception");
Assertions.fail("Expected exception");
}
catch (Exception e) {
Assert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("boom")));
Assertions.assertEquals("boom", e.getMessage());
}

Assert.assertEquals("Closes resources", 1, bomb.getCloseCount());
Assertions.assertEquals(1, bomb.getCloseCount(), "Closes resources");
}

private void testCombining(List<Pair<Integer, Integer>> pairs, List<Pair<Integer, Integer>> expected)
Expand Down Expand Up @@ -279,7 +274,7 @@ private void testCombining(

List<Pair<Integer, Integer>> merged = seq.toList();

Assert.assertEquals(prefix, expected, merged);
Assertions.assertEquals(expected, merged, prefix);

Yielder<Pair<Integer, Integer>> yielder = seq.toYielder(
null,
Expand Down Expand Up @@ -326,14 +321,14 @@ public boolean apply(
while (!yielder.isDone()) {
final Pair<Integer, Integer> expectedVal = expectedVals.next();
final Pair<Integer, Integer> actual = yielder.get();
Assert.assertEquals(StringUtils.format("%s, i[%s]", prefix, i++), expectedVal, actual);
Assertions.assertEquals(expectedVal, actual, StringUtils.format("%s, i[%s]", prefix, i++));
yielder = yielder.next(actual);
}
}
Assert.assertTrue(prefix, yielder.isDone());
Assert.assertFalse(prefix, expectedVals.hasNext());
Assertions.assertTrue(yielder.isDone(), prefix);
Assertions.assertFalse(expectedVals.hasNext(), prefix);
yielder.close();

Assert.assertTrue("resource closed", closed.await(10000, TimeUnit.MILLISECONDS));
Assertions.assertTrue(closed.await(10000, TimeUnit.MILLISECONDS), "resource closed");
}
}
Loading
Loading