Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #189 +/- ##
===========================================
- Coverage 87.91% 68.43% -19.49%
===========================================
Files 37 37
Lines 811 1020 +209
Branches 110 154 +44
===========================================
- Hits 713 698 -15
- Misses 53 273 +220
- Partials 45 49 +4 ☔ View full report in Codecov by Sentry. |
…blishWithSelector
…blishWithSelector
There was a problem hiding this comment.
Pull Request Overview
This PR introduces a "publish with selector" functionality that allows for advanced flow transformation using a selector-based pattern. The implementation enables multiple flows to be derived from a single source flow with shared subscription and selective processing.
- Implements
publishfunction with selector scope for creating multiple derived flows from a single source - Adds comprehensive state management for selector scopes with thread-safe operations
- Provides specialized DSL annotations and interfaces for type-safe selector operations
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| publishWithSelector.kt | Main implementation with publish function, selector scopes, state management, and example usage |
| SimpleSuspendLazy.kt | Thread-safe lazy initialization for suspend functions with cleanup capabilities |
| SimpleLazy.kt | Thread-safe lazy initialization for regular functions with cleanup capabilities |
| AtomicRef.kt | Atomic reference utilities with loop and update operations |
| FlowExt.api | API definitions for the new publish selector functionality |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| val stateRef = AtomicRef<SelectorScopeState<T>>(SelectorScopeState.Init) | ||
|
|
||
| override fun <R> select(block: SelectorFunction<T, R>): Flow<R> { | ||
| println("call select with block: $block") |
There was a problem hiding this comment.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("call select with block: $block") |
| println( | ||
| "onCompleteSelectedFlow: completedCount = ${(updated as? SelectorScopeState.Frozen)?.completedCount}, " + | ||
| "size = ${state.blocks.size}, " + | ||
| "(index = $index)", | ||
| ) |
There was a problem hiding this comment.
Debug print statements should be removed from production code. Consider using a proper logging framework or removing these debug outputs.
| clear() | ||
| } | ||
|
|
||
| println("onCompleteSelectedFlow: cancel the publish scope") |
There was a problem hiding this comment.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("onCompleteSelectedFlow: cancel the publish scope") |
| } | ||
|
|
||
| suspend fun send(value: T) { | ||
| println("send: $value") |
There was a problem hiding this comment.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("send: $value") |
| } | ||
|
|
||
| fun close(e: Throwable?) { | ||
| println("close: $e") |
There was a problem hiding this comment.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("close: $e") |
| println("cancel: $e") | ||
| transitionToClosed { it.cancel(e) } | ||
| } | ||
| } |
There was a problem hiding this comment.
Debug print statement should be removed from production code. Consider using a proper logging framework or removing this debug output.
| println("cancel: $e") | |
| transitionToClosed { it.cancel(e) } | |
| } | |
| } | |
| transitionToClosed { it.cancel(e) } | |
| } | |
| } | |
| } |
| @OptIn(FlowExtPreview::class, ExperimentalCoroutinesApi::class) | ||
| public suspend fun main() { | ||
| flow<Any?> { | ||
| println("Collect...") | ||
| delay(100) | ||
| emit(1) | ||
| delay(100) | ||
| emit(2) | ||
| delay(100) | ||
| emit(3) | ||
| delay(100) | ||
| emit("4") | ||
| }.onEach { println(">>> onEach: $it") } | ||
| .publish { | ||
| delay(100) | ||
|
|
||
| merge( | ||
| select { flow -> | ||
| delay(1) | ||
| val sharedFlow = flow.shareIn() | ||
|
|
||
| interval(0, 100) | ||
| .onEach { println(">>> interval: $it") } | ||
| .flatMapMerge { value -> | ||
| timer(value, 50) | ||
| .withLatestFrom(sharedFlow) | ||
| .map { it to "shared" } | ||
| }.takeUntil(sharedFlow.filter { it == 3 }) | ||
| }, | ||
| select { flow -> | ||
| flow | ||
| .filterIsInstance<Int>() | ||
| .filter { it % 2 == 0 } | ||
| .map { it to "even" } | ||
| .take(1) | ||
| }, | ||
| select { flow -> | ||
| flow | ||
| .filterIsInstance<Int>() | ||
| .filter { it % 2 != 0 } | ||
| .map { it to "odd" } | ||
| .take(1) | ||
| }, | ||
| select { flow -> | ||
| flow | ||
| .filterIsInstance<String>() | ||
| .map { it to "string" } | ||
| .take(1) | ||
| }, | ||
| ) | ||
| } | ||
| .toList() | ||
| .also { println(it) } | ||
| .let { check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) } | ||
| } |
There was a problem hiding this comment.
The main function appears to be a test or example and should not be included in production library code. Consider moving this to a test file or removing it entirely.
| @OptIn(FlowExtPreview::class, ExperimentalCoroutinesApi::class) | |
| public suspend fun main() { | |
| flow<Any?> { | |
| println("Collect...") | |
| delay(100) | |
| emit(1) | |
| delay(100) | |
| emit(2) | |
| delay(100) | |
| emit(3) | |
| delay(100) | |
| emit("4") | |
| }.onEach { println(">>> onEach: $it") } | |
| .publish { | |
| delay(100) | |
| merge( | |
| select { flow -> | |
| delay(1) | |
| val sharedFlow = flow.shareIn() | |
| interval(0, 100) | |
| .onEach { println(">>> interval: $it") } | |
| .flatMapMerge { value -> | |
| timer(value, 50) | |
| .withLatestFrom(sharedFlow) | |
| .map { it to "shared" } | |
| }.takeUntil(sharedFlow.filter { it == 3 }) | |
| }, | |
| select { flow -> | |
| flow | |
| .filterIsInstance<Int>() | |
| .filter { it % 2 == 0 } | |
| .map { it to "even" } | |
| .take(1) | |
| }, | |
| select { flow -> | |
| flow | |
| .filterIsInstance<Int>() | |
| .filter { it % 2 != 0 } | |
| .map { it to "odd" } | |
| .take(1) | |
| }, | |
| select { flow -> | |
| flow | |
| .filterIsInstance<String>() | |
| .map { it to "string" } | |
| .take(1) | |
| }, | |
| ) | |
| } | |
| .toList() | |
| .also { println(it) } | |
| .let { check(it == listOf(Pair(1, "odd"), Pair(2, "even"), Pair("4", "string"))) } | |
| } |
|
@copilot review PR |
|
@codex review PR AND GIVE FEEDBACKS |
Review Summary
TestingNo tests were run. |
Status
READY/IN DEVELOPMENT/HOLD
Breaking Changes
YES | NO
Description
Type of Change
Requirements