Skip to content

Commit a8a752c

Browse files
authored
fix: VersionVector recovery from metadata (#32815)
* VersionVector for replicated event sourcing wasn't fully recovered by replayed events
1 parent f65b9df commit a8a752c

File tree

6 files changed

+243
-11
lines changed

6 files changed

+243
-11
lines changed

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ import scala.concurrent.duration.DurationInt
2828

2929
import akka.actor.typed.scaladsl.ActorContext
3030
import akka.actor.typed.scaladsl.Behaviors
31+
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetSeenSequenceNr
32+
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetVersion
3133
import akka.persistence.typed.internal.ReplicatedEventMetadata
34+
import akka.persistence.typed.internal.VersionVector
3235
import akka.persistence.typed.scaladsl.EventWithMetadata
3336

3437
object ReplicatedEventSourcingSpec {
@@ -238,17 +241,136 @@ class ReplicatedEventSourcingSpec
238241

239242
r1 ! StoreMe("2 from r1", probe.ref)
240243
r2 ! StoreMe("2 from r2", probe.ref)
244+
r2 ! StoreMe("3 from r2", probe.ref)
241245

242246
eventually {
243247
val probe = createTestProbe[State]()
244248
r1 ! GetState(probe.ref)
245-
probe.expectMessageType[State].all.toSet shouldEqual Set("1 from r1", "1 from r2", "2 from r1", "2 from r2")
249+
val expected = Set("1 from r1", "1 from r2", "2 from r1", "2 from r2", "3 from r2")
250+
probe.expectMessageType[State].all.toSet shouldEqual expected
246251
r2 ! GetState(probe.ref)
247-
probe.expectMessageType[State].all.toSet shouldEqual Set("1 from r1", "1 from r2", "2 from r1", "2 from r2")
252+
probe.expectMessageType[State].all.toSet shouldEqual expected
248253
}
249254
}
250255
}
251256

