Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/warm-buttons-switch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Properly cancel jobs awaiting on DataChannel low buffer instead of completing on dispose
5 changes: 5 additions & 0 deletions .changeset/wise-spoons-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fix exception not being caught when using LocalParticipant.publishData
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ org.gradle.jvmargs=-Xmx4g
# When configured, Gradle will run in incubating parallel mode.
# This option should only be used with decoupled projects. More details, visit
# http://www.gradle.org/docs/current/userguide/multi_project_builds.html#sec:decoupled_projects
# org.gradle.parallel=true
org.gradle.parallel=true
# AndroidX package structure to make it clearer which packages are bundled with the
# Android operating system, and which are packaged with your app's APK
# https://developer.android.com/topic/libraries/support-library/androidx-rn
Expand All @@ -20,6 +20,7 @@ android.enableJetifier=false
# Kotlin code style for this project: "official" or "obsolete":
kotlin.code.style=official

org.gradle.caching=true
###############################################################

GROUP=io.livekit
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@ package io.livekit.android.coroutines

import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
Expand Down Expand Up @@ -47,12 +46,12 @@ fun <T> Flow<T>.cancelOnSignal(signal: Flow<Unit?>): Flow<T> = flow {
coroutineScope {
launch {
signal.takeWhile { it == null }.collect()
currentCoroutineContext().cancel()
this@coroutineScope.cancel()
}

collect {
emit(it)
}
currentCoroutineContext().cancel()
this@coroutineScope.cancel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -804,17 +804,7 @@ internal constructor(
} catch (e: Exception) {
return
}
val manager = when (kind) {
LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager
LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager
LivekitModels.DataPacket.Kind.UNRECOGNIZED -> {
throw IllegalArgumentException()
}
}

if (manager == null) {
return
}
val manager = dataChannelManagerForKind(kind) ?: return
manager.waitForBufferedAmountLow(DATA_CHANNEL_LOW_THRESHOLD.toLong())
}

Expand All @@ -825,7 +815,7 @@ internal constructor(
}

if (publisher == null) {
throw RoomException.ConnectException("Publisher isn't setup yet! Is room not connected?!")
throw RoomException.ConnectException("Publisher isn't setup yet! Is the room connected?")
}

if (publisher?.isConnected() != true &&
Expand All @@ -835,23 +825,38 @@ internal constructor(
this.negotiatePublisher()
}

val targetChannel = dataChannelForKind(kind) ?: throw RoomException.ConnectException("Publisher isn't setup yet! Is room not connected?!")
if (targetChannel.state() == DataChannel.State.OPEN) {
// Ensure data channel exists
dataChannelForKind(kind) ?: throw RoomException.ConnectException("Publisher data channel not established for ${kind.name}; is the room connected?")

val channelManager = dataChannelManagerForKind(kind)
?: throw RoomException.ConnectException("Publisher data channel manager not established for ${kind.name}; is the room connected?")
if (channelManager.state == DataChannel.State.OPEN) {
return
}

// wait until publisher ICE connected
val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS
while (SystemClock.elapsedRealtime() < endTime) {
if (publisher?.isConnected() == true && targetChannel.state() == DataChannel.State.OPEN) {
if (publisher?.isConnected() == true &&
channelManager.state == DataChannel.State.OPEN
) {
return
}
delay(50)
}

throw RoomException.ConnectException("could not establish publisher connection")
throw RoomException.ConnectException(
"could not establish publisher connection: publisher state: ${publisherObserver.connectionState}, channel state: ${channelManager.state}",
)
}

private fun dataChannelManagerForKind(kind: LivekitModels.DataPacket.Kind): DataChannelManager? =
when (kind) {
LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager
LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager
LivekitModels.DataPacket.Kind.UNRECOGNIZED -> throw IllegalArgumentException("Unknown data packet kind!")
}

private fun dataChannelForKind(kind: LivekitModels.DataPacket.Kind) =
when (kind) {
LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,7 +31,7 @@ import livekit.org.webrtc.DataChannel
/**
* @suppress
*/
internal class DataChannelManager(
class DataChannelManager(
val dataChannel: DataChannel,
private val dataMessageListener: DataChannel.Observer,
val rtcThreadToken: RTCThreadToken,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2025 LiveKit, Inc.
* Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,44 +23,94 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {
var observer: Observer? = null
var sentBuffers = mutableListOf<Buffer>()

private var stateBacking: State = State.OPEN
var state: State
get() {
ensureNotDisposed()
return stateBacking
}
set(value) {
ensureNotDisposed()
if (value == stateBacking) {
return
}
stateBacking = value
observer?.onStateChange()
}

private var bufferedAmountBacking: Long = 0L
var bufferedAmount: Long
get() {
ensureNotDisposed()
return bufferedAmountBacking
}
set(value) {
if (value == bufferedAmountBacking) {
return
}
val previous = bufferedAmountBacking
bufferedAmountBacking = value
observer?.onBufferedAmountChange(previous)
}

var isDisposed = false

fun clearSentBuffers() {
sentBuffers.clear()
}

override fun registerObserver(observer: Observer?) {
ensureNotDisposed()
this.observer = observer
}

override fun unregisterObserver() {
ensureNotDisposed()
this.observer = null
}

override fun label(): String? {
ensureNotDisposed()
return label
}

override fun id(): Int {
ensureNotDisposed()
return 0
}

override fun state(): State {
return State.OPEN
ensureNotDisposed()
return state
}

override fun bufferedAmount(): Long {
return 0
ensureNotDisposed()
return bufferedAmount
}

override fun send(buffer: Buffer): Boolean {
ensureNotDisposed()
sentBuffers.add(buffer)
return true
}

override fun close() {
ensureNotDisposed()
state = State.CLOSED
}

override fun dispose() {
ensureNotDisposed()
state = State.CLOSED
isDisposed = true
}

fun ensureNotDisposed() {
if (isDisposed) {
throw IllegalStateException("Data channel is closed!")
}
}
fun simulateBufferReceived(buffer: Buffer) {
observer?.onMessage(buffer)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.livekit.android.webrtc

import io.livekit.android.test.BaseTest
import io.livekit.android.test.mock.MockDataChannel
import io.livekit.android.test.mock.MockRTCThreadToken
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.yield
import livekit.org.webrtc.DataChannel
import org.junit.Assert.assertEquals
import org.junit.Assert.assertSame
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.Test
import java.nio.ByteBuffer

@OptIn(ExperimentalCoroutinesApi::class)
class DataChannelManagerTest : BaseTest() {

companion object {
private val NOOP_OBSERVER = object : DataChannel.Observer {
override fun onBufferedAmountChange(amount: Long) {}

override fun onStateChange() {}

override fun onMessage(buffer: DataChannel.Buffer) {}
}
}

@Test
fun onMessage_forwardsToDelegate() {
val channel = MockDataChannel("dc")
var received: DataChannel.Buffer? = null
val delegate = object : DataChannel.Observer {
override fun onBufferedAmountChange(amount: Long) {}

override fun onStateChange() {}

override fun onMessage(buffer: DataChannel.Buffer) {
received = buffer
}
}
val manager = DataChannelManager(channel, delegate, MockRTCThreadToken())
val payload = ByteBuffer.wrap(byteArrayOf(1, 2, 3))
val buffer = DataChannel.Buffer(payload, true)
manager.onMessage(buffer)
assertSame(buffer, received)
}

@Test
fun onStateChange_updatesStateFromChannel() {
val channel = MockDataChannel("dc")
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
channel.registerObserver(manager)
assertEquals(DataChannel.State.OPEN, manager.state)
channel.state = DataChannel.State.CLOSING
assertEquals(DataChannel.State.CLOSING, manager.state)
}

@Test
fun onBufferedAmountChange_updatesBufferedAmountFromChannel() {
val channel = MockDataChannel("dc")
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
channel.registerObserver(manager)
channel.bufferedAmount = 12_345L
assertEquals(12_345L, manager.bufferedAmount)
}

@Test
fun dispose_closesDataChannelOnce() {
val channel = MockDataChannel("dc")
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
manager.dispose()
assertTrue(manager.disposed)
}

@Test
fun dispose_secondCallIsNoOp() {
val channel = MockDataChannel("dc")
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
manager.dispose()
manager.dispose()
}

@Test
fun waitForBufferedAmountLow_completesWhenAlreadyBelowThreshold() = runTest {
val channel = MockDataChannel("dc")
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
channel.registerObserver(manager)
channel.bufferedAmount = 100L
manager.waitForBufferedAmountLow(15_000L)
}

@Test
fun waitForBufferedAmountLow_completesWhenAmountDrops() = runTest {
val channel = MockDataChannel("dc")
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
channel.registerObserver(manager)
channel.bufferedAmount = 25_000L
val waiter = async { manager.waitForBufferedAmountLow(15_000L) }
yield()
channel.bufferedAmount = 0L
waiter.await()
}

@Test
fun waitForBufferedAmountLow_cancelledWhenDisposedWhileWaiting() = runTest {
val channel = MockDataChannel("dc")
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
channel.registerObserver(manager)
channel.bufferedAmount = 100_000L
val waiter = async { manager.waitForBufferedAmountLow(15_000L) }
yield()
manager.dispose()
try {
waiter.await()
fail("expected CancellationException")
} catch (_: CancellationException) {
}
}
}
Loading