Skip to content

Commit 3c0a417

Browse files
authored
test: Shard region termination, #32771 (#32792)
1 parent 903b139 commit 3c0a417

File tree

1 file changed

+202
-0
lines changed

1 file changed

+202
-0
lines changed
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright (C) 2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.cluster.sharding
6+
7+
import scala.concurrent.duration._
8+
9+
import akka.actor.ActorIdentity
10+
import akka.actor.Identify
11+
import akka.remote.testconductor.RoleName
12+
import akka.remote.testkit.Direction
13+
import akka.testkit._
14+
15+
abstract class ClusterShardingRegionTerminationSpecConfig(mode: String)
16+
extends MultiNodeClusterShardingConfig(
17+
mode,
18+
additionalConfig = """
19+
akka.cluster.downing-provider-class = ""
20+
akka.cluster.failure-detector.acceptable-heartbeat-pause = 1s
21+
akka.persistence.journal.leveldb-shared.store.native = off
22+
# we have 5 nodes, and one becomes unreachable
23+
akka.cluster.sharding.coordinator-state {
24+
write-majority-plus = 0
25+
read-majority-plus = 0
26+
}
27+
akka.cluster.sharding.distributed-data.majority-min-cap = 4
28+
""") {
29+
30+
val first = role("first")
31+
val second = role("second")
32+
val third = role("third")
33+
val fourth = role("fourth")
34+
val fifth = role("fifth")
35+
36+
testTransport(on = true)
37+
38+
}
39+
40+
object PersistentClusterShardingRegionTerminationSpecConfig
41+
extends ClusterShardingRegionTerminationSpecConfig(ClusterShardingSettings.StateStoreModePersistence)
42+
object DDataClusterShardingRegionTerminationSpecConfig
43+
extends ClusterShardingRegionTerminationSpecConfig(ClusterShardingSettings.StateStoreModeDData)
44+
45+
class PersistentClusterShardingRegionTerminationSpec
46+
extends ClusterShardingRegionTerminationSpec(PersistentClusterShardingRegionTerminationSpecConfig)
47+
class DDataClusterShardingRegionTerminationSpec
48+
extends ClusterShardingRegionTerminationSpec(DDataClusterShardingRegionTerminationSpecConfig)
49+
50+
class PersistentClusterShardingRegionTerminationMultiJvmNode1 extends PersistentClusterShardingRegionTerminationSpec
51+
class PersistentClusterShardingRegionTerminationMultiJvmNode2 extends PersistentClusterShardingRegionTerminationSpec
52+
class PersistentClusterShardingRegionTerminationMultiJvmNode3 extends PersistentClusterShardingRegionTerminationSpec
53+
class PersistentClusterShardingRegionTerminationMultiJvmNode4 extends PersistentClusterShardingRegionTerminationSpec
54+
class PersistentClusterShardingRegionTerminationMultiJvmNode5 extends PersistentClusterShardingRegionTerminationSpec
55+
56+
class DDataClusterShardingRegionTerminationMultiJvmNode1 extends DDataClusterShardingRegionTerminationSpec
57+
class DDataClusterShardingRegionTerminationMultiJvmNode2 extends DDataClusterShardingRegionTerminationSpec
58+
class DDataClusterShardingRegionTerminationMultiJvmNode3 extends DDataClusterShardingRegionTerminationSpec
59+
class DDataClusterShardingRegionTerminationMultiJvmNode4 extends DDataClusterShardingRegionTerminationSpec
60+
class DDataClusterShardingRegionTerminationMultiJvmNode5 extends DDataClusterShardingRegionTerminationSpec
61+
62+
// This is a reproducer, and future test coverage, of #32756
63+
abstract class ClusterShardingRegionTerminationSpec(multiNodeConfig: ClusterShardingRegionTerminationSpecConfig)
64+
extends MultiNodeClusterShardingSpec(multiNodeConfig)
65+
with ImplicitSender {
66+
67+
import multiNodeConfig._
68+
69+
val extractEntityId: ShardRegion.ExtractEntityId = {
70+
case msg: Int => (msg.toString, msg)
71+
case _ => throw new IllegalArgumentException()
72+
}
73+
74+
// each shardId same as entityId
75+
val extractShardId: ShardRegion.ExtractShardId = {
76+
case msg: Int => msg.toString
77+
case ShardRegion.StartEntity(id) => id
78+
case _ => throw new IllegalArgumentException()
79+
}
80+
81+
def join(from: RoleName, to: RoleName): Unit = {
82+
join(
83+
from,
84+
to,
85+
startSharding(
86+
system,
87+
typeName = "Entity",
88+
entityProps = TestActors.echoActorProps,
89+
extractEntityId = extractEntityId,
90+
extractShardId = extractShardId))
91+
}
92+
93+
lazy val region = ClusterSharding(system).shardRegion("Entity")
94+
95+
s"Cluster sharding (${multiNodeConfig.mode})" must {
96+
97+
"deallocate from terminated region" in within(30.seconds) {
98+
startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second))
99+
100+
join(first, first)
101+
102+
runOn(first) {
103+
region ! 1
104+
expectMsg(1)
105+
lastSender.path should be(region.path / "1" / "1")
106+
}
107+
enterBarrier("first-started")
108+
109+
join(second, first)
110+
runOn(second) {
111+
region ! 2
112+
expectMsg(2)
113+
lastSender.path should be(region.path / "2" / "2")
114+
region ! 22
115+
expectMsg(22)
116+
}
117+
enterBarrier("second-started")
118+
119+
join(third, first)
120+
join(fourth, first)
121+
join(fifth, first)
122+
runOn(third) {
123+
region ! 3
124+
expectMsg(3)
125+
}
126+
runOn(fourth) {
127+
region ! 4
128+
expectMsg(4)
129+
}
130+
runOn(fifth) {
131+
region ! 5
132+
expectMsg(5)
133+
}
134+
enterBarrier("more-started")
135+
136+
// need the ref from other node
137+
val secondRegion =
138+
if (myself == second) {
139+
region
140+
} else {
141+
system.actorSelection(node(second) / "system" / "sharding" / "Entity") ! Identify(None)
142+
expectMsgType[ActorIdentity].ref.get
143+
}
144+
enterBarrier("lookup-second-region")
145+
146+
// some shard allocations in flight
147+
runOn(third) {
148+
(6 to 20).foreach { n =>
149+
region ! n
150+
}
151+
}
152+
enterBarrier("requests-in-flight")
153+
154+
runOn(first) {
155+
testConductor.blackhole(first, second, Direction.Both).await
156+
testConductor.blackhole(third, second, Direction.Both).await
157+
testConductor.blackhole(fourth, second, Direction.Both).await
158+
testConductor.blackhole(fifth, second, Direction.Both).await
159+
}
160+
enterBarrier("blackhole-second")
161+
162+
// some more shard allocations in flight
163+
runOn(fourth, fifth) {
164+
(21 to 40).foreach { n =>
165+
region ! n
166+
}
167+
}
168+
169+
runOn(first) {
170+
val secondAddress = node(second).address
171+
awaitAssert {
172+
cluster.state.unreachable.exists(m => m.address == secondAddress) shouldBe true
173+
}
174+
cluster.down(secondAddress)
175+
176+
awaitAssert {
177+
val p = TestProbe()
178+
// shard 2 was already allocated at the beginning to second region
179+
// and should now be allocated elsewhere
180+
region.tell(2, p.ref)
181+
p.expectMsg(1.second, 2)
182+
lastSender.path should not be (secondRegion.path / "2" / "2")
183+
}
184+
}
185+
186+
enterBarrier("second-deallocated")
187+
188+
// verify all previous shards, and some new
189+
runOn(first, third, fourth, fifth) {
190+
val p = TestProbe()
191+
(1 to 50).foreach { n =>
192+
region.tell(n, p.ref)
193+
p.expectMsg(n)
194+
p.lastSender.path should not be (secondRegion.path / "2" / "2")
195+
}
196+
}
197+
198+
enterBarrier("after")
199+
}
200+
201+
}
202+
}

0 commit comments

Comments
 (0)