Skip to content

Commit 228843a

Browse files
authored
fix: disable batching for vt executor (#32785)
* fix: Disable batching for virtual thread executor dispatcher * make tests work on older JDKs * actually use marker and correct invert of flag * actually working as intended, more generous timeouts
1 parent b6bd867 commit 228843a

File tree

4 files changed

+60
-1
lines changed

4 files changed

+60
-1
lines changed

akka-actor-tests/src/test/scala/akka/dispatch/VirtualThreadDispatcherSpec.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import com.typesafe.config.ConfigFactory
1414
import org.scalatest.matchers.should.Matchers
1515
import org.scalatest.wordspec.AnyWordSpec
1616

17+
import scala.concurrent.ExecutionContext
18+
import scala.concurrent.Future
19+
1720
object VirtualThreadDispatcherSpec {
1821
final case class ThreadInfo(virtual: Boolean, name: String)
1922

@@ -122,6 +125,50 @@ class VirtualThreadDispatcherSpec extends AnyWordSpec with Matchers {
122125
TestKit.shutdownActorSystem(system)
123126
}
124127
}
128+
129+
"nested works as expected" in {
130+
if (JavaVersion.majorVersion < 21) {
131+
// loom not available yet here
132+
pending
133+
} else {
134+
implicit val system: ActorSystem = ActorSystem(
135+
classOf[VirtualThreadDispatcherSpec].getSimpleName,
136+
ConfigFactory.parseString("""
137+
my-vt-dispatcher {
138+
type = "Dispatcher"
139+
executor = virtual-thread-executor
140+
virtual-thread-executor {
141+
fallback="fork-join-executor"
142+
}
143+
}
144+
""").withFallback(ConfigFactory.load()))
145+
146+
try {
147+
implicit val dispatcher: ExecutionContext = system.dispatchers.lookup("my-vt-dispatcher")
148+
val threadInfoProbe = TestProbe()
149+
// only available on JDK 19+ so we can't use without reflection
150+
def currentThreadId(): Long =
151+
classOf[Thread].getMethod("threadId").invoke(Thread.currentThread()).asInstanceOf[Long]
152+
153+
Future {
154+
threadInfoProbe.ref ! (("parent before", currentThreadId()))
155+
Future {
156+
threadInfoProbe.ref ! (("child before", currentThreadId()))
157+
Thread.sleep(300)
158+
threadInfoProbe.ref ! (("child after", currentThreadId()))
159+
}
160+
Thread.sleep(100)
161+
threadInfoProbe.ref ! (("parent after", currentThreadId()))
162+
}
163+
val all = threadInfoProbe.receiveN(4)
164+
val items = all.collect { case (evt: String, threadId: Long) => (evt, threadId) }
165+
items.map(_._2).toSet should have size (2) // two different virtual threads
166+
items.map(_._1) shouldEqual (Seq("parent before", "child before", "parent after", "child after"))
167+
} finally {
168+
TestKit.shutdownActorSystem(system)
169+
}
170+
}
171+
}
125172
}
126173

127174
}

akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class Dispatcher(
3939

4040
import configurator.prerequisites._
4141

42+
private val batchingEnabled = !executorServiceFactoryProvider.isInstanceOf[NoBatchingExecutorFactoryProvider]
43+
4244
private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate {
4345
lazy val executor: ExecutorService = factory.createExecutorService
4446
def copy(): LazyExecutorServiceDelegate = new LazyExecutorServiceDelegate(factory)
@@ -141,6 +143,10 @@ class Dispatcher(
141143
} else false
142144
}
143145

146+
override def batchable(runnable: Runnable): Boolean =
147+
if (batchingEnabled) super.batchable(runnable)
148+
else false
149+
144150
override val toString: String = Logging.simpleName(this) + "[" + id + "]"
145151
}
146152

akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ trait ExecutorServiceFactoryProvider {
6565
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory
6666
}
6767

68+
/**
69+
* Marker trait to disable dispatcher batching for a given executor
70+
*/
71+
trait NoBatchingExecutorFactoryProvider {}
72+
6873
/**
6974
* A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher
7075
*/

akka-actor/src/main/scala/akka/dispatch/VirtualThreadConfigurator.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ private[akka] object VirtualThreadConfigurator {
7171
*/
7272
@InternalApi
7373
private[akka] class VirtualThreadConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
74-
extends ExecutorServiceConfigurator(config, prerequisites) {
74+
extends ExecutorServiceConfigurator(config, prerequisites)
75+
with NoBatchingExecutorFactoryProvider {
7576
import VirtualThreadConfigurator._
7677

7778
override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {

0 commit comments

Comments
 (0)