Skip to content

Commit 98e667a

Browse files

authored (author name missing from page extraction)
Revert "perf: prioritize handling of Terminated after updating ddata" (#32766)
This reverts commit a9916f3.
1 parent a90c165 commit 98e667a

File tree

1 file changed

+8
-42
lines changed

1 file changed

+8
-42
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,15 +1755,14 @@ private[akka] class DDataShardCoordinator(
17551755
shardId: Option[ShardId],
17561756
waitingForStateWrite: Boolean,
17571757
waitingForRememberShard: Boolean,
1758-
afterUpdateCallback: E => Unit,
1759-
terminatingShardRegion: Option[Terminated]): Receive = {
1758+
afterUpdateCallback: E => Unit): Receive = {
17601759

17611760
case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) =>
17621761
updateStateRetries = 0
17631762
if (!waitingForRememberShard) {
17641763
log.debug("{}: The coordinator state was successfully updated with {}", typeName, evt)
17651764
if (shardId.isDefined) timers.cancel(RememberEntitiesTimeoutKey)
1766-
unbecomeAfterUpdate(evt, afterUpdateCallback, terminatingShardRegion)
1765+
unbecomeAfterUpdate(evt, afterUpdateCallback)
17671766
} else {
17681767
log.debug(
17691768
"{}: The coordinator state was successfully updated with {}, waiting for remember shard update",
@@ -1775,8 +1774,7 @@ private[akka] class DDataShardCoordinator(
17751774
shardId,
17761775
waitingForStateWrite = false,
17771776
waitingForRememberShard = true,
1778-
afterUpdateCallback = afterUpdateCallback,
1779-
terminatingShardRegion = terminatingShardRegion))
1777+
afterUpdateCallback = afterUpdateCallback))
17801778
}
17811779

17821780
case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) =>
@@ -1823,25 +1821,6 @@ private[akka] class DDataShardCoordinator(
18231821
if (!handleGetShardHome(shard))
18241822
stashGetShardHomeRequest(sender(), g) // must wait for update that is in progress
18251823

1826-
case t @ Terminated(ref) if terminatingShardRegion.isEmpty && state.regions.get(ref).fold(0)(_.size) > 0 =>
1827-
// ensure that if this termination results in any state updates,
1828-
// those updates will be the next state update
1829-
// the effect is to bulk-deallocate shards associated with this region
1830-
// if there was a rebalance before this (graceful leaving) with pending
1831-
// RebalanceDone (which also deallocates shards), the handling for that
1832-
// accounts for processing Terminated before RebalanceDone
1833-
//
1834-
// Subsequent Terminateds while waiting for update will be handled like
1835-
// any other message (viz. stashed)
1836-
context.become(
1837-
waitingForUpdate(
1838-
evt,
1839-
shardId,
1840-
waitingForStateWrite = waitingForStateWrite,
1841-
waitingForRememberShard = waitingForRememberShard,
1842-
afterUpdateCallback = afterUpdateCallback,
1843-
terminatingShardRegion = Some(t)))
1844-
18451824
case ShardCoordinator.Internal.Terminate =>
18461825
log.debug("{}: The ShardCoordinator received termination message while waiting for update", typeName)
18471826
terminating = true
@@ -1858,7 +1837,7 @@ private[akka] class DDataShardCoordinator(
18581837
if (!waitingForStateWrite) {
18591838
log.debug("{}: The ShardCoordinator saw remember shard start successfully written {}", typeName, evt)
18601839
if (shardId.isDefined) timers.cancel(RememberEntitiesTimeoutKey)
1861-
unbecomeAfterUpdate(evt, afterUpdateCallback, terminatingShardRegion)
1840+
unbecomeAfterUpdate(evt, afterUpdateCallback)
18621841
} else {
18631842
log.debug(
18641843
"{}: The ShardCoordinator saw remember shard start successfully written {}, waiting for state update",
@@ -1870,8 +1849,7 @@ private[akka] class DDataShardCoordinator(
18701849
shardId,
18711850
waitingForStateWrite = true,
18721851
waitingForRememberShard = false,
1873-
afterUpdateCallback = afterUpdateCallback,
1874-
terminatingShardRegion = terminatingShardRegion))
1852+
afterUpdateCallback = afterUpdateCallback))
18751853
}
18761854
}
18771855

@@ -1907,18 +1885,8 @@ private[akka] class DDataShardCoordinator(
19071885
case _ => stash()
19081886
}
19091887

1910-
private def unbecomeAfterUpdate[E <: DomainEvent](
1911-
evt: E,
1912-
afterUpdateCallback: E => Unit,
1913-
unprocessedTermination: Option[Terminated]): Unit = {
1888+
private def unbecomeAfterUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E => Unit): Unit = {
19141889
context.unbecome()
1915-
1916-
// want an unprocessed termination to be in the mailbox before
1917-
// anything else could be unstashed in the callback
1918-
unprocessedTermination.foreach { t =>
1919-
self.tell(t, ActorRef.noSender)
1920-
}
1921-
19221890
afterUpdateCallback(evt)
19231891
if (verboseDebug)
19241892
log.debug("{}: New coordinator state after [{}]: [{}]", typeName, evt, state)
@@ -1991,8 +1959,7 @@ private[akka] class DDataShardCoordinator(
19911959
shardId = Some(s.shard),
19921960
waitingForStateWrite = true,
19931961
waitingForRememberShard = true,
1994-
afterUpdateCallback = f,
1995-
terminatingShardRegion = None)
1962+
afterUpdateCallback = f)
19961963

19971964
case _ =>
19981965
// no update of shards, already known
@@ -2001,8 +1968,7 @@ private[akka] class DDataShardCoordinator(
20011968
shardId = None,
20021969
waitingForStateWrite = true,
20031970
waitingForRememberShard = false,
2004-
afterUpdateCallback = f,
2005-
terminatingShardRegion = None)
1971+
afterUpdateCallback = f)
20061972
}
20071973
context.become(waitingReceive, discardOld = false)
20081974
}

0 commit comments

Comments (0)