16
16
use Workerman \Worker ;
17
17
use Swoole \Event ;
18
18
use Swoole \Timer ;
19
+ use Swoole \Coroutine ;
19
20
20
21
class Swoole implements EventInterface
21
22
{
@@ -33,6 +34,10 @@ class Swoole implements EventInterface
33
34
34
35
protected $ _hasSignal = false ;
35
36
37
+ protected $ _readEvents = array ();
38
+
39
+ protected $ _writeEvents = array ();
40
+
36
41
/**
37
42
*
38
43
* {@inheritdoc}
@@ -61,7 +66,7 @@ function () {
61
66
$ mapId = $ this ->mapId ++;
62
67
$ t = (int )($ fd * 1000 );
63
68
if ($ t < 1 ) {
64
- $ t = 1 ;
69
+ $ t = 1 ;
65
70
}
66
71
$ timer_id = Timer::$ method ($ t ,
67
72
function ($ timer_id = null ) use ($ func , $ args , $ mapId ) {
@@ -92,9 +97,14 @@ function ($timer_id = null) use ($func, $args, $mapId) {
92
97
case self ::EV_READ :
93
98
case self ::EV_WRITE :
94
99
$ fd_key = (int ) $ fd ;
95
- if (! isset ($ this ->_fd [$ fd_key ])) {
100
+ if ($ flag === self ::EV_READ ) {
101
+ $ this ->_readEvents [$ fd_key ] = $ func ;
102
+ } else {
103
+ $ this ->_writeEvents [$ fd_key ] = $ func ;
104
+ }
105
+ if (!isset ($ this ->_fd [$ fd_key ])) {
96
106
if ($ flag === self ::EV_READ ) {
97
- $ res = Event::add ($ fd , $ func , null , SWOOLE_EVENT_READ );
107
+ $ res = Event::add ($ fd , [ $ this , ' callRead ' ] , null , SWOOLE_EVENT_READ );
98
108
$ fd_type = SWOOLE_EVENT_READ ;
99
109
} else {
100
110
$ res = Event::add ($ fd , null , $ func , SWOOLE_EVENT_WRITE );
@@ -124,6 +134,42 @@ function ($timer_id = null) use ($func, $args, $mapId) {
124
134
}
125
135
}
126
136
137
+ /**
138
+ * @param $fd
139
+ * @return void
140
+ */
141
+ protected function callRead ($ stream )
142
+ {
143
+ $ fd = (int ) $ stream ;
144
+ if (isset ($ this ->_readEvents [$ fd ])) {
145
+ try {
146
+ \call_user_func ($ this ->_readEvents [$ fd ], $ stream );
147
+ } catch (\Exception $ e ) {
148
+ Worker::stopAll (250 , $ e );
149
+ } catch (\Error $ e ) {
150
+ Worker::stopAll (250 , $ e );
151
+ }
152
+ }
153
+ }
154
+
155
+ /**
156
+ * @param $fd
157
+ * @return void
158
+ */
159
+ protected function callWrite ($ stream )
160
+ {
161
+ $ fd = (int ) $ stream ;
162
+ if (isset ($ this ->_writeEvents [$ fd ])) {
163
+ try {
164
+ \call_user_func ($ this ->_writeEvents [$ fd ], $ stream );
165
+ } catch (\Exception $ e ) {
166
+ Worker::stopAll (250 , $ e );
167
+ } catch (\Error $ e ) {
168
+ Worker::stopAll (250 , $ e );
169
+ }
170
+ }
171
+ }
172
+
127
173
/**
128
174
*
129
175
* {@inheritdoc}
@@ -153,6 +199,11 @@ public function del($fd, $flag)
153
199
case self ::EV_READ :
154
200
case self ::EV_WRITE :
155
201
$ fd_key = (int ) $ fd ;
202
+ if ($ flag === self ::EV_READ ) {
203
+ unset($ this ->_readEvents [$ fd_key ]);
204
+ } elseif ($ flag === self ::EV_WRITE ) {
205
+ unset($ this ->_writeEvents [$ fd_key ]);
206
+ }
156
207
if (isset ($ this ->_fd [$ fd_key ])) {
157
208
$ fd_val = $ this ->_fd [$ fd_key ];
158
209
if ($ flag === self ::EV_READ ) {
@@ -213,8 +264,10 @@ public function loop()
213
264
*/
214
265
public function destroy ()
215
266
{
267
+ foreach (Coroutine::listCoroutines () as $ coroutine ) {
268
+ Coroutine::cancel ($ coroutine );
269
+ }
216
270
Event::exit ();
217
- posix_kill (posix_getpid (), SIGINT );
218
271
}
219
272
220
273
/**
0 commit comments