diff --git a/.changeset/warm-buttons-switch.md b/.changeset/warm-buttons-switch.md new file mode 100644 index 000000000..989e2a626 --- /dev/null +++ b/.changeset/warm-buttons-switch.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Properly cancel jobs awaiting on DataChannel low buffer instead of completing on dispose diff --git a/.changeset/wise-spoons-occur.md b/.changeset/wise-spoons-occur.md new file mode 100644 index 000000000..8b68397f4 --- /dev/null +++ b/.changeset/wise-spoons-occur.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix exception not being caught when using LocalParticipant.publishData diff --git a/gradle.properties b/gradle.properties index f82b04858..2c6a0db23 100644 --- a/gradle.properties +++ b/gradle.properties @@ -10,7 +10,7 @@ org.gradle.jvmargs=-Xmx4g # When configured, Gradle will run in incubating parallel mode. # This option should only be used with decoupled projects. More details, visit # http://www.gradle.org/docs/current/userguide/multi_project_builds.html#sec:decoupled_projects -# org.gradle.parallel=true +org.gradle.parallel=true # AndroidX package structure to make it clearer which packages are bundled with the # Android operating system, and which are packaged with your app's APK # https://developer.android.com/topic/libraries/support-library/androidx-rn @@ -20,6 +20,7 @@ android.enableJetifier=false # Kotlin code style for this project: "official" or "obsolete": kotlin.code.style=official +org.gradle.caching=true ############################################################### GROUP=io.livekit diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/coroutines/FlowExt.kt b/livekit-android-sdk/src/main/java/io/livekit/android/coroutines/FlowExt.kt index 2e221b533..d85b8cc23 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/coroutines/FlowExt.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/coroutines/FlowExt.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ package io.livekit.android.coroutines import kotlinx.coroutines.cancel import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow @@ -47,12 +46,12 @@ fun Flow.cancelOnSignal(signal: Flow): Flow = flow { coroutineScope { launch { signal.takeWhile { it == null }.collect() - currentCoroutineContext().cancel() + this@coroutineScope.cancel() } collect { emit(it) } - currentCoroutineContext().cancel() + this@coroutineScope.cancel() } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 6ba499901..c35761513 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -804,17 +804,7 @@ internal constructor( } catch (e: Exception) { return } - val manager = when (kind) { - LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager - LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager - LivekitModels.DataPacket.Kind.UNRECOGNIZED -> { - throw IllegalArgumentException() - } - } - - if (manager == null) { - return - } + val manager = dataChannelManagerForKind(kind) ?: return manager.waitForBufferedAmountLow(DATA_CHANNEL_LOW_THRESHOLD.toLong()) } @@ -825,7 +815,7 @@ internal constructor( } if (publisher == null) { - throw RoomException.ConnectException("Publisher isn't setup yet! Is room not connected?!") + throw RoomException.ConnectException("Publisher isn't setup yet! Is the room connected?") } if (publisher?.isConnected() != true && @@ -835,23 +825,38 @@ internal constructor( this.negotiatePublisher() } - val targetChannel = dataChannelForKind(kind) ?: throw RoomException.ConnectException("Publisher isn't setup yet! Is room not connected?!") - if (targetChannel.state() == DataChannel.State.OPEN) { + // Ensure data channel exists + dataChannelForKind(kind) ?: throw RoomException.ConnectException("Publisher data channel not established for ${kind.name}; is the room connected?") + + val channelManager = dataChannelManagerForKind(kind) + ?: throw RoomException.ConnectException("Publisher data channel manager not established for ${kind.name}; is the room connected?") + if (channelManager.state == DataChannel.State.OPEN) { return } // wait until publisher ICE connected val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS while (SystemClock.elapsedRealtime() < endTime) { - if (publisher?.isConnected() == true && targetChannel.state() == DataChannel.State.OPEN) { + if (publisher?.isConnected() == true && + channelManager.state == DataChannel.State.OPEN + ) { return } delay(50) } - throw RoomException.ConnectException("could not establish publisher connection") + throw RoomException.ConnectException( + "could not establish publisher connection: publisher state: ${publisherObserver.connectionState}, channel state: ${channelManager.state}", + ) } + private fun dataChannelManagerForKind(kind: LivekitModels.DataPacket.Kind): DataChannelManager? = + when (kind) { + LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager + LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager + LivekitModels.DataPacket.Kind.UNRECOGNIZED -> throw IllegalArgumentException("Unknown data packet kind!") + } + private fun dataChannelForKind(kind: LivekitModels.DataPacket.Kind) = when (kind) { LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannel diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt index 2667828ad..78bdbb183 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ import livekit.org.webrtc.DataChannel /** * @suppress */ -internal class DataChannelManager( +class DataChannelManager( val dataChannel: DataChannel, private val dataMessageListener: DataChannel.Observer, val rtcThreadToken: RTCThreadToken, diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt index f975ce737..8a332dc25 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,44 +23,94 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) { var observer: Observer? = null var sentBuffers = mutableListOf() + private var stateBacking: State = State.OPEN + var state: State + get() { + ensureNotDisposed() + return stateBacking + } + set(value) { + ensureNotDisposed() + if (value == stateBacking) { + return + } + stateBacking = value + observer?.onStateChange() + } + + private var bufferedAmountBacking: Long = 0L + var bufferedAmount: Long + get() { + ensureNotDisposed() + return bufferedAmountBacking + } + set(value) { + if (value == bufferedAmountBacking) { + return + } + val previous = bufferedAmountBacking + bufferedAmountBacking = value + observer?.onBufferedAmountChange(previous) + } + + var isDisposed = false + fun clearSentBuffers() { sentBuffers.clear() } override fun registerObserver(observer: Observer?) { + ensureNotDisposed() this.observer = observer } override fun unregisterObserver() { + ensureNotDisposed() + this.observer = null } override fun label(): String? { + ensureNotDisposed() return label } override fun id(): Int { + ensureNotDisposed() return 0 } override fun state(): State { - return State.OPEN + ensureNotDisposed() + return state } override fun bufferedAmount(): Long { - return 0 + ensureNotDisposed() + return bufferedAmount } override fun send(buffer: Buffer): Boolean { + ensureNotDisposed() sentBuffers.add(buffer) return true } override fun close() { + ensureNotDisposed() + state = State.CLOSED } override fun dispose() { + ensureNotDisposed() + state = State.CLOSED + isDisposed = true } + fun ensureNotDisposed() { + if (isDisposed) { + throw IllegalStateException("Data channel is closed!") + } + } fun simulateBufferReceived(buffer: Buffer) { observer?.onMessage(buffer) } diff --git a/livekit-android-test/src/test/java/io/livekit/android/webrtc/DataChannelManagerTest.kt b/livekit-android-test/src/test/java/io/livekit/android/webrtc/DataChannelManagerTest.kt new file mode 100644 index 000000000..51b5212c9 --- /dev/null +++ b/livekit-android-test/src/test/java/io/livekit/android/webrtc/DataChannelManagerTest.kt @@ -0,0 +1,138 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.webrtc + +import io.livekit.android.test.BaseTest +import io.livekit.android.test.mock.MockDataChannel +import io.livekit.android.test.mock.MockRTCThreadToken +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.async +import kotlinx.coroutines.yield +import livekit.org.webrtc.DataChannel +import org.junit.Assert.assertEquals +import org.junit.Assert.assertSame +import org.junit.Assert.assertTrue +import org.junit.Assert.fail +import org.junit.Test +import java.nio.ByteBuffer + +@OptIn(ExperimentalCoroutinesApi::class) +class DataChannelManagerTest : BaseTest() { + + companion object { + private val NOOP_OBSERVER = object : DataChannel.Observer { + override fun onBufferedAmountChange(amount: Long) {} + + override fun onStateChange() {} + + override fun onMessage(buffer: DataChannel.Buffer) {} + } + } + + @Test + fun onMessage_forwardsToDelegate() { + val channel = MockDataChannel("dc") + var received: DataChannel.Buffer? = null + val delegate = object : DataChannel.Observer { + override fun onBufferedAmountChange(amount: Long) {} + + override fun onStateChange() {} + + override fun onMessage(buffer: DataChannel.Buffer) { + received = buffer + } + } + val manager = DataChannelManager(channel, delegate, MockRTCThreadToken()) + val payload = ByteBuffer.wrap(byteArrayOf(1, 2, 3)) + val buffer = DataChannel.Buffer(payload, true) + manager.onMessage(buffer) + assertSame(buffer, received) + } + + @Test + fun onStateChange_updatesStateFromChannel() { + val channel = MockDataChannel("dc") + val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken()) + channel.registerObserver(manager) + assertEquals(DataChannel.State.OPEN, manager.state) + channel.state = DataChannel.State.CLOSING + assertEquals(DataChannel.State.CLOSING, manager.state) + } + + @Test + fun onBufferedAmountChange_updatesBufferedAmountFromChannel() { + val channel = MockDataChannel("dc") + val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken()) + channel.registerObserver(manager) + channel.bufferedAmount = 12_345L + assertEquals(12_345L, manager.bufferedAmount) + } + + @Test + fun dispose_closesDataChannelOnce() { + val channel = MockDataChannel("dc") + val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken()) + manager.dispose() + assertTrue(manager.disposed) + } + + @Test + fun dispose_secondCallIsNoOp() { + val channel = MockDataChannel("dc") + val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken()) + manager.dispose() + manager.dispose() + } + + @Test + fun waitForBufferedAmountLow_completesWhenAlreadyBelowThreshold() = runTest { + val channel = MockDataChannel("dc") + val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken()) + channel.registerObserver(manager) + channel.bufferedAmount = 100L + manager.waitForBufferedAmountLow(15_000L) + } + + @Test + fun waitForBufferedAmountLow_completesWhenAmountDrops() = runTest { + val channel = MockDataChannel("dc") + val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken()) + channel.registerObserver(manager) + channel.bufferedAmount = 25_000L + val waiter = async { manager.waitForBufferedAmountLow(15_000L) } + yield() + channel.bufferedAmount = 0L + waiter.await() + } + + @Test + fun waitForBufferedAmountLow_cancelledWhenDisposedWhileWaiting() = runTest { + val channel = MockDataChannel("dc") + val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken()) + channel.registerObserver(manager) + channel.bufferedAmount = 100_000L + val waiter = async { manager.waitForBufferedAmountLow(15_000L) } + yield() + manager.dispose() + try { + waiter.await() + fail("expected CancellationException") + } catch (_: CancellationException) { + } + } +}