257+
"recover version and seenSeqNrPerReplica" in {
258+
val entityId = nextEntityId
259+
val r1Behavior = testBehavior(entityId, "R1")
260+
val r2Behavior = testBehavior(entityId, "R2")
261+
val probe = createTestProbe[Done]()
262+
263+
def assertFirst(r1: ActorRef[Command], r2: ActorRef[Command]): Unit = {
264+
val seenSeqNrProbe = createTestProbe[Long]()
265+
r1.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R1"), seenSeqNrProbe.ref))
266+
seenSeqNrProbe.receiveMessage() shouldEqual 0L // not tracking self
267+
r1.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R2"), seenSeqNrProbe.ref))
268+
seenSeqNrProbe.receiveMessage() shouldEqual 0L // no events from R2 yet
269+
270+
r2.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R1"), seenSeqNrProbe.ref))
271+
seenSeqNrProbe.receiveMessage() shouldEqual 2L
272+
r2.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R2"), seenSeqNrProbe.ref))
273+
seenSeqNrProbe.receiveMessage() shouldEqual 0L // not tracking self
274+
275+
val versionProbe = createTestProbe[VersionVector]()
276+
r1.unsafeUpcast[Any].tell(GetVersion(versionProbe.ref))
277+
val v1 = versionProbe.receiveMessage()
278+
v1.versionAt("R1") shouldEqual 2L
279+
v1.versionAt("R2") shouldEqual 0L
280+
281+
r2.unsafeUpcast[Any].tell(GetVersion(versionProbe.ref))
282+
val v2 = versionProbe.receiveMessage()
283+
v2.versionAt("R1") shouldEqual 2L
284+
v2.versionAt("R2") shouldEqual 0L
285+
}
286+
287+
{
288+
// first incarnation, only persist in R1
289+
val r1 = spawn(r1Behavior)
290+
val r2 = spawn(r2Behavior)
291+
r1 ! StoreMe("1 from r1", probe.ref)
292+
r1 ! StoreMe("2 from r1", probe.ref)
293+
294+
eventually {
295+
val probe = createTestProbe[State]()
296+
r1 ! GetState(probe.ref)
297+
val expected = Set("1 from r1", "2 from r1")
298+
probe.expectMessageType[State].all.toSet shouldEqual expected
299+
r2 ! GetState(probe.ref)
300+
probe.expectMessageType[State].all.toSet shouldEqual expected
301+
}
302+
303+
assertFirst(r1, r2)
304+
305+
r1 ! Stop
306+
r2 ! Stop
307+
probe.expectTerminated(r1)
308+
probe.expectTerminated(r2)
309+
}
310+
311+
def assertSecond(r1: ActorRef[Command], r2: ActorRef[Command]): Unit = {
312+
val seenSeqNrProbe = createTestProbe[Long]()
313+
r1.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R1"), seenSeqNrProbe.ref))
314+
seenSeqNrProbe.receiveMessage() shouldEqual 0L // not tracking self
315+
r1.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R2"), seenSeqNrProbe.ref))
316+
seenSeqNrProbe.receiveMessage() shouldEqual 5L
317+
318+
r2.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R1"), seenSeqNrProbe.ref))
319+
seenSeqNrProbe.receiveMessage() shouldEqual 2L
320+
r2.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R2"), seenSeqNrProbe.ref))
321+
seenSeqNrProbe.receiveMessage() shouldEqual 0L // not tracking self
322+
323+
val versionProbe = createTestProbe[VersionVector]()
324+
r1.unsafeUpcast[Any].tell(GetVersion(versionProbe.ref))
325+
val v1 = versionProbe.receiveMessage()
326+
v1.versionAt("R1") shouldEqual 2L
327+
v1.versionAt("R2") shouldEqual 5L
328+
329+
r2.unsafeUpcast[Any].tell(GetVersion(versionProbe.ref))
330+
val v2 = versionProbe.receiveMessage()
331+
v2.versionAt("R1") shouldEqual 2L
332+
v2.versionAt("R2") shouldEqual 5L
333+
}
334+
335+
{
336+
// second incarnation, only persisting in R2
337+
val r1 = spawn(r1Behavior)
338+
val r2 = spawn(r2Behavior)
339+
340+
// should recover to the same
341+
assertFirst(r1, r2)
342+
343+
r2 ! StoreMe("1 from r2", probe.ref)
344+
r2 ! StoreMe("2 from r2", probe.ref)
345+
r2 ! StoreMe("3 from r2", probe.ref)
346+
347+
eventually {
348+
val probe = createTestProbe[State]()
349+
r1 ! GetState(probe.ref)
350+
val expected = Set("1 from r1", "2 from r1", "1 from r2", "2 from r2", "3 from r2")
351+
probe.expectMessageType[State].all.toSet shouldEqual expected
352+
r2 ! GetState(probe.ref)
353+
probe.expectMessageType[State].all.toSet shouldEqual expected
354+
}
355+
356+
assertSecond(r1, r2)
357+
358+
r1 ! Stop
359+
r2 ! Stop
360+
probe.expectTerminated(r1)
361+
probe.expectTerminated(r2)
362+
}
363+
364+
{
365+
// third incarnation
366+
val r1 = spawn(r1Behavior)
367+
val r2 = spawn(r2Behavior)
368+
369+
// should recover to the same
370+
assertSecond(r1, r2)
371+
}
372+
}
373+
252374
"have access to replica information" in {
253375
val entityId = nextEntityId
254376
val probe = createTestProbe[(ReplicaId, Set[ReplicaId])]()
@@ -709,4 +831,5 @@ class ReplicatedEventSourcingSpec
709831
stateProbe.expectMessage(State(Vector("FROM R1", "from r2")))
710832
}
711833
}
834+
712835
}

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import akka.actor.typed.{ ActorRef, Behavior }
1515
import akka.persistence.testkit.{ PersistenceTestKitPlugin, PersistenceTestKitSnapshotPlugin }
1616
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
1717
import akka.persistence.testkit.scaladsl.{ PersistenceTestKit, SnapshotTestKit }
18+
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetSeenSequenceNr
19+
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetVersion
1820
import akka.persistence.typed.internal.{ ReplicatedPublishedEventMetaData, VersionVector }
1921
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
2022

