Skip to content

Commit f389c32

Browse files
authored
Fix Signal#ap bug in getAndDiscreteUpdates (#3206)
* Add test for hanging `getAndDiscreteUpdates` * Fix `Signal#ap` bug in `getAndDiscreteUpdates`
1 parent 9de31c2 commit f389c32

File tree

2 files changed

+26
-23
lines changed

2 files changed

+26
-23
lines changed

core/shared/src/main/scala/fs2/concurrent/Signal.scala

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
package fs2
2323
package concurrent
2424

25-
import cats.data.OptionT
2625
import cats.kernel.Eq
2726
import cats.effect.kernel.{Concurrent, Deferred, Ref, Resource}
2827
import cats.effect.std.MapRef
@@ -624,27 +623,18 @@ object SignallingMapRef {
624623

625624
private[concurrent] trait SignalInstances extends SignalLowPriorityInstances {
626625
implicit def applicativeInstance[F[_]: Concurrent]: Applicative[Signal[F, *]] = {
627-
def nondeterministicZip[A0, A1](xs: Stream[F, A0], ys: Stream[F, A1]): Stream[F, (A0, A1)] = {
628-
type PullOutput = (A0, A1, Stream[F, A0], Stream[F, A1])
629-
630-
val firstPull: OptionT[Pull[F, PullOutput, *], Unit] = for {
631-
firstXAndRestOfXs <- OptionT(xs.pull.uncons1.covaryOutput[PullOutput])
632-
(x, restOfXs) = firstXAndRestOfXs
633-
firstYAndRestOfYs <- OptionT(ys.pull.uncons1.covaryOutput[PullOutput])
634-
(y, restOfYs) = firstYAndRestOfYs
635-
_ <- OptionT.liftF {
636-
Pull.output1[F, PullOutput]((x, y, restOfXs, restOfYs)): Pull[F, PullOutput, Unit]
626+
def nondeterministicZip[A0, A1](
627+
x0: A0,
628+
xs: Stream[F, A0],
629+
y0: A1,
630+
ys: Stream[F, A1]
631+
): Stream[F, (A0, A1)] =
632+
xs.either(ys)
633+
.scan((x0, y0)) {
634+
case ((_, rightElem), Left(newElem)) => (newElem, rightElem)
635+
case ((leftElem, _), Right(newElem)) => (leftElem, newElem)
637636
}
638-
} yield ()
639-
640-
firstPull.value.void.stream
641-
.flatMap { case (x, y, restOfXs, restOfYs) =>
642-
restOfXs.either(restOfYs).scan((x, y)) {
643-
case ((_, rightElem), Left(newElem)) => (newElem, rightElem)
644-
case ((leftElem, _), Right(newElem)) => (leftElem, newElem)
645-
}
646-
}
647-
}
637+
.drop(1)
648638

649639
new Applicative[Signal[F, *]] {
650640
override def map[A, B](fa: Signal[F, A])(f: A => B): Signal[F, B] = Signal.mapped(fa)(f)
@@ -654,7 +644,9 @@ private[concurrent] trait SignalInstances extends SignalLowPriorityInstances {
654644
def ap[A, B](ff: Signal[F, A => B])(fa: Signal[F, A]): Signal[F, B] =
655645
new Signal[F, B] {
656646
def discrete: Stream[F, B] =
657-
nondeterministicZip(ff.discrete, fa.discrete).map { case (f, a) => f(a) }
647+
Stream.resource(getAndDiscreteUpdates).flatMap { case (a, updates) =>
648+
Stream.emit(a) ++ updates
649+
}
658650

659651
def continuous: Stream[F, B] = Stream.repeatEval(get)
660652

@@ -666,7 +658,7 @@ private[concurrent] trait SignalInstances extends SignalLowPriorityInstances {
666658

667659
private[this] def getAndDiscreteUpdatesImpl =
668660
(ff.getAndDiscreteUpdates, fa.getAndDiscreteUpdates).mapN { case ((f, fs), (a, as)) =>
669-
(f(a), nondeterministicZip(fs, as).map { case (f, a) => f(a) })
661+
(f(a), nondeterministicZip(f, fs, a, as).map { case (f, a) => f(a) })
670662
}
671663
}
672664
}

core/shared/src/test/scala/fs2/concurrent/SignalSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,17 @@ class SignalSuite extends Fs2Suite {
283283
}
284284
}
285285

286+
test("ap getAndDiscreteUpdates propagates changes from either signal") {
287+
TestControl.executeEmbed {
288+
(SignallingRef[IO].of((i: Int) => i + 1), SignallingRef[IO].of(0)).flatMapN {
289+
case (ffs, fus) =>
290+
(ffs: Signal[IO, Int => Int]).ap(fus).getAndDiscreteUpdates.use { case (_, updates) =>
291+
fus.set(1) *> updates.head.compile.lastOrError.assertEquals(2) // should not hang
292+
}
293+
}
294+
}
295+
}
296+
286297
test("waitUntil") {
287298
val target = 5
288299
val expected = 1

0 commit comments

Comments
 (0)