diff --git a/README.md b/README.md index a9f6559..e2c8289 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ It enables you to set and query its data or use its PubSub topics to react to in * [Usage](#usage) * [Factory](#factory) * [createClient()](#createclient) + * [createLazyClient()](#createlazyclient) * [Client](#client) * [Commands](#commands) * [Promises](#promises) @@ -46,23 +47,22 @@ local Redis server and send some requests: $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); -$factory->createClient('localhost')->then(function (Client $client) use ($loop) { - $client->set('greeting', 'Hello world'); - $client->append('greeting', '!'); - - $client->get('greeting')->then(function ($greeting) { - // Hello world! - echo $greeting . PHP_EOL; - }); - - $client->incr('invocation')->then(function ($n) { - echo 'This is invocation #' . $n . PHP_EOL; - }); - - // end connection once all pending requests have been resolved - $client->end(); +$client = $factory->createLazyClient('localhost'); +$client->set('greeting', 'Hello world'); +$client->append('greeting', '!'); + +$client->get('greeting')->then(function ($greeting) { + // Hello world! + echo $greeting . PHP_EOL; +}); + +$client->incr('invocation')->then(function ($n) { + echo 'This is invocation #' . $n . PHP_EOL; }); +// end connection once all pending requests have been resolved +$client->end(); + $loop->run(); ``` @@ -100,7 +100,7 @@ $factory = new Factory($loop, $connector); #### createClient() -The `createClient($redisUri): PromiseInterface` method can be used to +The `createClient(string $redisUri): PromiseInterface` method can be used to create a new [`Client`](#client). It helps with establishing a plain TCP/IP or secure TLS connection to Redis @@ -195,6 +195,139 @@ authentication. You can explicitly pass a custom timeout value in seconds $factory->createClient('localhost?timeout=0.5'); ``` +#### createLazyClient() + +The `createLazyClient(string $redisUri): Client` method can be used to +create a new [`Client`](#client). + +It helps with establishing a plain TCP/IP or secure TLS connection to Redis +and optionally authenticating (AUTH) and selecting the right database (SELECT). + +```php +$client = $factory->createLazyClient('redis://localhost:6379'); + +$client->incr('hello'); +$client->end(); +``` + +This method immediately returns a "virtual" connection implementing the +[`Client`](#client) that can be used to interface with your Redis database. +Internally, it lazily creates the underlying database connection only on +demand once the first request is invoked on this instance and will queue +all outstanding requests until the underlying connection is ready. +Additionally, it will only keep this underlying connection in an "idle" state +for 60s by default and will automatically close the underlying connection when +it is no longer needed. + +From a consumer side this means that you can start sending commands to the +database right away while the underlying connection may still be +outstanding. Because creating this underlying connection may take some +time, it will enqueue all oustanding commands and will ensure that all +commands will be executed in correct order once the connection is ready. +In other words, this "virtual" connection behaves just like a "real" +connection as described in the `Client` interface and frees you from having +to deal with its async resolution. + +If the underlying database connection fails, it will reject all +outstanding commands and will return to the initial "idle" state. This +means that you can keep sending additional commands at a later time which +will again try to open a new underlying connection. Note that this may +require special care if you're using transactions (`MULTI`/`EXEC`) that are kept +open for longer than the idle period. + +While using PubSub channels (see `SUBSCRIBE` and `PSUBSCRIBE` commands), this client +will never reach an "idle" state and will keep pending forever (or until the +underlying database connection is lost). Additionally, if the underlying +database connection drops, it will automatically send the appropriate `unsubscribe` +and `punsubscribe` events for all currently active channel and pattern subscriptions. +This allows you to react to these events and restore your subscriptions by +creating a new underlying connection repeating the above commands again. + +Note that creating the underlying connection will be deferred until the +first request is invoked. Accordingly, any eventual connection issues +will be detected once this instance is first used. You can use the +`end()` method to ensure that the "virtual" connection will be soft-closed +and no further commands can be enqueued. Similarly, calling `end()` on +this instance when not currently connected will succeed immediately and +will not have to wait for an actual underlying connection. + +Depending on your particular use case, you may prefer this method or the +underlying `createClient()` which resolves with a promise. For many +simple use cases it may be easier to create a lazy connection. + +The `$redisUri` can be given in the +[standard](https://www.iana.org/assignments/uri-schemes/prov/redis) form +`[redis[s]://][:auth@]host[:port][/db]`. +You can omit the URI scheme and port if you're connecting to the default port 6379: + +```php +// both are equivalent due to defaults being applied +$factory->createLazyClient('localhost'); +$factory->createLazyClient('redis://localhost:6379'); +``` + +Redis supports password-based authentication (`AUTH` command). Note that Redis' +authentication mechanism does not employ a username, so you can pass the +password `h@llo` URL-encoded (percent-encoded) as part of the URI like this: + +```php +// all forms are equivalent +$factory->createLazyClient('redis://:h%40llo@localhost'); +$factory->createLazyClient('redis://ignored:h%40llo@localhost'); +$factory->createLazyClient('redis://localhost?password=h%40llo'); +``` + +You can optionally include a path that will be used to select (SELECT command) the right database: + +```php +// both forms are equivalent +$factory->createLazyClient('redis://localhost/2'); +$factory->createLazyClient('redis://localhost?db=2'); +``` + +You can use the [standard](https://www.iana.org/assignments/uri-schemes/prov/rediss) +`rediss://` URI scheme if you're using a secure TLS proxy in front of Redis: + +```php +$factory->createLazyClient('rediss://redis.example.com:6340'); +``` + +You can use the `redis+unix://` URI scheme if your Redis instance is listening +on a Unix domain socket (UDS) path: + +```php +$factory->createLazyClient('redis+unix:///tmp/redis.sock'); + +// the URI MAY contain `password` and `db` query parameters as seen above +$factory->createLazyClient('redis+unix:///tmp/redis.sock?password=secret&db=2'); + +// the URI MAY contain authentication details as userinfo as seen above +// should be used with care, also note that database can not be passed as path +$factory->createLazyClient('redis+unix://:secret@/tmp/redis.sock'); +``` + +This method respects PHP's `default_socket_timeout` setting (default 60s) +as a timeout for establishing the underlying connection and waiting for +successful authentication. You can explicitly pass a custom timeout value +in seconds (or use a negative number to not apply a timeout) like this: + +```php +$factory->createLazyClient('localhost?timeout=0.5'); +``` + +By default, this method will keep "idle" connection open for 60s and will +then end the underlying connection. The next request after an "idle" +connection ended will automatically create a new underlying connection. +This ensure you always get a "fresh" connection and as such should not be +confused with a "keepalive" or "heartbeat" mechanism, as this will not +actively try to probe the connection. You can explicitly pass a custom +idle timeout value in seconds (or use a negative number to not apply a +timeout) like this: + +```php +$factory->createLazyClient('localhost?idle=0.1'); +``` + ### Client The `Client` is responsible for exchanging messages with Redis diff --git a/composer.json b/composer.json index b2b549e..a2346c4 100644 --- a/composer.json +++ b/composer.json @@ -14,7 +14,7 @@ "php": ">=5.3", "clue/redis-protocol": "0.3.*", "evenement/evenement": "^3.0 || ^2.0 || ^1.0", - "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3", + "react/event-loop": "^1.0 || ^0.5", "react/promise": "^2.0 || ^1.1", "react/promise-timer": "^1.5", "react/socket": "^1.1" diff --git a/examples/incr.php b/examples/incr.php index 35c0684..f61a033 100644 --- a/examples/incr.php +++ b/examples/incr.php @@ -1,6 +1,5 @@ createClient('localhost')->then(function (Client $client) { - $client->incr('test'); +$client = $factory->createLazyClient('localhost'); +$client->incr('test'); - $client->get('test')->then(function ($result) { - var_dump($result); - }); - - $client->end(); +$client->get('test')->then(function ($result) { + var_dump($result); }); +$client->end(); + $loop->run(); diff --git a/examples/publish.php b/examples/publish.php index 2d6e8f1..e4fe494 100644 --- a/examples/publish.php +++ b/examples/publish.php @@ -1,6 +1,5 @@ createClient('localhost')->then(function (Client $client) use ($channel, $message) { - $client->publish($channel, $message)->then(function ($received) { - echo 'successfully published. Received by ' . $received . PHP_EOL; - }); - - $client->end(); +$client = $factory->createLazyClient('localhost'); +$client->publish($channel, $message)->then(function ($received) { + echo 'successfully published. Received by ' . $received . PHP_EOL; }); +$client->end(); + $loop->run(); diff --git a/examples/subscribe.php b/examples/subscribe.php index efae0a4..87695ff 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -1,6 +1,5 @@ createClient('localhost')->then(function (Client $client) use ($channel) { - $client->subscribe($channel)->then(function () { - echo 'Now subscribed to channel ' . PHP_EOL; - }); +$client = $factory->createLazyClient('localhost'); +$client->subscribe($channel)->then(function () { + echo 'Now subscribed to channel ' . PHP_EOL; +}, function (Exception $e) { + echo 'Unable to subscribe: ' . $e->getMessage() . PHP_EOL; +}); + +$client->on('message', function ($channel, $message) { + echo 'Message on ' . $channel . ': ' . $message . PHP_EOL; +}); + +// automatically re-subscribe to channel on connection issues +$client->on('unsubscribe', function ($channel) use ($client, $loop) { + echo 'Unsubscribed from ' . $channel . PHP_EOL; - $client->on('message', function ($channel, $message) { - echo 'Message on ' . $channel . ': ' . $message . PHP_EOL; + $loop->addPeriodicTimer(2.0, function ($timer) use ($client, $channel, $loop){ + $client->subscribe($channel)->then(function () use ($timer, $loop) { + echo 'Now subscribed again' . PHP_EOL; + $loop->cancelTimer($timer); + }, function (Exception $e) { + echo 'Unable to subscribe again: ' . $e->getMessage() . PHP_EOL; + }); }); }); diff --git a/src/Factory.php b/src/Factory.php index 6686411..d9491a5 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -39,10 +39,10 @@ public function __construct(LoopInterface $loop, ConnectorInterface $connector = } /** - * create redis client connected to address of given redis instance + * Create Redis client connected to address of given redis instance * * @param string $target Redis server URI to connect to - * @return \React\Promise\PromiseInterface resolves with Client or rejects with \Exception + * @return \React\Promise\PromiseInterface resolves with Client or rejects with \Exception */ public function createClient($target) { @@ -115,6 +115,17 @@ function ($error) use ($client) { }); } + /** + * Create Redis client connected to address of given redis instance + * + * @param string $target + * @return Client + */ + public function createLazyClient($target) + { + return new LazyClient($target, $this, $this->loop); + } + /** * @param string $target * @return array with keys authority, auth and db diff --git a/src/LazyClient.php b/src/LazyClient.php new file mode 100644 index 0000000..17f80c0 --- /dev/null +++ b/src/LazyClient.php @@ -0,0 +1,214 @@ +idlePeriod = (float)$args['idle']; + } + + $this->target = $target; + $this->factory = $factory; + $this->loop = $loop; + } + + private function client() + { + if ($this->promise !== null) { + return $this->promise; + } + + $self = $this; + $pending =& $this->promise; + $idleTimer=& $this->idleTimer; + $subscribed =& $this->subscribed; + $psubscribed =& $this->psubscribed; + $loop = $this->loop; + return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending, &$idleTimer, &$subscribed, &$psubscribed, $loop) { + // connection completed => remember only until closed + $client->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed, &$idleTimer, $loop) { + $pending = null; + + // foward unsubscribe/punsubscribe events when underlying connection closes + $n = count($subscribed); + foreach ($subscribed as $channel => $_) { + $self->emit('unsubscribe', array($channel, --$n)); + } + $n = count($psubscribed); + foreach ($psubscribed as $pattern => $_) { + $self->emit('punsubscribe', array($pattern, --$n)); + } + $subscribed = array(); + $psubscribed = array(); + + if ($idleTimer !== null) { + $loop->cancelTimer($idleTimer); + $idleTimer = null; + } + }); + + // keep track of all channels and patterns this connection is subscribed to + $client->on('subscribe', function ($channel) use (&$subscribed) { + $subscribed[$channel] = true; + }); + $client->on('psubscribe', function ($pattern) use (&$psubscribed) { + $psubscribed[$pattern] = true; + }); + $client->on('unsubscribe', function ($channel) use (&$subscribed) { + unset($subscribed[$channel]); + }); + $client->on('punsubscribe', function ($pattern) use (&$psubscribed) { + unset($psubscribed[$pattern]); + }); + + Util::forwardEvents( + $client, + $self, + array( + 'message', + 'subscribe', + 'unsubscribe', + 'pmessage', + 'psubscribe', + 'punsubscribe', + ) + ); + + return $client; + }, function (\Exception $e) use (&$pending) { + // connection failed => discard connection attempt + $pending = null; + + throw $e; + }); + } + + public function __call($name, $args) + { + if ($this->closed) { + return \React\Promise\reject(new \RuntimeException('Connection closed')); + } + + $that = $this; + return $this->client()->then(function (Client $client) use ($name, $args, $that) { + $that->awake(); + return \call_user_func_array(array($client, $name), $args)->then( + function ($result) use ($that) { + $that->idle(); + return $result; + }, + function ($error) use ($that) { + $that->idle(); + throw $error; + } + ); + }); + } + + public function end() + { + if ($this->promise === null) { + $this->close(); + } + + if ($this->closed) { + return; + } + + $that = $this; + return $this->client()->then(function (Client $client) use ($that) { + $client->on('close', function () use ($that) { + $that->close(); + }); + $client->end(); + }); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + // either close active connection or cancel pending connection attempt + if ($this->promise !== null) { + $this->promise->then(function (Client $client) { + $client->close(); + }); + $this->promise->cancel(); + $this->promise = null; + } + + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + + $this->emit('close'); + $this->removeAllListeners(); + } + + /** + * @internal + */ + public function awake() + { + ++$this->pending; + + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + } + + /** + * @internal + */ + public function idle() + { + --$this->pending; + + if ($this->pending < 1 && $this->idlePeriod >= 0 && !$this->subscribed && !$this->psubscribed) { + $idleTimer =& $this->idleTimer; + $promise =& $this->promise; + $idleTimer = $this->loop->addTimer($this->idlePeriod, function () use (&$idleTimer, &$promise) { + $promise->then(function (Client $client) { + $client->close(); + }); + $promise = null; + $idleTimer = null; + }); + } + } +} diff --git a/tests/FactoryLazyClientTest.php b/tests/FactoryLazyClientTest.php new file mode 100644 index 0000000..bd63c68 --- /dev/null +++ b/tests/FactoryLazyClientTest.php @@ -0,0 +1,148 @@ +loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $this->connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); + $this->factory = new Factory($this->loop, $this->connector); + } + + public function testWillConnectWithDefaultPort() + { + $this->connector->expects($this->never())->method('connect')->with('redis.example.com:6379')->willReturn(Promise\reject(new \RuntimeException())); + $this->factory->createLazyClient('redis.example.com'); + } + + public function testWillConnectToLocalhost() + { + $this->connector->expects($this->never())->method('connect')->with('localhost:1337')->willReturn(Promise\reject(new \RuntimeException())); + $this->factory->createLazyClient('localhost:1337'); + } + + public function testWillResolveIfConnectorResolves() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write'); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $client = $this->factory->createLazyClient('localhost'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } + + public function testWillWriteSelectCommandIfTargetContainsPath() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://127.0.0.1/demo'); + } + + public function testWillWriteSelectCommandIfTargetContainsDbQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$1\r\n4\r\n"); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://127.0.0.1?db=4'); + } + + public function testWillWriteAuthCommandIfRedisUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://hello:world@example.com'); + } + + public function testWillWriteAuthCommandIfRedisUriContainsEncodedUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://:h%40llo@example.com'); + } + + public function testWillWriteAuthCommandIfTargetContainsPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$6\r\nsecret\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://example.com?password=secret'); + } + + public function testWillWriteAuthCommandIfTargetContainsEncodedPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://example.com?password=h%40llo'); + } + + public function testWillWriteAuthCommandIfRedissUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('tls://example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('rediss://hello:world@example.com'); + } + + public function testWillWriteAuthCommandIfRedisUnixUriContainsPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix:///tmp/redis.sock?password=world'); + } + + public function testWillWriteAuthCommandIfRedisUnixUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix://hello:world@/tmp/redis.sock'); + } + + public function testWillWriteSelectCommandIfRedisUnixUriContainsDbQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix:///tmp/redis.sock?db=demo'); + } + + public function testWillRejectIfConnectorRejects() + { + $this->connector->expects($this->never())->method('connect')->with('127.0.0.1:2')->willReturn(Promise\reject(new \RuntimeException())); + $client = $this->factory->createLazyClient('redis://127.0.0.1:2'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } + + public function testWillRejectIfTargetIsInvalid() + { + $client = $this->factory->createLazyClient('http://invalid target'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } +} diff --git a/tests/FactoryTest.php b/tests/FactoryStreamingClientTest.php similarity index 99% rename from tests/FactoryTest.php rename to tests/FactoryStreamingClientTest.php index 883cbb6..89c5555 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryStreamingClientTest.php @@ -6,7 +6,7 @@ use React\Promise; use React\Promise\Deferred; -class FactoryTest extends TestCase +class FactoryStreamingClientTest extends TestCase { private $loop; private $connector; diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index d3a81f9..b8b722b 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -14,23 +14,22 @@ class FunctionalTest extends TestCase { private $loop; private $factory; - private $client; + private $uri; public function setUp() { - $uri = getenv('REDIS_URI'); - if ($uri === false) { + $this->uri = getenv('REDIS_URI'); + if ($this->uri === false) { $this->markTestSkipped('No REDIS_URI environment variable given'); } $this->loop = new StreamSelectLoop(); $this->factory = new Factory($this->loop); - $this->client = $this->createClient($uri); } public function testPing() { - $client = $this->client; + $client = $this->createClient($this->uri); $promise = $client->ping(); $this->assertInstanceOf('React\Promise\PromiseInterface', $promise); @@ -38,13 +37,47 @@ public function testPing() $ret = Block\await($promise, $this->loop); $this->assertEquals('PONG', $ret); + } + + public function testPingLazy() + { + $client = $this->factory->createLazyClient($this->uri); + + $promise = $client->ping(); + $this->assertInstanceOf('React\Promise\PromiseInterface', $promise); + + $ret = Block\await($promise, $this->loop); + + $this->assertEquals('PONG', $ret); + } + + /** + * @doesNotPerformAssertions + */ + public function testPingLazyWillNotBlockLoopWhenIdleTimeIsSmall() + { + $client = $this->factory->createLazyClient($this->uri . '?idle=0'); + + $client->ping(); - return $client; + $this->loop->run(); + } + + /** + * @doesNotPerformAssertions + */ + public function testLazyClientWithoutCommandsWillNotBlockLoop() + { + $client = $this->factory->createLazyClient($this->uri); + + $this->loop->run(); + + unset($client); } public function testMgetIsNotInterpretedAsSubMessage() { - $client = $this->client; + $client = $this->createClient($this->uri); $client->mset('message', 'message', 'channel', 'channel', 'payload', 'payload'); @@ -56,7 +89,7 @@ public function testMgetIsNotInterpretedAsSubMessage() public function testPipeline() { - $client = $this->client; + $client = $this->createClient($this->uri); $client->set('a', 1)->then($this->expectCallableOnceWith('OK')); $client->incr('a')->then($this->expectCallableOnceWith(2)); @@ -68,7 +101,8 @@ public function testPipeline() public function testInvalidCommand() { - $promise = $this->client->doesnotexist(1, 2, 3); + $client = $this->createClient($this->uri); + $promise = $client->doesnotexist(1, 2, 3); if (method_exists($this, 'expectException')) { $this->expectException('Exception'); @@ -80,15 +114,16 @@ public function testInvalidCommand() public function testMultiExecEmpty() { - $this->client->multi()->then($this->expectCallableOnceWith('OK')); - $promise = $this->client->exec()->then($this->expectCallableOnceWith(array())); + $client = $this->createClient($this->uri); + $client->multi()->then($this->expectCallableOnceWith('OK')); + $promise = $client->exec()->then($this->expectCallableOnceWith(array())); Block\await($promise, $this->loop); } public function testMultiExecQueuedExecHasValues() { - $client = $this->client; + $client = $this->createClient($this->uri); $client->multi()->then($this->expectCallableOnceWith('OK')); $client->set('b', 10)->then($this->expectCallableOnceWith('QUEUED')); @@ -102,8 +137,8 @@ public function testMultiExecQueuedExecHasValues() public function testPubSub() { - $consumer = $this->client; - $producer = $this->createClient(getenv('REDIS_URI')); + $consumer = $this->createClient($this->uri); + $producer = $this->createClient($this->uri); $channel = 'channel:test:' . mt_rand(); @@ -122,11 +157,24 @@ public function testPubSub() public function testClose() { - $this->client->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); + $client = $this->createClient($this->uri); + + $client->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); + + $client->close(); + + $client->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); + } + + public function testCloseLazy() + { + $client = $this->factory->createLazyClient($this->uri); + + $client->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); - $this->client->close(); + $client->close(); - $this->client->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); + $client->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); } public function testInvalidProtocol() diff --git a/tests/LazyClientTest.php b/tests/LazyClientTest.php new file mode 100644 index 0000000..0d374d8 --- /dev/null +++ b/tests/LazyClientTest.php @@ -0,0 +1,486 @@ +factory = $this->getMockBuilder('Clue\React\Redis\Factory')->disableOriginalConstructor()->getMock(); + $this->loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + + $this->client = new LazyClient('localhost', $this->factory, $this->loop); + } + + public function testPingWillCreateUnderlyingClientAndReturnPendingPromise() + { + $promise = new Promise(function () { }); + $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + + $this->loop->expects($this->never())->method('addTimer'); + + $promise = $this->client->ping(); + + $promise->then($this->expectCallableNever()); + } + + public function testPingTwiceWillCreateOnceUnderlyingClient() + { + $promise = new Promise(function () { }); + $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + + $this->client->ping(); + $this->client->ping(); + } + + public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleTimer() + { + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->loop->expects($this->once())->method('addTimer')->with(60.0, $this->anything()); + + $promise = $this->client->ping(); + $deferred->resolve($client); + + $promise->then($this->expectCallableOnceWith('PONG')); + } + + public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleTimerWithIdleTimeFromQueryParam() + { + $this->client = new LazyClient('localhost?idle=10', $this->factory, $this->loop); + + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->loop->expects($this->once())->method('addTimer')->with(10.0, $this->anything()); + + $promise = $this->client->ping(); + $deferred->resolve($client); + + $promise->then($this->expectCallableOnceWith('PONG')); + } + + public function testPingWillResolveWhenUnderlyingClientResolvesPingAndNotStartIdleTimerWhenIdleParamIsNegative() + { + $this->client = new LazyClient('localhost?idle=-1', $this->factory, $this->loop); + + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->loop->expects($this->never())->method('addTimer'); + + $promise = $this->client->ping(); + $deferred->resolve($client); + + $promise->then($this->expectCallableOnceWith('PONG')); + } + + public function testPingWillRejectWhenUnderlyingClientRejectsPingAndStartIdleTimer() + { + $error = new \RuntimeException(); + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\reject($error)); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->loop->expects($this->once())->method('addTimer'); + + $promise = $this->client->ping(); + $deferred->resolve($client); + + $promise->then(null, $this->expectCallableOnceWith($error)); + } + + public function testPingWillRejectAndNotEmitErrorOrCloseWhenFactoryRejectsUnderlyingClient() + { + $error = new \RuntimeException(); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->on('error', $this->expectCallableNever()); + $this->client->on('close', $this->expectCallableNever()); + + $promise = $this->client->ping(); + $deferred->reject($error); + + $promise->then(null, $this->expectCallableOnceWith($error)); + } + + public function testPingAfterPreviousFactoryRejectsUnderlyingClientWillCreateNewUnderlyingConnection() + { + $error = new \RuntimeException(); + + $deferred = new Deferred(); + $this->factory->expects($this->exactly(2))->method('createClient')->willReturnOnConsecutiveCalls( + $deferred->promise(), + new Promise(function () { }) + ); + + $this->client->ping(); + $deferred->reject($error); + + $this->client->ping(); + } + + public function testPingAfterPreviousUnderlyingClientAlreadyClosedWillCreateNewUnderlyingConnection() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + + $this->factory->expects($this->exactly(2))->method('createClient')->willReturnOnConsecutiveCalls( + \React\Promise\resolve($client), + new Promise(function () { }) + ); + + $this->client->ping(); + $client->emit('close'); + + $this->client->ping(); + } + + public function testPingAfterCloseWillRejectWithoutCreatingUnderlyingConnection() + { + $this->factory->expects($this->never())->method('createClient'); + + $this->client->close(); + $promise = $this->client->ping(); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testPingAfterPingWillNotStartIdleTimerWhenFirstPingResolves() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls( + $deferred->promise(), + new Promise(function () { }) + ); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $this->loop->expects($this->never())->method('addTimer'); + + $this->client->ping(); + $this->client->ping(); + $deferred->resolve(); + } + + public function testPingAfterPingWillStartAndCancelIdleTimerWhenSecondPingStartsAfterFirstResolves() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls( + $deferred->promise(), + new Promise(function () { }) + ); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $this->loop->expects($this->once())->method('addTimer')->willReturn($timer); + $this->loop->expects($this->once())->method('cancelTimer')->with($timer); + + $this->client->ping(); + $deferred->resolve(); + $this->client->ping(); + } + + public function testPingFollowedByIdleTimerWillCloseUnderlyingConnectionWithoutCloseEvent() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call', 'close'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); + $client->expects($this->once())->method('close')->willReturn(\React\Promise\resolve()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $timeout = null; + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $this->loop->expects($this->once())->method('addTimer')->with($this->anything(), $this->callback(function ($cb) use (&$timeout) { + $timeout = $cb; + return true; + }))->willReturn($timer); + + $this->client->on('close', $this->expectCallableNever()); + + $this->client->ping(); + + $this->assertNotNull($timeout); + $timeout(); + } + + public function testCloseWillEmitCloseEventWithoutCreatingUnderlyingClient() + { + $this->factory->expects($this->never())->method('createClient'); + + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + } + + public function testCloseTwiceWillEmitCloseEventOnce() + { + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + $this->client->close(); + } + + public function testCloseAfterPingWillCancelUnderlyingClientConnectionWhenStillPending() + { + $promise = new Promise(function () { }, $this->expectCallableOnce()); + $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + + $this->client->ping(); + $this->client->close(); + } + + public function testCloseAfterPingWillEmitCloseWithoutErrorWhenUnderlyingClientConnectionThrowsDueToCancellation() + { + $promise = new Promise(function () { }, function () { + throw new \RuntimeException('Discarded'); + }); + $this->factory->expects($this->once())->method('createClient')->willReturn($promise); + + $this->client->on('error', $this->expectCallableNever()); + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->ping(); + $this->client->close(); + } + + public function testCloseAfterPingWillCloseUnderlyingClientConnectionWhenAlreadyResolved() + { + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); + $client->expects($this->once())->method('close'); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + $this->client->close(); + } + + public function testCloseAfterPingWillCancelIdleTimerWhenPingIsAlreadyResolved() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call', 'close'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); + $client->expects($this->once())->method('close'); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $this->loop->expects($this->once())->method('addTimer')->willReturn($timer); + $this->loop->expects($this->once())->method('cancelTimer')->with($timer); + + $this->client->ping(); + $deferred->resolve(); + $this->client->close(); + } + + public function testEndWillCloseClientIfUnderlyingConnectionIsNotPending() + { + $this->client->on('close', $this->expectCallableOnce()); + $this->client->end(); + } + + public function testEndAfterPingWillEndUnderlyingClient() + { + $client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('end'); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + $this->client->end(); + } + + public function testEndAfterPingWillCloseClientWhenUnderlyingClientEmitsClose() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call', 'end'))->getMock(); + $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('end'); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + + $this->client->on('close', $this->expectCallableOnce()); + $this->client->end(); + + $client->emit('close'); + } + + public function testEmitsNoErrorEventWhenUnderlyingClientEmitsError() + { + $error = new \RuntimeException(); + + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + + $this->client->on('error', $this->expectCallableNever()); + $client->emit('error', array($error)); + } + + public function testEmitsNoCloseEventWhenUnderlyingClientEmitsClose() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->ping(); + $deferred->resolve($client); + + $this->client->on('close', $this->expectCallableNever()); + $client->emit('close'); + } + + public function testEmitsNoCloseEventButWillCancelIdleTimerWhenUnderlyingConnectionEmitsCloseAfterPingIsAlreadyResolved() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $this->loop->expects($this->once())->method('addTimer')->willReturn($timer); + $this->loop->expects($this->once())->method('cancelTimer')->with($timer); + + $this->client->on('close', $this->expectCallableNever()); + + $this->client->ping(); + $deferred->resolve(); + + $client->emit('close'); + } + + public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubChannel() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); + + $deferred = new Deferred(); + $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); + + $this->client->subscribe('foo'); + $deferred->resolve($client); + + $this->client->on('message', $this->expectCallableOnce()); + $client->emit('message', array('foo', 'bar')); + } + + public function testEmitsUnsubscribeAndPunsubscribeEventsWhenUnderlyingClientClosesWhileUsingPubSubChannel() + { + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->exactly(6))->method('__call')->willReturn(\React\Promise\resolve()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $this->client->subscribe('foo'); + $client->emit('subscribe', array('foo', 1)); + + $this->client->subscribe('bar'); + $client->emit('subscribe', array('bar', 2)); + + $this->client->unsubscribe('bar'); + $client->emit('unsubscribe', array('bar', 1)); + + $this->client->psubscribe('foo*'); + $client->emit('psubscribe', array('foo*', 1)); + + $this->client->psubscribe('bar*'); + $client->emit('psubscribe', array('bar*', 2)); + + $this->client->punsubscribe('bar*'); + $client->emit('punsubscribe', array('bar*', 1)); + + $this->client->on('unsubscribe', $this->expectCallableOnce()); + $this->client->on('punsubscribe', $this->expectCallableOnce()); + $client->emit('close'); + } + + public function testSubscribeWillResolveWhenUnderlyingClientResolvesSubscribeAndNotStartIdleTimerWithIdleDueToSubscription() + { + $deferred = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->once())->method('__call')->with('subscribe')->willReturn($deferred->promise()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $this->loop->expects($this->never())->method('addTimer'); + + $promise = $this->client->subscribe('foo'); + $client->emit('subscribe', array('foo', 1)); + $deferred->resolve(array('subscribe', 'foo', 1)); + + $promise->then($this->expectCallableOnceWith(array('subscribe', 'foo', 1))); + } + + public function testUnsubscribeAfterSubscribeWillResolveWhenUnderlyingClientResolvesUnsubscribeAndStartIdleTimerWhenSubscriptionStopped() + { + $deferredSubscribe = new Deferred(); + $deferredUnsubscribe = new Deferred(); + $client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock(); + $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls($deferredSubscribe->promise(), $deferredUnsubscribe->promise()); + + $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); + + $this->loop->expects($this->once())->method('addTimer'); + + $promise = $this->client->subscribe('foo'); + $client->emit('subscribe', array('foo', 1)); + $deferredSubscribe->resolve(array('subscribe', 'foo', 1)); + $promise->then($this->expectCallableOnceWith(array('subscribe', 'foo', 1))); + + $promise = $this->client->unsubscribe('foo'); + $client->emit('unsubscribe', array('foo', 0)); + $deferredUnsubscribe->resolve(array('unsubscribe', 'foo', 0)); + $promise->then($this->expectCallableOnceWith(array('unsubscribe', 'foo', 0))); + } +}