@@ -116,5 +118,65 @@ class ReplicationSnapshotSpec
116118
stateProbe.expectMessage(State(Vector("r1 1", "r1 2")))
117119
}
118120
}
121+
122+
"recover version and seenSeqNrPerReplica" in {
123+
val entityId = nextEntityId
124+
val persistenceIdR1 = s"$EntityType|$entityId|R1"
125+
val persistenceIdR2 = s"$EntityType|$entityId|R2"
126+
val probe = createTestProbe[Done]()
127+
val r2EventProbe = createTestProbe[EventAndContext]()
128+
129+
def assertFirst(r1: ActorRef[Command], r2: ActorRef[Command]): Unit = {
130+
val seenSeqNrProbe = createTestProbe[Long]()
131+
r1.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R1"), seenSeqNrProbe.ref))
132+
seenSeqNrProbe.receiveMessage() shouldEqual 0L // not tracking self
133+
r1.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R2"), seenSeqNrProbe.ref))
134+
seenSeqNrProbe.receiveMessage() shouldEqual 0L // no events from R2 yet
135+
136+
r2.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R1"), seenSeqNrProbe.ref))
137+
seenSeqNrProbe.receiveMessage() shouldEqual 2L
138+
r2.unsafeUpcast[Any].tell(GetSeenSequenceNr(ReplicaId("R2"), seenSeqNrProbe.ref))
139+
seenSeqNrProbe.receiveMessage() shouldEqual 0L // not tracking self
140+
141+
val versionProbe = createTestProbe[VersionVector]()
142+
r1.unsafeUpcast[Any].tell(GetVersion(versionProbe.ref))
143+
val v1 = versionProbe.receiveMessage()
144+
v1.versionAt("R1") shouldEqual 2L
145+
v1.versionAt("R2") shouldEqual 0L
146+
147+
r2.unsafeUpcast[Any].tell(GetVersion(versionProbe.ref))
148+
val v2 = versionProbe.receiveMessage()
149+
v2.versionAt("R1") shouldEqual 2L
150+
v2.versionAt("R2") shouldEqual 0L
151+
}
152+
153+
{
154+
val r1 = spawn(behaviorWithSnapshotting(entityId, R1))
155+
val r2 = spawn(behaviorWithSnapshotting(entityId, R2, r2EventProbe.ref))
156+
r1 ! StoreMe("r1 1", probe.ref)
157+
r1 ! StoreMe("r1 2", probe.ref)
158+
r2EventProbe.expectMessageType[EventAndContext]
159+
r2EventProbe.expectMessageType[EventAndContext]
160+
161+
snapshotTestKit.expectNextPersisted(persistenceIdR1, State(Vector("r1 1", "r1 2")))
162+
snapshotTestKit.expectNextPersisted(persistenceIdR2, State(Vector("r1 1", "r1 2")))
163+
164+
assertFirst(r1, r2)
165+
166+
r1 ! Stop
167+
r2 ! Stop
168+
probe.expectTerminated(r1)
169+
probe.expectTerminated(r2)
170+
}
171+
172+
// restart r2 from a snapshot
173+
{
174+
val r1 = spawn(behaviorWithSnapshotting(entityId, R1))
175+
val r2 = spawn(behaviorWithSnapshotting(entityId, R2))
176+
177+
// should recover to the same
178+
assertFirst(r1, r2)
179+
}
180+
}
119181
}
120182
}

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ private[akka] object EventSourcedBehaviorImpl {
8585
*/
8686
final case class GetSeenSequenceNr(replica: ReplicaId, replyTo: ActorRef[Long]) extends InternalProtocol
8787

88+
/**
89+
* For testing purposes
90+
*/
91+
final case class GetVersion(replyTo: ActorRef[VersionVector]) extends InternalProtocol
92+
8893
trait WithSeqNrAccessible {
8994
def currentSequenceNumber: Long
9095
}

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import akka.persistence.typed.SingleEventSeq
3131
import akka.persistence.typed.internal.BehaviorSetup.SnapshotWithoutRetention
3232
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetSeenSequenceNr
3333
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
34+
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetVersion
3435
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.WithMetadataAccessible
3536
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.WithSeqNrAccessible
3637
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
@@ -114,6 +115,7 @@ private[akka] final class ReplayingEvents[C, E, S](
114115
case cmd: IncomingCommand[C @unchecked] => onInternalCommand(cmd)
115116
case get: GetState[S @unchecked] => stashInternal(get)
116117
case get: GetSeenSequenceNr => stashInternal(get)
118+
case get: GetVersion => stashInternal(get)
117119
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
118120
case ContinueUnstash => Behaviors.unhandled
119121
case _: AsyncEffectCompleted[_, _, _] => Behaviors.unhandled
@@ -157,6 +159,7 @@ private[akka] final class ReplayingEvents[C, E, S](
157159
version = VersionVector(replication.replicaId.id, repr.sequenceNr),
158160
concurrent = false)
159161
}
162+
160163
replication.setContext(recoveryRunning = true, meta.originReplica, meta.concurrent)
161164
Some((meta, replication.replicaId, replication))
162165
case None => None
@@ -171,17 +174,21 @@ private[akka] final class ReplayingEvents[C, E, S](
171174
}
172175

173176
replicatedMetaAndSelfReplica match {
174-
case Some((meta, selfReplica, replication)) if meta.originReplica != selfReplica =>
177+
case Some((meta, selfReplica, replication)) =>
175178
// keep track of highest origin seqnr per other replica
179+
val updatedSeen =
180+
if (meta.originReplica == selfReplica)
181+
state.seenSeqNrPerReplica
182+
else
183+
state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr)
184+
176185
state = state.copy(
177186
state = newState,
178187
eventSeenInInterval = true,
179188
version = meta.version,
180-
seenSeqNrPerReplica = state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr))
181-
replication.clearContext()
182-
case Some((_, _, replication)) =>
189+
seenSeqNrPerReplica = updatedSeen)
183190
replication.clearContext()
184-
state = state.copy(state = newState, eventSeenInInterval = true)
191+
185192
case _ =>
186193
state = state.copy(state = newState, eventSeenInInterval = true)
187194
}
@@ -305,10 +312,18 @@ private[akka] final class ReplayingEvents[C, E, S](
305312
onRecoveryComplete(setup.context)
306313
tryReturnRecoveryPermit("replay completed successfully")
307314
if (setup.internalLogger.isDebugEnabled) {
308-
setup.internalLogger.debug(
309-
"Recovery for persistenceId [{}] took {}",
310-
setup.persistenceId,
311-
(System.nanoTime() - state.recoveryStartTime).nanos.pretty)
315+
if (setup.replication.isDefined)
316+
setup.internalLogger.debug(
317+
"Recovery for persistenceId [{}] took {}, version [{}], seenSeqNrPerReplica [{}]",
318+
setup.persistenceId,
319+
(System.nanoTime() - state.recoveryStartTime).nanos.pretty,
320+
state.version,
321+
state.seenSeqNrPerReplica)
322+
else
323+
setup.internalLogger.debug(
324+
"Recovery for persistenceId [{}] took {}",
325+
setup.persistenceId,
326+
(System.nanoTime() - state.recoveryStartTime).nanos.pretty)
312327
}
313328

314329
setup.onSignal(state.state, RecoveryCompleted, catchAndLog = false)

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import akka.persistence.SnapshotProtocol.LoadSnapshotResult
1414
import akka.persistence.typed.{ RecoveryFailed, ReplicaId }
1515
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState }
1616
import akka.persistence.typed.SnapshotRecovered
17+
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetVersion
1718

1819
/**
1920
* INTERNAL API
@@ -72,6 +73,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
7273
onCommand(cmd)
7374
case get: GetState[S @unchecked] => stashInternal(get)
7475
case get: GetSeenSequenceNr => stashInternal(get)
76+
case get: GetVersion => stashInternal(get)
7577
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
7678
case ContinueUnstash => Behaviors.unhandled
7779
case _: AsyncEffectCompleted[_, _, _] => Behaviors.unhandled

0 commit comments

Comments
 (0)