Skip to content

Commit 096cb09

Browse files
authored
fix: cancel delayed restart when remembering entities and eagerly restarting (#32749)
1 parent 9cc3abe commit 096cb09

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,7 @@ private[akka] class Shard(
989989
log.debug("{}: Passivation started for [{}]", typeName, id)
990990
entities.entityPassivating(id)
991991
entity ! stopMessage
992+
992993
val passivationTimeout = PassivationTimedOut(entity)
993994
timers.startSingleTimer(
994995
passivationTimeout,
@@ -1144,7 +1145,12 @@ private[akka] class Shard(
11441145
def getOrCreateEntity(id: EntityId): ActorRef = {
11451146
entities.entity(id) match {
11461147
case OptionVal.Some(child) => child
1147-
case _ =>
1148+
case _ =>
1149+
// if we're remembering entities, there's a chance we have a timer to restart, which is now moot
1150+
if (rememberEntitiesStore.isDefined && entities.entityState(id) == WaitingForRestart) {
1151+
timers.cancel(RestartTerminatedEntity(id))
1152+
}
1153+
11481154
val name = URLEncoder.encode(id, "utf-8")
11491155
val a = context.actorOf(entityProps(id), name)
11501156
context.watchWith(a, EntityTerminated(a))

akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ object RememberEntitiesFailureSpec {
5454
context.stop(self)
5555
case "graceful-stop" =>
5656
context.parent ! ShardRegion.Passivate("stop")
57+
case "incarnation" =>
58+
// Don't do this at home, kids...
59+
sender() ! self
5760
case msg => sender() ! msg
5861
}
5962
}
@@ -395,6 +398,53 @@ class RememberEntitiesFailureSpec
395398
system.stop(sharding)
396399
}
397400
}
401+
402+
"neither restart entity nor crash when entity passivates after being eagerly restarted" in {
403+
val shardingSettings = ClusterShardingSettings(system).withRememberEntities(true)
404+
405+
var sharding: ActorRef = null
406+
val probe = TestProbe()
407+
implicit val sender: ActorRef = probe.ref
408+
409+
val spawnProbe = TestProbe()
410+
val props = Props[EntityActor] {
411+
spawnProbe.ref.tell("spawned", ActorRef.noSender)
412+
new EntityActor()
413+
}
414+
415+
sharding = ClusterSharding(system).start(
416+
"failStartPassivate",
417+
props,
418+
shardingSettings,
419+
extractEntityId,
420+
extractShardId,
421+
ShardAllocationStrategy.leastShardAllocationStrategy(absoluteLimit = 1, relativeLimit = 0.1),
422+
"graceful-stop")
423+
424+
sharding ! EntityEnvelope(1, "incarnation")
425+
spawnProbe.expectMsg("spawned")
426+
var currentIncarnation = probe.receiveN(1).head.asInstanceOf[ActorRef]
427+
428+
probe.watch(currentIncarnation)
429+
currentIncarnation ! "stop"
430+
probe.expectTerminated(currentIncarnation)
431+
// The restart timer is active
432+
currentIncarnation = null
433+
434+
// restart the entity early
435+
sharding ! EntityEnvelope(1, "incarnation")
436+
spawnProbe.expectMsg("spawned")
437+
sharding ! EntityEnvelope(1, "graceful-stop")
438+
currentIncarnation = probe.receiveN(1).head.asInstanceOf[ActorRef]
439+
440+
probe.watch(currentIncarnation)
441+
probe.expectTerminated(currentIncarnation)
442+
// entity is now passivated
443+
444+
probe.watch(sharding)
445+
spawnProbe.expectNoMessage(1.second)
446+
probe.expectNoMessage(1.second)
447+
}
398448
}
399449

400450
}

0 commit comments

Comments
 (0)