diff --git a/src/StreamSelectLoop.php b/src/StreamSelectLoop.php index 7d455048..cbe841f6 100644 --- a/src/StreamSelectLoop.php +++ b/src/StreamSelectLoop.php @@ -218,7 +218,12 @@ private function waitForStreamActivity($timeout) $read = $this->readStreams; $write = $this->writeStreams; - $this->streamSelect($read, $write, $timeout); + $available = $this->streamSelect($read, $write, $timeout); + if (false === $available) { + // if a system call has been interrupted, + // we cannot rely on it's outcome + return; + } foreach ($read as $stream) { $key = (int) $stream; @@ -245,14 +250,16 @@ private function waitForStreamActivity($timeout) * @param array &$write An array of write streams to select upon. * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever. * - * @return integer The total number of streams that are ready for read/write. + * @return integer|false The total number of streams that are ready for read/write. + * Can return false if stream_select() is interrupted by a signal. */ protected function streamSelect(array &$read, array &$write, $timeout) { if ($read || $write) { $except = null; - return stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout); + // suppress warnings that occur, when stream_select is interrupted by a signal + return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout); } usleep($timeout); diff --git a/tests/AbstractLoopTest.php b/tests/AbstractLoopTest.php index b4095379..17163e8b 100644 --- a/tests/AbstractLoopTest.php +++ b/tests/AbstractLoopTest.php @@ -4,6 +4,9 @@ abstract class AbstractLoopTest extends TestCase { + /** + * @var \React\EventLoop\LoopInterface + */ protected $loop; public function setUp() diff --git a/tests/StreamSelectLoopTest.php b/tests/StreamSelectLoopTest.php index 55d3d165..61b059c1 100644 --- a/tests/StreamSelectLoopTest.php +++ b/tests/StreamSelectLoopTest.php @@ -2,10 +2,19 @@ namespace React\Tests\EventLoop; +use React\EventLoop\LoopInterface; use React\EventLoop\StreamSelectLoop; class StreamSelectLoopTest extends AbstractLoopTest { + protected function tearDown() + { + parent::tearDown(); + if (strncmp($this->getName(false), 'testSignal', 10) === 0 && extension_loaded('pcntl')) { + $this->resetSignalHandlers(); + } + } + public function createLoop() { return new StreamSelectLoop(); @@ -27,4 +36,113 @@ public function testStreamSelectTimeoutEmulation() $this->assertGreaterThan(0.04, $interval); } + + public function signalProvider() + { + return [ + ['SIGUSR1', SIGUSR1], + ['SIGHUP', SIGHUP], + ['SIGTERM', SIGTERM], + ]; + } + + private $_signalHandled = false; + + /** + * Test signal interrupt when no stream is attached to the loop + * @dataProvider signalProvider + */ + public function testSignalInterruptNoStream($sigName, $signal) + { + if (!extension_loaded('pcntl')) { + $this->markTestSkipped('"pcntl" extension is required to run this test.'); + } + + // dispatch signal handler once before signal is sent and once after + $this->loop->addTimer(0.01, function() { pcntl_signal_dispatch(); }); + $this->loop->addTimer(0.03, function() { pcntl_signal_dispatch(); }); + if (defined('HHVM_VERSION')) { + // hhvm startup is slow so we need to add another handler much later + $this->loop->addTimer(0.5, function() { pcntl_signal_dispatch(); }); + } + + $this->setUpSignalHandler($signal); + + // spawn external process to send signal to current process id + $this->forkSendSignal($signal); + $this->loop->run(); + $this->assertTrue($this->_signalHandled); + } + + /** + * Test signal interrupt when a stream is attached to the loop + * @dataProvider signalProvider + */ + public function testSignalInterruptWithStream($sigName, $signal) + { + if (!extension_loaded('pcntl')) { + $this->markTestSkipped('"pcntl" extension is required to run this test.'); + } + + // dispatch signal handler every 10ms + $this->loop->addPeriodicTimer(0.01, function() { pcntl_signal_dispatch(); }); + + // add stream to the loop + list($writeStream, $readStream) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + $this->loop->addReadStream($readStream, function($stream, $loop) { + /** @var $loop LoopInterface */ + $read = fgets($stream); + if ($read === "end loop\n") { + $loop->stop(); + } + }); + $this->loop->addTimer(0.05, function() use ($writeStream) { + fwrite($writeStream, "end loop\n"); + }); + + $this->setUpSignalHandler($signal); + + // spawn external process to send signal to current process id + $this->forkSendSignal($signal); + + $this->loop->run(); + + $this->assertTrue($this->_signalHandled); + } + + /** + * add signal handler for signal + */ + protected function setUpSignalHandler($signal) + { + $this->_signalHandled = false; + $this->assertTrue(pcntl_signal($signal, function() { $this->_signalHandled = true; })); + } + + /** + * reset all signal handlers to default + */ + protected function resetSignalHandlers() + { + foreach($this->signalProvider() as $signal) { + pcntl_signal($signal[1], SIG_DFL); + } + } + + /** + * fork child process to send signal to current process id + */ + protected function forkSendSignal($signal) + { + $currentPid = posix_getpid(); + $childPid = pcntl_fork(); + if ($childPid == -1) { + $this->fail("Failed to fork child process!"); + } else if ($childPid === 0) { + // this is executed in the child process + usleep(20000); + posix_kill($currentPid, $signal); + die(); + } + } }