41
41
import java .util .concurrent .TimeUnit ;
42
42
import java .util .concurrent .TimeoutException ;
43
43
import java .util .concurrent .atomic .AtomicBoolean ;
44
+ import java .io .IOException ;
45
+ import java .net .ServerSocket ;
46
+
47
+ import org .apache .seata .core .protocol .ResultCode ;
44
48
45
49
/**
46
50
*/
47
51
public class TmNettyClientTest extends AbstractServerTest {
48
52
49
53
private static final Logger LOGGER = LoggerFactory .getLogger (TmNettyClientTest .class );
54
+ private static int dynamicPort ;
50
55
51
56
@ BeforeAll
52
- public static void init (){
57
+ public static void init () throws IOException {
53
58
// Remove hardcoded port configuration to support dynamic port allocation
54
59
// ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091");
60
+
61
+ // Get dynamic port
62
+ try (ServerSocket serverSocket = new ServerSocket (0 )) {
63
+ dynamicPort = serverSocket .getLocalPort ();
64
+ }
55
65
}
56
66
@ AfterAll
57
67
public static void after () {
@@ -60,7 +70,7 @@ public static void after() {
60
70
61
71
public static ThreadPoolExecutor initMessageExecutor () {
62
72
return new ThreadPoolExecutor (5 , 5 , 500 , TimeUnit .SECONDS ,
63
- new LinkedBlockingQueue (20000 ), new ThreadPoolExecutor .CallerRunsPolicy ());
73
+ new LinkedBlockingQueue <> (20000 ), new ThreadPoolExecutor .CallerRunsPolicy ());
64
74
}
65
75
66
76
/**
@@ -71,49 +81,37 @@ public static ThreadPoolExecutor initMessageExecutor() {
71
81
@ Test
72
82
public void testDoConnect () throws Exception {
73
83
ThreadPoolExecutor workingThreads = initMessageExecutor ();
74
- NettyRemotingServer nettyRemotingServer = new NettyRemotingServer (workingThreads );
75
- //start services server first
76
- AtomicBoolean serverStatus = new AtomicBoolean ();
77
- Thread thread = new Thread (() -> {
78
- try {
79
- nettyRemotingServer .setHandler (DefaultCoordinator .getInstance (nettyRemotingServer ));
80
- // set registry
81
- XID .setIpAddress (NetUtil .getLocalIp ());
82
- XID .setPort (8091 );
83
- // init snowflake for transactionId, branchId
84
- UUIDGenerator .init (1L );
85
- System .out .println ("pid info: " + ManagementFactory .getRuntimeMXBean ().getName ());
86
- nettyRemotingServer .init ();
87
- serverStatus .set (true );
88
- } catch (Throwable t ) {
89
- serverStatus .set (false );
90
- LOGGER .error ("The seata-server failed to start" , t );
91
- }
92
- });
93
- thread .start ();
84
+ NettyServerConfig serverConfig = new NettyServerConfig ();
85
+ serverConfig .setServerListenPort (dynamicPort );
86
+ NettyRemotingServer nettyRemotingServer = new NettyRemotingServer (workingThreads , serverConfig );
87
+ new Thread (() -> {
88
+ SessionHolder .init (null );
89
+ nettyRemotingServer .setHandler (DefaultCoordinator .getInstance (nettyRemotingServer ));
90
+ // set registry
91
+ XID .setIpAddress (NetUtil .getLocalIp ());
92
+ XID .setPort (dynamicPort );
93
+ // init snowflake for transactionId, branchId
94
+ UUIDGenerator .init (1L );
95
+ nettyRemotingServer .init ();
96
+ }).start ();
97
+ Thread .sleep (3000 );
94
98
95
- //Wait for the seata-server to start.
96
- long start = System .nanoTime ();
97
- long maxWaitNanoTime = 10 * 1000 * 1000 * 1000L ; // 10s
98
- while (System .nanoTime () - start < maxWaitNanoTime ) {
99
- Thread .sleep (100 );
100
- if (serverStatus .get ()) {
101
- break ;
102
- }
103
- }
104
- if (!serverStatus .get ()) {
105
- throw new RuntimeException ("Waiting for a while, but the seata-server did not start successfully." );
106
- }
99
+ // Configure client to use dynamic port
100
+ ConfigurationTestHelper .putConfig ("service.default.grouplist" , "127.0.0.1:" + dynamicPort );
101
+ ConfigurationTestHelper .putConfig (ConfigurationKeys .SERVER_SERVICE_PORT_CAMEL , String .valueOf (dynamicPort ));
107
102
108
- //then test client
109
103
String applicationId = "app 1" ;
110
- String transactionServiceGroup = "group A " ;
104
+ String transactionServiceGroup = "default_tx_group " ;
111
105
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient .getInstance (applicationId , transactionServiceGroup );
112
-
113
106
tmNettyRemotingClient .init ();
114
- String serverAddress = "0 .0.0.0:8091" ;
107
+ String serverAddress = "127 .0.0.1:" + dynamicPort ;
115
108
Channel channel = TmNettyRemotingClient .getInstance ().getClientChannelManager ().acquireChannel (serverAddress );
116
109
Assertions .assertNotNull (channel );
110
+
111
+ // Clean up configuration
112
+ ConfigurationTestHelper .removeConfig ("service.default.grouplist" );
113
+ ConfigurationTestHelper .removeConfig (ConfigurationKeys .SERVER_SERVICE_PORT_CAMEL );
114
+
117
115
nettyRemotingServer .destroy ();
118
116
tmNettyRemotingClient .destroy ();
119
117
}
@@ -126,13 +124,15 @@ public void testDoConnect() throws Exception {
126
124
@ Test
127
125
public void testReconnect () throws Exception {
128
126
ThreadPoolExecutor workingThreads = initMessageExecutor ();
129
- NettyRemotingServer nettyRemotingServer = new NettyRemotingServer (workingThreads );
127
+ NettyServerConfig serverConfig = new NettyServerConfig ();
128
+ serverConfig .setServerListenPort (dynamicPort );
129
+ NettyRemotingServer nettyRemotingServer = new NettyRemotingServer (workingThreads , serverConfig );
130
130
//start services server first
131
131
Thread thread = new Thread (() -> {
132
132
nettyRemotingServer .setHandler (DefaultCoordinator .getInstance (nettyRemotingServer ));
133
133
// set registry
134
134
XID .setIpAddress (NetUtil .getLocalIp ());
135
- XID .setPort (8091 );
135
+ XID .setPort (dynamicPort );
136
136
// init snowflake for transactionId, branchId
137
137
UUIDGenerator .init (1L );
138
138
nettyRemotingServer .init ();
@@ -142,52 +142,76 @@ public void testReconnect() throws Exception {
142
142
//then test client
143
143
Thread .sleep (3000 );
144
144
145
+ // Configure client to use dynamic port
146
+ ConfigurationTestHelper .putConfig ("service.default.grouplist" , "127.0.0.1:" + dynamicPort );
147
+ ConfigurationTestHelper .putConfig (ConfigurationKeys .SERVER_SERVICE_PORT_CAMEL , String .valueOf (dynamicPort ));
148
+
145
149
String applicationId = "app 1" ;
146
150
String transactionServiceGroup = "default_tx_group" ;
147
151
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient .getInstance (applicationId , transactionServiceGroup );
148
152
149
153
tmNettyRemotingClient .init ();
150
154
151
155
TmNettyRemotingClient .getInstance ().getClientChannelManager ().reconnect (transactionServiceGroup );
156
+
157
+ // Clean up configuration
158
+ ConfigurationTestHelper .removeConfig ("service.default.grouplist" );
159
+ ConfigurationTestHelper .removeConfig (ConfigurationKeys .SERVER_SERVICE_PORT_CAMEL );
160
+
152
161
nettyRemotingServer .destroy ();
153
162
tmNettyRemotingClient .destroy ();
154
163
}
155
164
156
165
@ Test
157
166
public void testSendMsgWithResponse () throws Exception {
158
167
ThreadPoolExecutor workingThreads = initMessageExecutor ();
159
- NettyRemotingServer nettyRemotingServer = new NettyRemotingServer (workingThreads );
168
+ NettyServerConfig serverConfig = new NettyServerConfig ();
169
+ serverConfig .setServerListenPort (dynamicPort );
170
+ NettyRemotingServer nettyRemotingServer = new NettyRemotingServer (workingThreads , serverConfig );
160
171
new Thread (() -> {
161
172
SessionHolder .init (null );
162
173
nettyRemotingServer .setHandler (DefaultCoordinator .getInstance (nettyRemotingServer ));
163
174
// set registry
164
175
XID .setIpAddress (NetUtil .getLocalIp ());
165
- XID .setPort (8091 );
176
+ XID .setPort (dynamicPort );
166
177
// init snowflake for transactionId, branchId
167
178
UUIDGenerator .init (1L );
168
179
nettyRemotingServer .init ();
169
180
}).start ();
170
181
Thread .sleep (3000 );
171
182
183
+ // Configure client to use dynamic port
184
+ ConfigurationTestHelper .putConfig ("service.default.grouplist" , "127.0.0.1:" + dynamicPort );
185
+ ConfigurationTestHelper .putConfig (ConfigurationKeys .SERVER_SERVICE_PORT_CAMEL , String .valueOf (dynamicPort ));
186
+
172
187
String applicationId = "app 1" ;
173
188
String transactionServiceGroup = "default_tx_group" ;
174
189
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient .getInstance (applicationId , transactionServiceGroup );
175
190
tmNettyRemotingClient .init ();
176
191
177
- String serverAddress = "0 .0.0.0:8091" ;
192
+ String serverAddress = "127 .0.0.1:" + dynamicPort ;
178
193
Channel channel = TmNettyRemotingClient .getInstance ().getClientChannelManager ().acquireChannel (serverAddress );
179
194
Assertions .assertNotNull (channel );
180
195
GlobalCommitRequest request = new GlobalCommitRequest ();
181
- request .setXid ("127.0.0.1:8091 :1249853" );
196
+ request .setXid ("127.0.0.1:" + dynamicPort + " :1249853" );
182
197
GlobalCommitResponse globalCommitResponse = null ;
183
198
try {
184
- globalCommitResponse = (GlobalCommitResponse )tmNettyRemotingClient .sendSyncRequest (request );
199
+ globalCommitResponse = (GlobalCommitResponse ) tmNettyRemotingClient .sendSyncRequest (request );
185
200
} catch (TimeoutException e ) {
186
- throw new RuntimeException ( e );
201
+ e . printStackTrace ( );
187
202
}
188
203
Assertions .assertNotNull (globalCommitResponse );
189
- Assertions .assertEquals (GlobalStatus .Finished , globalCommitResponse .getGlobalStatus ());
204
+ Assertions .assertEquals (ResultCode .Failed , globalCommitResponse .getResultCode ());
205
+ Assertions .assertEquals ("TransactionException[Could not found global transaction xid = 127.0.0.1:" + dynamicPort + ":1249853, may be has finished.]" ,
206
+ globalCommitResponse .getMsg ());
207
+
208
+ // Clean up configuration
209
+ ConfigurationTestHelper .removeConfig ("service.default.grouplist" );
210
+ ConfigurationTestHelper .removeConfig (ConfigurationKeys .SERVER_SERVICE_PORT_CAMEL );
211
+
190
212
nettyRemotingServer .destroy ();
191
213
tmNettyRemotingClient .destroy ();
192
214
}
215
+
216
+
193
217
}
0 commit comments