Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import kotlinx.coroutines.sync.withLock
import java.math.BigDecimal
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

internal class EventQueue constructor(
private val request: Request,
Expand All @@ -27,12 +29,15 @@ internal class EventQueue constructor(

private val scheduler = Scheduler(coroutineScope, flushInMs)
private var scheduleJob: Job? = null
// mutex to control flushing events, ensuring only one operation at a time
// coroutine mutex to control flushing events, ensuring only one operation at a time;
// held across the suspending network publish, so it must remain a coroutine Mutex
private val flushMutex = Mutex()
// mutex to gate modifications to the aggregateEventMap
private val aggregateMutex = Mutex()
// mutex to gate modifications to the eventQueue
private val queueMutex = Mutex()
// lock to gate modifications to the aggregateEventMap; the critical section is non-suspending
// in-memory work reached from the synchronous variable() API, so a thread lock is used
private val aggregateLock = ReentrantLock()
// lock to gate modifications to the eventQueue; non-suspending, reached from the synchronous
// track() API, so a thread lock is used
private val queueLock = ReentrantLock()
// ensures flushEvents does not get called after the sdk is closed
val isClosed = AtomicBoolean(false)
private val flushAgain = AtomicBoolean(true)
Expand All @@ -46,11 +51,13 @@ internal class EventQueue constructor(
val eventsToFlush: MutableList<Event> = mutableListOf()
eventsToFlush.addAll(currentEventQueue)

queueMutex.withLock {
// queueLock/aggregateLock are thread locks; these blocks never suspend, so they
// acquire and release on the same thread, before the suspending publish below.
queueLock.withLock {
eventQueue.removeAll(currentEventQueue)
}

aggregateMutex.withLock {
aggregateLock.withLock {
eventsToFlush.addAll(eventsFromAggregateEventMap())
aggregateEventMap.clear()
}
Expand Down Expand Up @@ -136,12 +143,10 @@ internal class EventQueue constructor(
DevCycleLogger.w("Attempting to queue event after closing DevCycle.")
return
}
runBlocking {
queueMutex.withLock {
eventQueue.add(event)
DevCycleLogger.i("Event queued successfully %s", event)
scheduleJob = scheduler.scheduleWithDelay { run() }
}
queueLock.withLock {
eventQueue.add(event)
DevCycleLogger.i("Event queued successfully %s", event)
scheduleJob = scheduler.scheduleWithDelay { run() }
}
}

Expand All @@ -155,30 +160,28 @@ internal class EventQueue constructor(
DevCycleLogger.w("Attempting to queue aggregate event after closing DVC.")
return
}
runBlocking {
aggregateMutex.withLock {
if (event.target == null || event.target == "") {
throw IllegalArgumentException("Target must be set")
}
if (event.type == "") {
throw IllegalArgumentException("Type must be set")
}

var aggEventType = aggregateEventMap[event.type]

if (aggEventType == null) {
aggEventType = aggregateEventMap.getOrPut(event.type) { HashMap<String, Event>() }
aggEventType[event.target] = event
} else if (aggEventType.containsKey(event.target)) {
aggEventType[event.target] = event.copy(
value = aggEventType[event.target]?.value?.plus(BigDecimal.ONE)
)
} else {
aggEventType[event.target] = event
}
aggregateLock.withLock {
if (event.target == null || event.target == "") {
throw IllegalArgumentException("Target must be set")
}
if (event.type == "") {
throw IllegalArgumentException("Type must be set")
}

scheduleJob = scheduler.scheduleWithDelay { run() }
var aggEventType = aggregateEventMap[event.type]

if (aggEventType == null) {
aggEventType = aggregateEventMap.getOrPut(event.type) { HashMap<String, Event>() }
aggEventType[event.target] = event
} else if (aggEventType.containsKey(event.target)) {
aggEventType[event.target] = event.copy(
value = aggEventType[event.target]?.value?.plus(BigDecimal.ONE)
)
} else {
aggEventType[event.target] = event
}

scheduleJob = scheduler.scheduleWithDelay { run() }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.mockito.Mockito
import java.math.BigDecimal
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit


class EventQueueTests {
Expand Down Expand Up @@ -82,4 +85,59 @@ class EventQueueTests {
Assert.assertEquals(optInEval, evalEventMetadata)
Assert.assertEquals(defaultEval, defaultEventMetadata)
}
}

@Test
fun `queueAggregateEvent throws synchronously when target is missing`() {
val request = Request("some-key", "http://fake.com", "http://fake.com", mockContext)
val user = PopulatedUser("test")
val eventQueue = EventQueue(request, { user }, CoroutineScope(Dispatchers.Default), 60000)
val eval = EvalReason("OPT_IN", "Opt-In", "target")
val eventWithNoTarget =
Event.fromInternalEvent(Event.variableEvent(false, "", eval), user, null)

Assert.assertThrows(IllegalArgumentException::class.java) {
eventQueue.queueAggregateEvent(eventWithNoTarget)
}
}

@Test
fun `concurrent queueAggregateEvent produces correct aggregate counts`() {
val request = Request("some-key", "http://fake.com", "http://fake.com", mockContext)
val user = PopulatedUser("test")
val eventQueue = EventQueue(request, { user }, CoroutineScope(Dispatchers.Default), 60000)
val eval = EvalReason("OPT_IN", "Opt-In", "target")
val keys = listOf("key0", "key1", "key2", "key3")

val threads = 8
val callsPerThread = 5000
val pool = Executors.newFixedThreadPool(threads)
val start = CountDownLatch(1)
val done = CountDownLatch(threads)
repeat(threads) {
pool.submit {
start.await()
repeat(callsPerThread) { i ->
val key = keys[i % keys.size]
eventQueue.queueAggregateEvent(
Event.fromInternalEvent(Event.variableEvent(false, key, eval), user, null)
)
}
done.countDown()
}
}
start.countDown()
Assert.assertTrue("workers did not finish", done.await(30, TimeUnit.SECONDS))
pool.shutdown()

val expectedPerKey = (threads * callsPerThread) / keys.size
keys.forEach { key ->
val event = eventQueue.aggregateEventMap[Event.Companion.EventTypes.variableEvaluated]?.get(key)
Assert.assertEquals(
"lost updates for $key",
BigDecimal(expectedPerKey),
event?.value
)
}
}

}
Loading