From 7f7e8815e24b507e24817d22697e97d4769b95bc Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sat, 9 Feb 2019 22:55:18 +0100 Subject: [PATCH 1/6] Renamed StreamingClientTest to FactoryStreamingClientTest This is in preparation of creating a FactoryLazyStreamingClientTest for the new LazyStreamingClient --- tests/{FactoryTest.php => FactoryStreamingClientTest.php} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename tests/{FactoryTest.php => FactoryStreamingClientTest.php} (99%) diff --git a/tests/FactoryTest.php b/tests/FactoryStreamingClientTest.php similarity index 99% rename from tests/FactoryTest.php rename to tests/FactoryStreamingClientTest.php index 5d89786..223c898 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryStreamingClientTest.php @@ -3,7 +3,7 @@ use Clue\React\Redis\Factory; use React\Promise; -class FactoryTest extends TestCase +class FactoryStreamingClientTest extends TestCase { private $loop; private $connector; From 478025f319d3ed95dda3621caf7b8aeb68bf95e7 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sun, 10 Feb 2019 21:32:30 +0100 Subject: [PATCH 2/6] LazyStreamingClient --- src/LazyStreamingClient.php | 122 +++++++++++++++++ tests/LazyStreamingClientTest.php | 216 ++++++++++++++++++++++++++++++ 2 files changed, 338 insertions(+) create mode 100644 src/LazyStreamingClient.php create mode 100644 tests/LazyStreamingClientTest.php diff --git a/src/LazyStreamingClient.php b/src/LazyStreamingClient.php new file mode 100644 index 0000000..03d061d --- /dev/null +++ b/src/LazyStreamingClient.php @@ -0,0 +1,122 @@ +target = $target; + $this->factory = $factory; + + $this->on('close', array($this, 'removeAllListeners')); + } + + private function client() + { + if ($this->promise instanceof PromiseInterface) { + return $this->promise; + } + + if ($this->client instanceof Client) { + return new FulfilledPromise($this->client()); + } + + $self = $this; + return $this->promise = $this->factory->createClient($this->target)->then(function (Client $client) use ($self) { + $self->client = $client; + $self->promise = null; + + Util::forwardEvents( + $self->client, + $self, + array( + 'error', + 'close', + 'message', + 'subscribe', + 'unsubscribe', + 'pmessage', + 'psubscribe', + 'punsubscribe', + ) + ); + + return $client; + }, function (\Exception $e) use ($self) { + // connection failed => emit error if connection is not already closed + if ($self->closed) { + return; + } + $self->emit('error', array($e)); + $self->close(); + + return $e; + }); + } + + public function __call($name, $args) + { + if ($this->client instanceof Client) { + return \call_user_func_array(array($this->client, $name), $args); + } + + return $this->client()->then(function (Client $client) use ($name, $args) { + return \call_user_func_array(array($client, $name), $args); + }); + } + + public function end() + { + if ($this->client instanceof Client) { + return $this->client->end(); + } + + return $this->client()->then(function (Client $client) { + return $client->end(); + }); + } + + public function close() + { + if ($this->client instanceof Client) { + return $this->client->close(); + } + + return $this->client()->then(function (Client $client) { + return $client->close(); + }); + } +} diff --git a/tests/LazyStreamingClientTest.php b/tests/LazyStreamingClientTest.php new file mode 100644 index 0000000..f706d99 --- /dev/null +++ b/tests/LazyStreamingClientTest.php @@ -0,0 +1,216 @@ +factory = $this->getMockBuilder('Clue\React\Redis\Factory')->setConstructorArgs(array(Factory::create()))->getMock(); + $this->stream = $this->getMockBuilder('React\Stream\DuplexStreamInterface')->getMock(); + $this->parser = $this->getMockBuilder('Clue\Redis\Protocol\Parser\ParserInterface')->getMock(); + $this->serializer = $this->getMockBuilder('Clue\Redis\Protocol\Serializer\SerializerInterface')->getMock(); + + $this->factory->expects($this->any())->method('createClient')->with('localhost')->will($this->returnValue(new FulfilledPromise(new StreamingClient($this->stream, $this->parser, $this->serializer)))); + + $this->client = new LazyStreamingClient('localhost', $this->factory); + } + + public function testSending() + { + $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message')); + $this->stream->expects($this->once())->method('write')->with($this->equalTo('message')); + + $this->client->ping(); + } + + public function testClosingClientEmitsEvent() + { + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + } + + public function testClosingStreamClosesClient() + { + $this->stream = new ThroughStream(); + $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); + + $this->client->on('close', $this->expectCallableOnce()); + + $this->stream->emit('close'); + } + + public function testReceiveParseErrorEmitsErrorEvent() + { + $this->stream = new ThroughStream(); + $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); + + $this->client->on('error', $this->expectCallableOnce()); + $this->client->on('close', $this->expectCallableOnce()); + + $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException())); + $this->stream->emit('data', array('message')); + } + + public function testReceiveThrowMessageEmitsErrorEvent() + { + $this->stream = new ThroughStream(); + $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); + + $this->client->on('error', $this->expectCallableOnce()); + + $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2)))); + $this->stream->emit('data', array('message')); + } + + public function testDefaultCtor() + { + $client = new LazyStreamingClient('localhost', $this->factory); + } + + public function testPingPong() + { + $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping')); + + $promise = $this->client->ping(); + + $this->client->handleMessage(new BulkReply('PONG')); + + $this->expectPromiseResolve($promise); + $promise->then($this->expectCallableOnce('PONG')); + } + + public function testMonitorCommandIsNotSupported() + { + $promise = $this->client->monitor(); + + $this->expectPromiseReject($promise); + } + + + public function testErrorReply() + { + $promise = $this->client->invalid(); + + $err = new ErrorReply("ERR unknown command 'invalid'"); + $this->client->handleMessage($err); + + $this->expectPromiseReject($promise); + $promise->then(null, $this->expectCallableOnce($err)); + } + + public function testClosingClientRejectsAllRemainingRequests() + { + $promise = $this->client->ping(); + $this->client->close(); + + $this->expectPromiseReject($promise); + } + + public function testClosedClientRejectsAllNewRequests() + { + $this->client->close(); + $promise = $this->client->ping(); + + $this->expectPromiseReject($promise); + } + + public function testEndingNonBusyClosesClient() + { + $this->client->on('close', $this->expectCallableOnce()); + $this->client->end(); + } + + public function testEndingBusyClosesClientWhenNotBusyAnymore() + { + // count how often the "close" method has been called + $closed = 0; + $this->client->on('close', function() use (&$closed) { + ++$closed; + }); + + $promise = $this->client->ping(); + $this->assertEquals(0, $closed); + + $this->client->end(); + $this->assertEquals(0, $closed); + + $this->client->handleMessage(new BulkReply('PONG')); + $promise->then($this->expectCallableOnce('PONG')); + $this->assertEquals(1, $closed); + } + + public function testClosingMultipleTimesEmitsOnce() + { + $this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + $this->client->close(); + } + + public function testReceivingUnexpectedMessageThrowsException() + { + $this->setExpectedException('UnderflowException'); + $this->client->handleMessage(new BulkReply('PONG'))->done(); + } + + public function testPubsubSubscribe() + { + $promise = $this->client->subscribe('test'); + $this->expectPromiseResolve($promise); + + $this->client->on('subscribe', $this->expectCallableOnce()); + $this->client->handleMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('test'), new IntegerReply(1)))); + + return $this->client; + } + + /** + * @depends testPubsubSubscribe + * @param Client $client + */ + public function testPubsubPatternSubscribe(Client $client) + { + $promise = $client->psubscribe('demo_*'); + $this->expectPromiseResolve($promise); + + $client->on('psubscribe', $this->expectCallableOnce()); + $client->handleMessage(new MultiBulkReply(array(new BulkReply('psubscribe'), new BulkReply('demo_*'), new IntegerReply(1)))); + + return $client; + } + + /** + * @depends testPubsubPatternSubscribe + * @param Client $client + */ + public function testPubsubMessage(Client $client) + { + $client->on('message', $this->expectCallableOnce()); + $client->handleMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('test'), new BulkReply('payload')))); + } + + public function testPubsubSubscribeSingleOnly() + { + $this->expectPromiseReject($this->client->subscribe('a', 'b')); + $this->expectPromiseReject($this->client->unsubscribe('a', 'b')); + $this->expectPromiseReject($this->client->unsubscribe()); + } +} From bdbc87be84398990eb2376849be77ee858c26a5a Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sun, 10 Feb 2019 21:34:06 +0100 Subject: [PATCH 3/6] Factory::createLazyClient --- src/Factory.php | 5 + tests/FactoryLazyStreamingClientTest.php | 151 +++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 tests/FactoryLazyStreamingClientTest.php diff --git a/src/Factory.php b/src/Factory.php index ac80a61..a3db05e 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -36,6 +36,11 @@ public function __construct(LoopInterface $loop, ConnectorInterface $connector = $this->protocol = $protocol; } + public function createLazyClient($target) + { + return new LazyStreamingClient($target, $this); + } + /** * create redis client connected to address of given redis instance * diff --git a/tests/FactoryLazyStreamingClientTest.php b/tests/FactoryLazyStreamingClientTest.php new file mode 100644 index 0000000..05778a9 --- /dev/null +++ b/tests/FactoryLazyStreamingClientTest.php @@ -0,0 +1,151 @@ +loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $this->connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); + $this->factory = new Factory($this->loop, $this->connector); + } + + public function testCtor() + { + $this->factory = new Factory($this->loop); + } + + public function testWillConnectWithDefaultPort() + { + $this->connector->expects($this->never())->method('connect')->with('redis.example.com:6379')->willReturn(Promise\reject(new \RuntimeException())); + $this->factory->createLazyClient('redis.example.com'); + } + + public function testWillConnectToLocalhost() + { + $this->connector->expects($this->never())->method('connect')->with('localhost:1337')->willReturn(Promise\reject(new \RuntimeException())); + $this->factory->createLazyClient('localhost:1337'); + } + + public function testWillResolveIfConnectorResolves() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write'); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $client = $this->factory->createLazyClient('localhost'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } + + public function testWillWriteSelectCommandIfTargetContainsPath() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://127.0.0.1/demo'); + } + + public function testWillWriteSelectCommandIfTargetContainsDbQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$1\r\n4\r\n"); + + $this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://127.0.0.1?db=4'); + } + + public function testWillWriteAuthCommandIfRedisUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://hello:world@example.com'); + } + + public function testWillWriteAuthCommandIfRedisUriContainsEncodedUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://:h%40llo@example.com'); + } + + public function testWillWriteAuthCommandIfTargetContainsPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$6\r\nsecret\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://example.com?password=secret'); + } + + public function testWillWriteAuthCommandIfTargetContainsEncodedPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis://example.com?password=h%40llo'); + } + + public function testWillWriteAuthCommandIfRedissUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('tls://example.com:6379')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('rediss://hello:world@example.com'); + } + + public function testWillWriteAuthCommandIfRedisUnixUriContainsPasswordQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix:///tmp/redis.sock?password=world'); + } + + public function testWillWriteAuthCommandIfRedisUnixUriContainsUserInfo() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix://hello:world@/tmp/redis.sock'); + } + + public function testWillWriteSelectCommandIfRedisUnixUriContainsDbQueryParameter() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); + + $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createLazyClient('redis+unix:///tmp/redis.sock?db=demo'); + } + + public function testWillRejectIfConnectorRejects() + { + $this->connector->expects($this->never())->method('connect')->with('127.0.0.1:2')->willReturn(Promise\reject(new \RuntimeException())); + $client = $this->factory->createLazyClient('redis://127.0.0.1:2'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } + + public function testWillRejectIfTargetIsInvalid() + { + $client = $this->factory->createLazyClient('http://invalid target'); + + $this->assertInstanceOf('Clue\React\Redis\Client', $client); + } +} From 5ad069f6447c671dfe5f083fcae5146536b4268b Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sun, 10 Feb 2019 21:34:30 +0100 Subject: [PATCH 4/6] Updated examples to use lazy client --- examples/cli.php | 9 ++++++--- examples/incr.php | 14 +++++++------- examples/publish.php | 12 ++++++------ examples/subscribe.php | 14 +++++++------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/examples/cli.php b/examples/cli.php index 6da8c25..d11b2e7 100644 --- a/examples/cli.php +++ b/examples/cli.php @@ -11,7 +11,10 @@ echo '# connecting to redis...' . PHP_EOL; -$factory->createClient('localhost')->then(function (Client $client) use ($loop) { +/** @var Client $client */ +$client = $factory->createLazyClient('localhost'); + +try { echo '# connected! Entering interactive mode, hit CTRL-D to quit' . PHP_EOL; $loop->addReadStream(STDIN, function () use ($client, $loop) { @@ -48,10 +51,10 @@ $loop->removeReadStream(STDIN); }); -}, function (Exception $error) { +} catch (Exception $error) { echo 'CONNECTION ERROR: ' . $error->getMessage() . PHP_EOL; exit(1); -}); +}; $loop->run(); diff --git a/examples/incr.php b/examples/incr.php index 35c0684..7667b2c 100644 --- a/examples/incr.php +++ b/examples/incr.php @@ -8,14 +8,14 @@ $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); -$factory->createClient('localhost')->then(function (Client $client) { - $client->incr('test'); +/** @var Client $client */ +$client = $factory->createLazyClient('localhost'); +$client->incr('test'); - $client->get('test')->then(function ($result) { - var_dump($result); - }); - - $client->end(); +$client->get('test')->then(function ($result) { + var_dump($result); }); +$client->end(); + $loop->run(); diff --git a/examples/publish.php b/examples/publish.php index 2d6e8f1..b0eef97 100644 --- a/examples/publish.php +++ b/examples/publish.php @@ -11,12 +11,12 @@ $channel = isset($argv[1]) ? $argv[1] : 'channel'; $message = isset($argv[2]) ? $argv[2] : 'message'; -$factory->createClient('localhost')->then(function (Client $client) use ($channel, $message) { - $client->publish($channel, $message)->then(function ($received) { - echo 'successfully published. Received by ' . $received . PHP_EOL; - }); - - $client->end(); +/** @var Client $client */ +$client = $factory->createLazyClient('localhost'); +$client->publish($channel, $message)->then(function ($received) { + echo 'successfully published. Received by ' . $received . PHP_EOL; }); +$client->end(); + $loop->run(); diff --git a/examples/subscribe.php b/examples/subscribe.php index efae0a4..8ddbeb8 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -10,14 +10,14 @@ $channel = isset($argv[1]) ? $argv[1] : 'channel'; -$factory->createClient('localhost')->then(function (Client $client) use ($channel) { - $client->subscribe($channel)->then(function () { - echo 'Now subscribed to channel ' . PHP_EOL; - }); +/** @var Client $client */ +$client = $factory->createLazyClient('localhost'); +$client->subscribe($channel)->then(function () { + echo 'Now subscribed to channel ' . PHP_EOL; +}); - $client->on('message', function ($channel, $message) { - echo 'Message on ' . $channel . ': ' . $message . PHP_EOL; - }); +$client->on('message', function ($channel, $message) { + echo 'Message on ' . $channel . ': ' . $message . PHP_EOL; }); $loop->run(); From b0422ccf77f0e8b7787a29d7f8473d70b2cebd65 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sun, 10 Feb 2019 21:50:05 +0100 Subject: [PATCH 5/6] PHP 5.3 -.- --- tests/LazyStreamingClientTest.php | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/LazyStreamingClientTest.php b/tests/LazyStreamingClientTest.php index f706d99..178e434 100644 --- a/tests/LazyStreamingClientTest.php +++ b/tests/LazyStreamingClientTest.php @@ -167,8 +167,13 @@ public function testClosingMultipleTimesEmitsOnce() public function testReceivingUnexpectedMessageThrowsException() { - $this->setExpectedException('UnderflowException'); - $this->client->handleMessage(new BulkReply('PONG'))->done(); + $that = $this; + $this->client->handleMessage(new BulkReply('PONG'))->then(function($value) use ($that) { + $that->assertNull($value); + $that->fail('promise resolved'); + }, function($value) use ($that) { + $that->assertInstanceOf('UnderflowException'); + }); } public function testPubsubSubscribe() From bc99b2d11f025f3cd1b815aa08bda4d6e3cb9292 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sun, 10 Feb 2019 22:17:37 +0100 Subject: [PATCH 6/6] Added note about Factory::createLazyClient --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 5136784..733f628 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ It enables you to set and query its data or use its PubSub topics to react to in * [Usage](#usage) * [Factory](#factory) * [createClient()](#createclient) + * [createLazyClient()](#createlazyclient) * [Client](#client) * [Commands](#commands) * [Promises](#promises) @@ -166,6 +167,12 @@ $factory->createClient('redis+unix:///tmp/redis.sock?password=secret&db=2'); $factory->createClient('redis+unix://:secret@/tmp/redis.sock'); ``` +#### createLazyClient() + +The `createLazyClient($redisUri)` method can be used to create a new [`Client`](#client) which lazily +creates and connects to the configured redis server on the first command. Internally it will use `createClient()` +when the first command comes in, queues all commands while connecting, and pass on all commands directly when connected. + ### Client The `Client` is responsible for exchanging messages with Redis