File tree Expand file tree Collapse file tree
main/scala/fs2/concurrent
test/scala/fs2/concurrent Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -164,8 +164,21 @@ object Topic {
164164 case State .Closed () =>
165165 Topic .closed.pure[F ]
166166 case State .Active (subs, _) =>
167- foreach(subs)(_.send(a).void)
168- .as(Topic .rightUnit)
167+ subs.foldLeft(F .pure(Topic .rightUnit)) { case (acc, (_, chan)) =>
168+ acc.flatMap {
169+ case Left (Topic .Closed ) => Topic .closed.pure[F ]
170+ case Right (_) =>
171+ chan.send(a).flatMap {
172+ case Right (_) => Topic .rightUnit.pure[F ]
173+ case Left (_) =>
174+ // Channel send failed, check if topic was closed
175+ state.get.map {
176+ case State .Closed () => Topic .closed
177+ case State .Active (_, _) => Topic .rightUnit
178+ }
179+ }
180+ }
181+ }
169182 }
170183
171184 def subscribeAwait (maxQueued : Int ): Resource [F , Stream [F , A ]] =
Original file line number Diff line number Diff line change @@ -218,7 +218,7 @@ class TopicSuite extends Fs2Suite {
218218
219219 // https://github.com/typelevel/fs2/issues/3644
220220 test(
221- " when publish1 returns success, subscribers must receive the event, even if the publish1 races with close" .fail
221+ " when publish1 returns success, subscribers must receive the event, even if the publish1 races with close"
222222 ) {
223223 val check : IO [Unit ] =
224224 Topic [IO , String ]
You can’t perform that action at this time.
0 commit comments