22
33import io .netty .channel .ChannelFactory ;
44import io .netty .channel .EventLoopGroup ;
5+ import io .netty .channel .IoHandlerFactory ;
6+ import io .netty .channel .MultiThreadIoEventLoopGroup ;
57import io .netty .channel .epoll .Epoll ;
68import io .netty .channel .epoll .EpollDatagramChannel ;
79import io .netty .channel .epoll .EpollEventLoopGroup ;
10+ import io .netty .channel .epoll .EpollIoHandler ;
811import io .netty .channel .epoll .EpollServerSocketChannel ;
912import io .netty .channel .epoll .EpollSocketChannel ;
1013import io .netty .channel .kqueue .KQueue ;
1114import io .netty .channel .kqueue .KQueueDatagramChannel ;
1215import io .netty .channel .kqueue .KQueueEventLoopGroup ;
16+ import io .netty .channel .kqueue .KQueueIoHandler ;
1317import io .netty .channel .kqueue .KQueueServerSocketChannel ;
1418import io .netty .channel .kqueue .KQueueSocketChannel ;
1519import io .netty .channel .nio .NioEventLoopGroup ;
20+ import io .netty .channel .nio .NioIoHandler ;
1621import io .netty .channel .socket .DatagramChannel ;
1722import io .netty .channel .socket .ServerSocketChannel ;
1823import io .netty .channel .socket .SocketChannel ;
1924import io .netty .channel .socket .nio .NioDatagramChannel ;
2025import io .netty .channel .socket .nio .NioServerSocketChannel ;
2126import io .netty .channel .socket .nio .NioSocketChannel ;
22- import io .netty .incubator .channel .uring .IOUring ;
23- import io .netty .incubator .channel .uring .IOUringDatagramChannel ;
24- import io .netty .incubator .channel .uring .IOUringEventLoopGroup ;
25- import io .netty .incubator .channel .uring .IOUringServerSocketChannel ;
26- import io .netty .incubator .channel .uring .IOUringSocketChannel ;
27+ import io .netty .channel .uring .IoUring ;
28+ import io .netty .channel .uring .IoUringDatagramChannel ;
29+ import io .netty .channel .uring .IoUringIoHandler ;
30+ import io .netty .channel .uring .IoUringServerSocketChannel ;
31+ import io .netty .channel .uring .IoUringSocketChannel ;
32+ import org .checkerframework .checker .nullness .qual .Nullable ;
2733
2834import java .util .concurrent .ThreadFactory ;
29- import java .util .function .Function ;
35+ import java .util .function .BiFunction ;
3036
3137public class TransportHelper {
3238 public static final TransportHelper .TransportType TRANSPORT_TYPE = TransportHelper .determineTransportMethod ();
39+ public static final boolean NEW_NETTY = isClassAvailable ("io.netty.channel.local.LocalIoHandle" );
3340
3441 public enum TransportMethod {
3542 NIO , EPOLL , KQUEUE , IO_URING
@@ -42,24 +49,27 @@ public record TransportType(TransportMethod method,
4249 ChannelFactory <? extends SocketChannel > socketChannelFactory ,
4350 Class <? extends DatagramChannel > datagramChannelClass ,
4451 ChannelFactory <? extends DatagramChannel > datagramChannelFactory ,
45- Function < ThreadFactory , EventLoopGroup > eventLoopGroupFactory ,
52+ @ Nullable BiFunction < Integer , ThreadFactory , EventLoopGroup > eventLoopGroupFactory ,
4653 boolean supportsTcpFastOpenServer ,
4754 boolean supportsTcpFastOpenClient ) {
4855 }
4956
5057 private static TransportType determineTransportMethod () {
51- if (isClassAvailable ("io.netty.incubator.channel.uring.IOUring" ) && IOUring .isAvailable ()) {
58+ if (isClassAvailable ("io.netty.channel.uring.IoUring" )
59+ && IoUring .isAvailable ()
60+ && Boolean .parseBoolean (System .getProperty ("Mcpl.io_uring" ))
61+ ) {
5262 return new TransportType (
5363 TransportMethod .IO_URING ,
54- IOUringServerSocketChannel .class ,
55- IOUringServerSocketChannel ::new ,
56- IOUringSocketChannel .class ,
57- IOUringSocketChannel ::new ,
58- IOUringDatagramChannel .class ,
59- IOUringDatagramChannel ::new ,
60- factory -> new IOUringEventLoopGroup ( 0 , factory ),
61- IOUring .isTcpFastOpenServerSideAvailable (),
62- IOUring .isTcpFastOpenClientSideAvailable ()
64+ IoUringServerSocketChannel .class ,
65+ IoUringServerSocketChannel ::new ,
66+ IoUringSocketChannel .class ,
67+ IoUringSocketChannel ::new ,
68+ IoUringDatagramChannel .class ,
69+ IoUringDatagramChannel ::new ,
70+ ( threads , factory ) -> null ,
71+ IoUring .isTcpFastOpenServerSideAvailable (),
72+ IoUring .isTcpFastOpenClientSideAvailable ()
6373 );
6474 }
6575
@@ -72,7 +82,7 @@ private static TransportType determineTransportMethod() {
7282 EpollSocketChannel ::new ,
7383 EpollDatagramChannel .class ,
7484 EpollDatagramChannel ::new ,
75- factory -> new EpollEventLoopGroup ( 0 , factory ) ,
85+ NEW_NETTY ? null : EpollEventLoopGroup :: new ,
7686 Epoll .isTcpFastOpenServerSideAvailable (),
7787 Epoll .isTcpFastOpenClientSideAvailable ()
7888 );
@@ -87,7 +97,7 @@ private static TransportType determineTransportMethod() {
8797 KQueueSocketChannel ::new ,
8898 KQueueDatagramChannel .class ,
8999 KQueueDatagramChannel ::new ,
90- factory -> new KQueueEventLoopGroup ( 0 , factory ) ,
100+ NEW_NETTY ? null : KQueueEventLoopGroup :: new ,
91101 KQueue .isTcpFastOpenServerSideAvailable (),
92102 KQueue .isTcpFastOpenClientSideAvailable ()
93103 );
@@ -101,12 +111,30 @@ private static TransportType determineTransportMethod() {
101111 NioSocketChannel ::new ,
102112 NioDatagramChannel .class ,
103113 NioDatagramChannel ::new ,
104- factory -> new NioEventLoopGroup ( 0 , factory ) ,
114+ NEW_NETTY ? null : NioEventLoopGroup :: new ,
105115 false ,
106116 false
107117 );
108118 }
109119
120+ public static EventLoopGroup createEventLoopGroup (ThreadFactory threadFactory ) {
121+ return createEventLoopGroup (threadFactory , 0 );
122+ }
123+
124+ public static EventLoopGroup createEventLoopGroup (ThreadFactory threadFactory , int threads ) {
125+ if (TRANSPORT_TYPE .eventLoopGroupFactory () == null ) {
126+ IoHandlerFactory handler = switch (TRANSPORT_TYPE .method ()) {
127+ case NIO -> NioIoHandler .newFactory ();
128+ case EPOLL -> EpollIoHandler .newFactory ();
129+ case KQUEUE -> KQueueIoHandler .newFactory ();
130+ case IO_URING -> IoUringIoHandler .newFactory ();
131+ };
132+ return new MultiThreadIoEventLoopGroup (threads , threadFactory , handler );
133+ } else {
134+ return TRANSPORT_TYPE .eventLoopGroupFactory ().apply (threads , threadFactory );
135+ }
136+ }
137+
110138 /**
111139 * Used so implementations can opt to remove these dependencies if so desired
112140 */
0 commit comments