diff --git a/android-client-sdk/src/main/java/com/devcycle/sdk/android/api/EventQueue.kt b/android-client-sdk/src/main/java/com/devcycle/sdk/android/api/EventQueue.kt index 6994be20..48cec264 100644 --- a/android-client-sdk/src/main/java/com/devcycle/sdk/android/api/EventQueue.kt +++ b/android-client-sdk/src/main/java/com/devcycle/sdk/android/api/EventQueue.kt @@ -14,6 +14,8 @@ import kotlinx.coroutines.sync.withLock import java.math.BigDecimal import java.util.* import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock internal class EventQueue constructor( private val request: Request, @@ -27,12 +29,15 @@ internal class EventQueue constructor( private val scheduler = Scheduler(coroutineScope, flushInMs) private var scheduleJob: Job? = null - // mutex to control flushing events, ensuring only one operation at a time + // coroutine mutex to control flushing events, ensuring only one operation at a time; + // held across the suspending network publish, so it must remain a coroutine Mutex private val flushMutex = Mutex() - // mutex to gate modifications to the aggregateEventMap - private val aggregateMutex = Mutex() - // mutex to gate modifications to the eventQueue - private val queueMutex = Mutex() + // lock to gate modifications to the aggregateEventMap; the critical section is non-suspending + // in-memory work reached from the synchronous variable() API, so a thread lock is used + private val aggregateLock = ReentrantLock() + // lock to gate modifications to the eventQueue; non-suspending, reached from the synchronous + // track() API, so a thread lock is used + private val queueLock = ReentrantLock() // ensures flushEvents does not get called after the sdk is closed val isClosed = AtomicBoolean(false) private val flushAgain = AtomicBoolean(true) @@ -46,11 +51,13 @@ internal class EventQueue constructor( val eventsToFlush: MutableList = mutableListOf() eventsToFlush.addAll(currentEventQueue) - queueMutex.withLock { + // queueLock/aggregateLock are thread locks; these blocks never suspend, so they + // acquire and release on the same thread, before the suspending publish below. + queueLock.withLock { eventQueue.removeAll(currentEventQueue) } - aggregateMutex.withLock { + aggregateLock.withLock { eventsToFlush.addAll(eventsFromAggregateEventMap()) aggregateEventMap.clear() } @@ -136,12 +143,10 @@ internal class EventQueue constructor( DevCycleLogger.w("Attempting to queue event after closing DevCycle.") return } - runBlocking { - queueMutex.withLock { - eventQueue.add(event) - DevCycleLogger.i("Event queued successfully %s", event) - scheduleJob = scheduler.scheduleWithDelay { run() } - } + queueLock.withLock { + eventQueue.add(event) + DevCycleLogger.i("Event queued successfully %s", event) + scheduleJob = scheduler.scheduleWithDelay { run() } } } @@ -155,30 +160,28 @@ internal class EventQueue constructor( DevCycleLogger.w("Attempting to queue aggregate event after closing DVC.") return } - runBlocking { - aggregateMutex.withLock { - if (event.target == null || event.target == "") { - throw IllegalArgumentException("Target must be set") - } - if (event.type == "") { - throw IllegalArgumentException("Type must be set") - } - - var aggEventType = aggregateEventMap[event.type] - - if (aggEventType == null) { - aggEventType = aggregateEventMap.getOrPut(event.type) { HashMap() } - aggEventType[event.target] = event - } else if (aggEventType.containsKey(event.target)) { - aggEventType[event.target] = event.copy( - value = aggEventType[event.target]?.value?.plus(BigDecimal.ONE) - ) - } else { - aggEventType[event.target] = event - } + aggregateLock.withLock { + if (event.target == null || event.target == "") { + throw IllegalArgumentException("Target must be set") + } + if (event.type == "") { + throw IllegalArgumentException("Type must be set") + } - scheduleJob = scheduler.scheduleWithDelay { run() } + var aggEventType = aggregateEventMap[event.type] + + if (aggEventType == null) { + aggEventType = aggregateEventMap.getOrPut(event.type) { HashMap() } + aggEventType[event.target] = event + } else if (aggEventType.containsKey(event.target)) { + aggEventType[event.target] = event.copy( + value = aggEventType[event.target]?.value?.plus(BigDecimal.ONE) + ) + } else { + aggEventType[event.target] = event } + + scheduleJob = scheduler.scheduleWithDelay { run() } } } diff --git a/android-client-sdk/src/test/java/com/devcycle/sdk/android/api/EventQueueTests.kt b/android-client-sdk/src/test/java/com/devcycle/sdk/android/api/EventQueueTests.kt index 9245a6a0..8367da3e 100644 --- a/android-client-sdk/src/test/java/com/devcycle/sdk/android/api/EventQueueTests.kt +++ b/android-client-sdk/src/test/java/com/devcycle/sdk/android/api/EventQueueTests.kt @@ -13,6 +13,9 @@ import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.mockito.Mockito import java.math.BigDecimal +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit class EventQueueTests { @@ -82,4 +85,59 @@ class EventQueueTests { Assert.assertEquals(optInEval, evalEventMetadata) Assert.assertEquals(defaultEval, defaultEventMetadata) } -} \ No newline at end of file + + @Test + fun `queueAggregateEvent throws synchronously when target is missing`() { + val request = Request("some-key", "http://fake.com", "http://fake.com", mockContext) + val user = PopulatedUser("test") + val eventQueue = EventQueue(request, { user }, CoroutineScope(Dispatchers.Default), 60000) + val eval = EvalReason("OPT_IN", "Opt-In", "target") + val eventWithNoTarget = + Event.fromInternalEvent(Event.variableEvent(false, "", eval), user, null) + + Assert.assertThrows(IllegalArgumentException::class.java) { + eventQueue.queueAggregateEvent(eventWithNoTarget) + } + } + + @Test + fun `concurrent queueAggregateEvent produces correct aggregate counts`() { + val request = Request("some-key", "http://fake.com", "http://fake.com", mockContext) + val user = PopulatedUser("test") + val eventQueue = EventQueue(request, { user }, CoroutineScope(Dispatchers.Default), 60000) + val eval = EvalReason("OPT_IN", "Opt-In", "target") + val keys = listOf("key0", "key1", "key2", "key3") + + val threads = 8 + val callsPerThread = 5000 + val pool = Executors.newFixedThreadPool(threads) + val start = CountDownLatch(1) + val done = CountDownLatch(threads) + repeat(threads) { + pool.submit { + start.await() + repeat(callsPerThread) { i -> + val key = keys[i % keys.size] + eventQueue.queueAggregateEvent( + Event.fromInternalEvent(Event.variableEvent(false, key, eval), user, null) + ) + } + done.countDown() + } + } + start.countDown() + Assert.assertTrue("workers did not finish", done.await(30, TimeUnit.SECONDS)) + pool.shutdown() + + val expectedPerKey = (threads * callsPerThread) / keys.size + keys.forEach { key -> + val event = eventQueue.aggregateEventMap[Event.Companion.EventTypes.variableEvaluated]?.get(key) + Assert.assertEquals( + "lost updates for $key", + BigDecimal(expectedPerKey), + event?.value + ) + } + } + +}