diff --git a/README.md b/README.md index 4a76831..7bbcfed 100644 --- a/README.md +++ b/README.md @@ -42,10 +42,8 @@ It enables you to set and query its data or use its PubSub topics to react to in * [Promises](#promises) * [PubSub](#pubsub) * [API](#api) - * [Factory](#factory) - * [createClient()](#createclient) - * [createLazyClient()](#createlazyclient) - * [Client](#client) + * [RedisClient](#redisclient) + * [__construct()](#__construct) * [__call()](#__call) * [end()](#end) * [close()](#close) @@ -75,8 +73,7 @@ local Redis server and send some requests: require __DIR__ . '/vendor/autoload.php'; -$factory = new Clue\React\Redis\Factory(); -$redis = $factory->createLazyClient('localhost:6379'); +$redis = new Clue\React\Redis\RedisClient('localhost:6379'); $redis->set('greeting', 'Hello world'); $redis->append('greeting', '!'); @@ -100,10 +97,12 @@ See also the [examples](examples). ### Commands -Most importantly, this project provides a [`Client`](#client) instance that +Most importantly, this project provides a [`RedisClient`](#redisclient) instance that can be used to invoke all [Redis commands](https://redis.io/commands) (such as `GET`, `SET`, etc.). ```php +$redis = new Clue\React\Redis\RedisClient('localhost:6379'); + $redis->get($key); $redis->set($key, $value); $redis->exists($key); @@ -262,161 +261,28 @@ $redis->on('punsubscribe', function (string $pattern, int $total) { }); ``` -When using the [`createLazyClient()`](#createlazyclient) method, the `unsubscribe` -and `punsubscribe` events will be invoked automatically when the underlying -connection is lost. This gives you control over re-subscribing to the channels -and patterns as appropriate. +When the underlying connection is lost, the `unsubscribe` and `punsubscribe` events +will be invoked automatically. This gives you control over re-subscribing to the +channels and patterns as appropriate. ## API -### Factory - -The `Factory` is responsible for creating your [`Client`](#client) instance. - -```php -$factory = new Clue\React\Redis\Factory(); -``` - -This class takes an optional `LoopInterface|null $loop` parameter that can be used to -pass the event loop instance to use for this object. You can use a `null` value -here in order to use the [default loop](https://github.com/reactphp/event-loop#loop). -This value SHOULD NOT be given unless you're sure you want to explicitly use a -given event loop instance. - -If you need custom connector settings (DNS resolution, TLS parameters, timeouts, -proxy servers etc.), you can explicitly pass a custom instance of the -[`ConnectorInterface`](https://github.com/reactphp/socket#connectorinterface): - -```php -$connector = new React\Socket\Connector([ - 'dns' => '127.0.0.1', - 'tcp' => [ - 'bindto' => '192.168.10.1:0' - ], - 'tls' => [ - 'verify_peer' => false, - 'verify_peer_name' => false - ] -]); - -$factory = new Clue\React\Redis\Factory(null, $connector); -``` - -#### createClient() - -The `createClient(string $uri): PromiseInterface` method can be used to -create a new [`Client`](#client). +### RedisClient -It helps with establishing a plain TCP/IP or secure TLS connection to Redis -and optionally authenticating (AUTH) and selecting the right database (SELECT). - -```php -$factory->createClient('localhost:6379')->then( - function (Client $redis) { - // client connected (and authenticated) - }, - function (Exception $e) { - // an error occurred while trying to connect (or authenticate) client - } -); -``` - -The method returns a [Promise](https://github.com/reactphp/promise) that -will resolve with a [`Client`](#client) -instance on success or will reject with an `Exception` if the URL is -invalid or the connection or authentication fails. - -The returned Promise is implemented in such a way that it can be -cancelled when it is still pending. Cancelling a pending promise will -reject its value with an Exception and will cancel the underlying TCP/IP -connection attempt and/or Redis authentication. - -```php -$promise = $factory->createClient($uri); - -Loop::addTimer(3.0, function () use ($promise) { - $promise->cancel(); -}); -``` - -The `$redisUri` can be given in the -[standard](https://www.iana.org/assignments/uri-schemes/prov/redis) form -`[redis[s]://][:auth@]host[:port][/db]`. -You can omit the URI scheme and port if you're connecting to the default port 6379: - -```php -// both are equivalent due to defaults being applied -$factory->createClient('localhost'); -$factory->createClient('redis://localhost:6379'); -``` - -Redis supports password-based authentication (`AUTH` command). Note that Redis' -authentication mechanism does not employ a username, so you can pass the -password `h@llo` URL-encoded (percent-encoded) as part of the URI like this: - -```php -// all forms are equivalent -$factory->createClient('redis://:h%40llo@localhost'); -$factory->createClient('redis://ignored:h%40llo@localhost'); -$factory->createClient('redis://localhost?password=h%40llo'); -``` - -You can optionally include a path that will be used to select (SELECT command) the right database: - -```php -// both forms are equivalent -$factory->createClient('redis://localhost/2'); -$factory->createClient('redis://localhost?db=2'); -``` - -You can use the [standard](https://www.iana.org/assignments/uri-schemes/prov/rediss) -`rediss://` URI scheme if you're using a secure TLS proxy in front of Redis: - -```php -$factory->createClient('rediss://redis.example.com:6340'); -``` - -You can use the `redis+unix://` URI scheme if your Redis instance is listening -on a Unix domain socket (UDS) path: - -```php -$factory->createClient('redis+unix:///tmp/redis.sock'); - -// the URI MAY contain `password` and `db` query parameters as seen above -$factory->createClient('redis+unix:///tmp/redis.sock?password=secret&db=2'); - -// the URI MAY contain authentication details as userinfo as seen above -// should be used with care, also note that database can not be passed as path -$factory->createClient('redis+unix://:secret@/tmp/redis.sock'); -``` - -This method respects PHP's `default_socket_timeout` setting (default 60s) -as a timeout for establishing the connection and waiting for successful -authentication. You can explicitly pass a custom timeout value in seconds -(or use a negative number to not apply a timeout) like this: - -```php -$factory->createClient('localhost?timeout=0.5'); -``` - -#### createLazyClient() - -The `createLazyClient(string $uri): Client` method can be used to -create a new [`Client`](#client). - -It helps with establishing a plain TCP/IP or secure TLS connection to Redis -and optionally authenticating (AUTH) and selecting the right database (SELECT). +The `RedisClient` is responsible for exchanging messages with your Redis server +and keeps track of pending commands. ```php -$redis = $factory->createLazyClient('localhost:6379'); +$redis = new Clue\React\Redis\RedisClient('localhost:6379'); $redis->incr('hello'); $redis->end(); ``` -This method immediately returns a "virtual" connection implementing the -[`Client`](#client) that can be used to interface with your Redis database. -Internally, it lazily creates the underlying database connection only on +Besides defining a few methods, this interface also implements the +`EventEmitterInterface` which allows you to react to certain events as documented below. + +Internally, this class creates the underlying database connection only on demand once the first request is invoked on this instance and will queue all outstanding requests until the underlying connection is ready. Additionally, it will only keep this underlying connection in an "idle" state @@ -428,9 +294,6 @@ database right away while the underlying connection may still be outstanding. Because creating this underlying connection may take some time, it will enqueue all oustanding commands and will ensure that all commands will be executed in correct order once the connection is ready. -In other words, this "virtual" connection behaves just like a "real" -connection as described in the `Client` interface and frees you from having -to deal with its async resolution. If the underlying database connection fails, it will reject all outstanding commands and will return to the initial "idle" state. This @@ -450,24 +313,25 @@ creating a new underlying connection repeating the above commands again. Note that creating the underlying connection will be deferred until the first request is invoked. Accordingly, any eventual connection issues will be detected once this instance is first used. You can use the -`end()` method to ensure that the "virtual" connection will be soft-closed +`end()` method to ensure that the connection will be soft-closed and no further commands can be enqueued. Similarly, calling `end()` on this instance when not currently connected will succeed immediately and will not have to wait for an actual underlying connection. -Depending on your particular use case, you may prefer this method or the -underlying `createClient()` which resolves with a promise. For many -simple use cases it may be easier to create a lazy connection. +#### __construct() + +The `new RedisClient(string $url, ConnectorInterface $connector = null, LoopInterface $loop = null)` constructor can be used to +create a new `RedisClient` instance. -The `$redisUri` can be given in the +The `$url` can be given in the [standard](https://www.iana.org/assignments/uri-schemes/prov/redis) form `[redis[s]://][:auth@]host[:port][/db]`. You can omit the URI scheme and port if you're connecting to the default port 6379: ```php // both are equivalent due to defaults being applied -$factory->createLazyClient('localhost'); -$factory->createLazyClient('redis://localhost:6379'); +$redis = new Clue\React\Redis\RedisClient('localhost'); +$redis = new Clue\React\Redis\RedisClient('redis://localhost:6379'); ``` Redis supports password-based authentication (`AUTH` command). Note that Redis' @@ -476,38 +340,38 @@ password `h@llo` URL-encoded (percent-encoded) as part of the URI like this: ```php // all forms are equivalent -$factory->createLazyClient('redis://:h%40llo@localhost'); -$factory->createLazyClient('redis://ignored:h%40llo@localhost'); -$factory->createLazyClient('redis://localhost?password=h%40llo'); +$redis = new Clue\React\Redis\RedisClient('redis://:h%40llo@localhost'); +$redis = new Clue\React\Redis\RedisClient('redis://ignored:h%40llo@localhost'); +$redis = new Clue\React\Redis\RedisClient('redis://localhost?password=h%40llo'); ``` You can optionally include a path that will be used to select (SELECT command) the right database: ```php // both forms are equivalent -$factory->createLazyClient('redis://localhost/2'); -$factory->createLazyClient('redis://localhost?db=2'); +$redis = new Clue\React\Redis\RedisClient('redis://localhost/2'); +$redis = new Clue\React\Redis\RedisClient('redis://localhost?db=2'); ``` You can use the [standard](https://www.iana.org/assignments/uri-schemes/prov/rediss) `rediss://` URI scheme if you're using a secure TLS proxy in front of Redis: ```php -$factory->createLazyClient('rediss://redis.example.com:6340'); +$redis = new Clue\React\Redis\RedisClient('rediss://redis.example.com:6340'); ``` You can use the `redis+unix://` URI scheme if your Redis instance is listening on a Unix domain socket (UDS) path: ```php -$factory->createLazyClient('redis+unix:///tmp/redis.sock'); +$redis = new Clue\React\Redis\RedisClient('redis+unix:///tmp/redis.sock'); // the URI MAY contain `password` and `db` query parameters as seen above -$factory->createLazyClient('redis+unix:///tmp/redis.sock?password=secret&db=2'); +$redis = new Clue\React\Redis\RedisClient('redis+unix:///tmp/redis.sock?password=secret&db=2'); // the URI MAY contain authentication details as userinfo as seen above // should be used with care, also note that database can not be passed as path -$factory->createLazyClient('redis+unix://:secret@/tmp/redis.sock'); +$redis = new Clue\React\Redis\RedisClient('redis+unix://:secret@/tmp/redis.sock'); ``` This method respects PHP's `default_socket_timeout` setting (default 60s) @@ -516,7 +380,7 @@ successful authentication. You can explicitly pass a custom timeout value in seconds (or use a negative number to not apply a timeout) like this: ```php -$factory->createLazyClient('localhost?timeout=0.5'); +$redis = new Clue\React\Redis\RedisClient('localhost?timeout=0.5'); ``` By default, this method will keep "idle" connections open for 60s and will @@ -529,16 +393,32 @@ idle timeout value in seconds (or use a negative number to not apply a timeout) like this: ```php -$factory->createLazyClient('localhost?idle=0.1'); +$redis = new Clue\React\Redis\RedisClient('localhost?idle=0.1'); ``` -### Client +If you need custom DNS, proxy or TLS settings, you can explicitly pass a +custom instance of the [`ConnectorInterface`](https://github.com/reactphp/socket#connectorinterface): -The `Client` is responsible for exchanging messages with Redis -and keeps track of pending commands. +```php +$connector = new React\Socket\Connector([ + 'dns' => '127.0.0.1', + 'tcp' => [ + 'bindto' => '192.168.10.1:0' + ], + 'tls' => [ + 'verify_peer' => false, + 'verify_peer_name' => false + ] +]); -Besides defining a few methods, this interface also implements the -`EventEmitterInterface` which allows you to react to certain events as documented below. +$redis = new Clue\React\Redis\RedisClient('localhost', $connector); +``` + +This class takes an optional `LoopInterface|null $loop` parameter that can be used to +pass the event loop instance to use for this object. You can use a `null` value +here in order to use the [default loop](https://github.com/reactphp/event-loop#loop). +This value SHOULD NOT be given unless you're sure you want to explicitly use a +given event loop instance. #### __call() diff --git a/examples/cli.php b/examples/cli.php index d0b41f8..0eb8b65 100644 --- a/examples/cli.php +++ b/examples/cli.php @@ -7,48 +7,42 @@ require __DIR__ . '/../vendor/autoload.php'; -$factory = new Clue\React\Redis\Factory(); - -echo '# connecting to redis...' . PHP_EOL; - -$factory->createClient(getenv('REDIS_URI') ?: 'localhost:6379')->then(function (Clue\React\Redis\Client $redis) { - echo '# connected! Entering interactive mode, hit CTRL-D to quit' . PHP_EOL; - - Loop::addReadStream(STDIN, function () use ($redis) { - $line = fgets(STDIN); - if ($line === false || $line === '') { - echo '# CTRL-D -> Ending connection...' . PHP_EOL; - Loop::removeReadStream(STDIN); - return $redis->end(); - } - - $line = rtrim($line); - if ($line === '') { - return; - } - - $params = explode(' ', $line); - $method = array_shift($params); - $promise = call_user_func_array([$redis, $method], $params); - - // special method such as end() / close() called - if (!$promise instanceof React\Promise\PromiseInterface) { - return; - } - - $promise->then(function ($data) { - echo '# reply: ' . json_encode($data) . PHP_EOL; - }, function (Exception $e) { - echo '# error reply: ' . $e->getMessage() . PHP_EOL; - }); - }); - - $redis->on('close', function() { - echo '## DISCONNECTED' . PHP_EOL; +$redis = new Clue\React\Redis\RedisClient(getenv('REDIS_URI') ?: 'localhost:6379'); +Loop::addReadStream(STDIN, function () use ($redis) { + $line = fgets(STDIN); + if ($line === false || $line === '') { + echo '# CTRL-D -> Ending connection...' . PHP_EOL; Loop::removeReadStream(STDIN); + $redis->end(); + return; + } + + $line = rtrim($line); + if ($line === '') { + return; + } + + $params = explode(' ', $line); + $method = array_shift($params); + $promise = call_user_func_array([$redis, $method], $params); + + // special method such as end() / close() called + if (!$promise instanceof React\Promise\PromiseInterface) { + return; + } + + $promise->then(function ($data) { + echo '# reply: ' . json_encode($data) . PHP_EOL; + }, function ($e) { + echo '# error reply: ' . $e->getMessage() . PHP_EOL; }); -}, function (Exception $e) { - echo 'Error: ' . $e->getMessage() . PHP_EOL; - exit(1); }); + +$redis->on('close', function() { + echo '## DISCONNECTED' . PHP_EOL; + + Loop::removeReadStream(STDIN); +}); + +echo '# Entering interactive mode ready, hit CTRL-D to quit' . PHP_EOL; diff --git a/examples/incr.php b/examples/incr.php index 1d28524..6744da5 100644 --- a/examples/incr.php +++ b/examples/incr.php @@ -5,8 +5,7 @@ require __DIR__ . '/../vendor/autoload.php'; -$factory = new Clue\React\Redis\Factory(); -$redis = $factory->createLazyClient(getenv('REDIS_URI') ?: 'localhost:6379'); +$redis = new Clue\React\Redis\RedisClient(getenv('REDIS_URI') ?: 'localhost:6379'); $redis->incr('test'); @@ -17,4 +16,4 @@ exit(1); }); -//$redis->end(); +$redis->end(); diff --git a/examples/publish.php b/examples/publish.php index 90e6a48..06bcd9b 100644 --- a/examples/publish.php +++ b/examples/publish.php @@ -5,12 +5,11 @@ require __DIR__ . '/../vendor/autoload.php'; -$factory = new Clue\React\Redis\Factory(); -$redis = $factory->createLazyClient(getenv('REDIS_URI') ?: 'localhost:6379'); - $channel = $argv[1] ?? 'channel'; $message = $argv[2] ?? 'message'; +$redis = new Clue\React\Redis\RedisClient(getenv('REDIS_URI') ?: 'localhost:6379'); + $redis->publish($channel, $message)->then(function (int $received) { echo 'Successfully published. Received by ' . $received . PHP_EOL; }, function (Exception $e) { diff --git a/examples/subscribe.php b/examples/subscribe.php index 1270b7c..673b565 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -7,11 +7,10 @@ require __DIR__ . '/../vendor/autoload.php'; -$factory = new Clue\React\Redis\Factory(); -$redis = $factory->createLazyClient(getenv('REDIS_URI') ?: 'localhost:6379'); - $channel = $argv[1] ?? 'channel'; +$redis = new Clue\React\Redis\RedisClient(getenv('REDIS_URI') ?: 'localhost:6379'); + $redis->subscribe($channel)->then(function () { echo 'Now subscribed to channel ' . PHP_EOL; }, function (Exception $e) use ($redis) { diff --git a/src/Client.php b/src/Client.php deleted file mode 100644 index 714ac88..0000000 --- a/src/Client.php +++ /dev/null @@ -1,54 +0,0 @@ - - */ - public function __call(string $name, array $args): PromiseInterface; - - /** - * end connection once all pending requests have been replied to - * - * @return void - * @uses self::close() once all replies have been received - * @see self::close() for closing the connection immediately - */ - public function end(): void; - - /** - * close connection immediately - * - * This will emit the "close" event. - * - * @return void - * @see self::end() for closing the connection once the client is idle - */ - public function close(): void; -} diff --git a/src/Factory.php b/src/Io/Factory.php similarity index 93% rename from src/Factory.php rename to src/Io/Factory.php index 3b77c92..0f88825 100644 --- a/src/Factory.php +++ b/src/Io/Factory.php @@ -1,6 +1,6 @@ Promise that will - * be fulfilled with `Client` on success or rejects with `\Exception` on error. + * @return PromiseInterface Promise that will + * be fulfilled with `StreamingClient` on success or rejects with `\Exception` on error. */ public function createClient(string $uri): PromiseInterface { @@ -178,15 +181,4 @@ function (\Exception $e) use ($redis, $uri) { throw $e; }); } - - /** - * Create Redis client connected to address of given redis instance - * - * @param string $target - * @return Client - */ - public function createLazyClient($target): Client - { - return new LazyClient($target, $this, $this->loop); - } } diff --git a/src/StreamingClient.php b/src/Io/StreamingClient.php similarity index 98% rename from src/StreamingClient.php rename to src/Io/StreamingClient.php index 91467d6..126bff5 100644 --- a/src/StreamingClient.php +++ b/src/Io/StreamingClient.php @@ -1,6 +1,6 @@ */ private $psubscribed = []; - public function __construct(string $target, Factory $factory, LoopInterface $loop) + /** + * @param string $url + * @param ?ConnectorInterface $connector + * @param ?LoopInterface $loop + */ + public function __construct($url, ConnectorInterface $connector = null, LoopInterface $loop = null) { $args = []; - \parse_str((string) \parse_url($target, \PHP_URL_QUERY), $args); + \parse_str((string) \parse_url($url, \PHP_URL_QUERY), $args); if (isset($args['idle'])) { $this->idlePeriod = (float)$args['idle']; } - $this->target = $target; - $this->factory = $factory; - $this->loop = $loop; + $this->target = $url; + $this->loop = $loop ?: Loop::get(); + $this->factory = new Factory($this->loop, $connector); } private function client(): PromiseInterface @@ -63,7 +82,7 @@ private function client(): PromiseInterface return $this->promise; } - return $this->promise = $this->factory->createClient($this->target)->then(function (Client $redis) { + return $this->promise = $this->factory->createClient($this->target)->then(function (StreamingClient $redis) { // connection completed => remember only until closed $redis->on('close', function () { $this->promise = null; @@ -121,6 +140,16 @@ private function client(): PromiseInterface }); } + /** + * Invoke the given command and return a Promise that will be resolved when the request has been replied to + * + * This is a magic method that will be invoked when calling any redis + * command on this instance. + * + * @param string $name + * @param string[] $args + * @return PromiseInterface Promise + */ public function __call(string $name, array $args): PromiseInterface { if ($this->closed) { @@ -130,7 +159,7 @@ public function __call(string $name, array $args): PromiseInterface )); } - return $this->client()->then(function (Client $redis) use ($name, $args) { + return $this->client()->then(function (StreamingClient $redis) use ($name, $args) { $this->awake(); return \call_user_func_array([$redis, $name], $args)->then( function ($result) { @@ -145,6 +174,13 @@ function (\Exception $error) { }); } + /** + * end connection once all pending requests have been replied to + * + * @return void + * @uses self::close() once all replies have been received + * @see self::close() for closing the connection immediately + */ public function end(): void { if ($this->promise === null) { @@ -155,7 +191,7 @@ public function end(): void return; } - $this->client()->then(function (Client $redis) { + $this->client()->then(function (StreamingClient $redis) { $redis->on('close', function () { $this->close(); }); @@ -163,6 +199,14 @@ public function end(): void }); } + /** + * close connection immediately + * + * This will emit the "close" event. + * + * @return void + * @see self::end() for closing the connection once the client is idle + */ public function close(): void { if ($this->closed) { @@ -173,7 +217,7 @@ public function close(): void // either close active connection or cancel pending connection attempt if ($this->promise !== null) { - $this->promise->then(function (Client $redis) { + $this->promise->then(function (StreamingClient $redis) { $redis->close(); }); if ($this->promise !== null) { @@ -207,7 +251,7 @@ private function idle(): void if ($this->pending < 1 && $this->idlePeriod >= 0 && !$this->subscribed && !$this->psubscribed && $this->promise !== null) { $this->idleTimer = $this->loop->addTimer($this->idlePeriod, function () { - $this->promise->then(function (Client $redis) { + $this->promise->then(function (StreamingClient $redis) { $redis->close(); }); $this->promise = null; diff --git a/tests/FactoryLazyClientTest.php b/tests/FactoryLazyClientTest.php deleted file mode 100644 index 5b8ef60..0000000 --- a/tests/FactoryLazyClientTest.php +++ /dev/null @@ -1,170 +0,0 @@ -loop = $this->createMock(LoopInterface::class); - $this->connector = $this->createMock(ConnectorInterface::class); - $this->factory = new Factory($this->loop, $this->connector); - } - - public function testConstructWithoutLoopAssignsLoopAutomatically() - { - $factory = new Factory(); - - $ref = new \ReflectionProperty($factory, 'loop'); - $ref->setAccessible(true); - $loop = $ref->getValue($factory); - - $this->assertInstanceOf(LoopInterface::class, $loop); - } - - public function testWillConnectWithDefaultPort() - { - $this->connector->expects($this->never())->method('connect')->with('redis.example.com:6379')->willReturn(reject(new \RuntimeException())); - $this->factory->createLazyClient('redis.example.com'); - } - - public function testWillConnectToLocalhost() - { - $this->connector->expects($this->never())->method('connect')->with('localhost:1337')->willReturn(reject(new \RuntimeException())); - $this->factory->createLazyClient('localhost:1337'); - } - - public function testWillResolveIfConnectorResolves() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write'); - - $this->connector->expects($this->never())->method('connect')->willReturn(resolve($stream)); - $redis = $this->factory->createLazyClient('localhost'); - - $this->assertInstanceOf(Client::class, $redis); - } - - public function testWillWriteSelectCommandIfTargetContainsPath() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); - - $this->connector->expects($this->never())->method('connect')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis://127.0.0.1/demo'); - } - - public function testWillWriteSelectCommandIfTargetContainsDbQueryParameter() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$1\r\n4\r\n"); - - $this->connector->expects($this->never())->method('connect')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis://127.0.0.1?db=4'); - } - - public function testWillWriteAuthCommandIfRedisUriContainsUserInfo() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); - - $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis://hello:world@example.com'); - } - - public function testWillWriteAuthCommandIfRedisUriContainsEncodedUserInfo() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n"); - - $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis://:h%40llo@example.com'); - } - - public function testWillWriteAuthCommandIfTargetContainsPasswordQueryParameter() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$6\r\nsecret\r\n"); - - $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis://example.com?password=secret'); - } - - public function testWillWriteAuthCommandIfTargetContainsEncodedPasswordQueryParameter() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n"); - - $this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis://example.com?password=h%40llo'); - } - - public function testWillWriteAuthCommandIfRedissUriContainsUserInfo() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); - - $this->connector->expects($this->never())->method('connect')->with('tls://example.com:6379')->willReturn(resolve($stream)); - $this->factory->createLazyClient('rediss://hello:world@example.com'); - } - - public function testWillWriteAuthCommandIfRedisUnixUriContainsPasswordQueryParameter() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); - - $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis+unix:///tmp/redis.sock?password=world'); - } - - public function testWillWriteAuthCommandIfRedisUnixUriContainsUserInfo() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); - - $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis+unix://hello:world@/tmp/redis.sock'); - } - - public function testWillWriteSelectCommandIfRedisUnixUriContainsDbQueryParameter() - { - $stream = $this->createMock(ConnectionInterface::class); - $stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); - - $this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(resolve($stream)); - $this->factory->createLazyClient('redis+unix:///tmp/redis.sock?db=demo'); - } - - public function testWillRejectIfConnectorRejects() - { - $this->connector->expects($this->never())->method('connect')->with('127.0.0.1:2')->willReturn(reject(new \RuntimeException())); - $redis = $this->factory->createLazyClient('redis://127.0.0.1:2'); - - $this->assertInstanceOf(Client::class, $redis); - } - - public function testWillRejectIfTargetIsInvalid() - { - $redis = $this->factory->createLazyClient('http://invalid target'); - - $this->assertInstanceOf(Client::class, $redis); - } -} diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index 22586c2..c81cd85 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -2,13 +2,10 @@ namespace Clue\Tests\React\Redis; -use Clue\React\Redis\Client; -use Clue\React\Redis\Factory; -use Clue\React\Redis\StreamingClient; +use Clue\React\Redis\RedisClient; use React\EventLoop\StreamSelectLoop; use React\Promise\Deferred; use React\Promise\PromiseInterface; -use React\Stream\DuplexResourceStream; use function Clue\React\Block\await; class FunctionalTest extends TestCase @@ -16,9 +13,6 @@ class FunctionalTest extends TestCase /** @var StreamSelectLoop */ private $loop; - /** @var Factory */ - private $factory; - /** @var string */ private $uri; @@ -30,12 +24,11 @@ public function setUp(): void } $this->loop = new StreamSelectLoop(); - $this->factory = new Factory($this->loop); } public function testPing() { - $redis = $this->createClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $promise = $redis->ping(); $this->assertInstanceOf(PromiseInterface::class, $promise); @@ -47,7 +40,7 @@ public function testPing() public function testPingLazy() { - $redis = $this->factory->createLazyClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $promise = $redis->ping(); $this->assertInstanceOf(PromiseInterface::class, $promise); @@ -62,7 +55,7 @@ public function testPingLazy() */ public function testPingLazyWillNotBlockLoopWhenIdleTimeIsSmall() { - $redis = $this->factory->createLazyClient($this->uri . '?idle=0'); + $redis = new RedisClient($this->uri . '?idle=0', null, $this->loop); $redis->ping(); @@ -74,7 +67,7 @@ public function testPingLazyWillNotBlockLoopWhenIdleTimeIsSmall() */ public function testLazyClientWithoutCommandsWillNotBlockLoop() { - $redis = $this->factory->createLazyClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $this->loop->run(); @@ -83,7 +76,7 @@ public function testLazyClientWithoutCommandsWillNotBlockLoop() public function testMgetIsNotInterpretedAsSubMessage() { - $redis = $this->createClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $redis->mset('message', 'message', 'channel', 'channel', 'payload', 'payload'); @@ -95,7 +88,7 @@ public function testMgetIsNotInterpretedAsSubMessage() public function testPipeline() { - $redis = $this->createClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $redis->set('a', 1)->then($this->expectCallableOnceWith('OK')); $redis->incr('a')->then($this->expectCallableOnceWith(2)); @@ -107,7 +100,7 @@ public function testPipeline() public function testInvalidCommand() { - $redis = $this->createClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $promise = $redis->doesnotexist(1, 2, 3); if (method_exists($this, 'expectException')) { @@ -120,7 +113,7 @@ public function testInvalidCommand() public function testMultiExecEmpty() { - $redis = $this->createClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $redis->multi()->then($this->expectCallableOnceWith('OK')); $promise = $redis->exec()->then($this->expectCallableOnceWith([])); @@ -129,7 +122,7 @@ public function testMultiExecEmpty() public function testMultiExecQueuedExecHasValues() { - $redis = $this->createClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $redis->multi()->then($this->expectCallableOnceWith('OK')); $redis->set('b', 10)->then($this->expectCallableOnceWith('QUEUED')); @@ -143,8 +136,8 @@ public function testMultiExecQueuedExecHasValues() public function testPubSub() { - $consumer = $this->createClient($this->uri); - $producer = $this->createClient($this->uri); + $consumer = new RedisClient($this->uri, null, $this->loop); + $producer = new RedisClient($this->uri, null, $this->loop); $channel = 'channel:test:' . mt_rand(); @@ -164,7 +157,7 @@ public function testPubSub() public function testClose() { - $redis = $this->createClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $redis->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); @@ -175,7 +168,7 @@ public function testClose() public function testCloseLazy() { - $redis = $this->factory->createLazyClient($this->uri); + $redis = new RedisClient($this->uri, null, $this->loop); $redis->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); @@ -183,49 +176,4 @@ public function testCloseLazy() $redis->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); } - - public function testInvalidProtocol() - { - $redis = $this->createClientResponse("communication does not conform to protocol\r\n"); - - $redis->on('error', $this->expectCallableOnce()); - $redis->on('close', $this->expectCallableOnce()); - - $promise = $redis->get('willBeRejectedDueToClosing'); - - $this->expectException(\Exception::class); - await($promise, $this->loop); - } - - public function testInvalidServerRepliesWithDuplicateMessages() - { - $redis = $this->createClientResponse("+OK\r\n-ERR invalid\r\n"); - - $redis->on('error', $this->expectCallableOnce()); - $redis->on('close', $this->expectCallableOnce()); - - $promise = $redis->set('a', 0)->then($this->expectCallableOnceWith('OK')); - - await($promise, $this->loop); - } - - /** - * @param string $uri - * @return Client - */ - protected function createClient($uri) - { - return await($this->factory->createClient($uri), $this->loop); - } - - protected function createClientResponse($response) - { - $fp = fopen('php://temp', 'r+'); - fwrite($fp, $response); - fseek($fp, 0); - - $stream = new DuplexResourceStream($fp, $this->loop); - - return new StreamingClient($stream); - } } diff --git a/tests/FactoryStreamingClientTest.php b/tests/Io/FactoryStreamingClientTest.php similarity index 99% rename from tests/FactoryStreamingClientTest.php rename to tests/Io/FactoryStreamingClientTest.php index a490cb3..9941b07 100644 --- a/tests/FactoryStreamingClientTest.php +++ b/tests/Io/FactoryStreamingClientTest.php @@ -1,9 +1,10 @@ assertTrue(is_callable($dataHandler)); $dataHandler("+OK\r\n"); - $promise->then($this->expectCallableOnceWith($this->isInstanceOf(Client::class))); + $promise->then($this->expectCallableOnceWith($this->isInstanceOf(StreamingClient::class))); } public function testWillRejectAndCloseAutomaticallyWhenAuthCommandReceivesErrorResponseIfRedisUriContainsUserInfo() @@ -286,7 +287,7 @@ public function testWillResolveWhenSelectCommandReceivesOkResponseIfRedisUriCont $this->assertTrue(is_callable($dataHandler)); $dataHandler("+OK\r\n"); - $promise->then($this->expectCallableOnceWith($this->isInstanceOf(Client::class))); + $promise->then($this->expectCallableOnceWith($this->isInstanceOf(StreamingClient::class))); } public function testWillRejectAndCloseAutomaticallyWhenSelectCommandReceivesErrorResponseIfRedisUriContainsPath() diff --git a/tests/StreamingClientTest.php b/tests/Io/StreamingClientTest.php similarity index 97% rename from tests/StreamingClientTest.php rename to tests/Io/StreamingClientTest.php index bed0ebf..b0274f8 100644 --- a/tests/StreamingClientTest.php +++ b/tests/Io/StreamingClientTest.php @@ -1,7 +1,8 @@ psubscribe('demo_*'); $this->expectPromiseResolve($promise); @@ -321,9 +321,9 @@ public function testPubsubPatternSubscribe(Client $client) /** * @depends testPubsubPatternSubscribe - * @param Client $client + * @param StreamingClient $client */ - public function testPubsubMessage(Client $client) + public function testPubsubMessage(StreamingClient $client) { $client->on('message', $this->expectCallableOnce()); $client->handleMessage(new MultiBulkReply([new BulkReply('message'), new BulkReply('test'), new BulkReply('payload')])); diff --git a/tests/LazyClientTest.php b/tests/RedisClientTest.php similarity index 91% rename from tests/LazyClientTest.php rename to tests/RedisClientTest.php index fd61c6e..c1f8d65 100644 --- a/tests/LazyClientTest.php +++ b/tests/RedisClientTest.php @@ -2,16 +2,16 @@ namespace Clue\Tests\React\Redis; -use Clue\React\Redis\Client; -use Clue\React\Redis\Factory; -use Clue\React\Redis\LazyClient; +use Clue\React\Redis\RedisClient; +use Clue\React\Redis\Io\Factory; +use Clue\React\Redis\Io\StreamingClient; use PHPUnit\Framework\MockObject\MockObject; use React\EventLoop\LoopInterface; use React\EventLoop\TimerInterface; -use React\Promise\Promise; use React\Promise\Deferred; +use React\Promise\Promise; -class LazyClientTest extends TestCase +class RedisClientTest extends TestCase { /** @var MockObject */ private $factory; @@ -19,7 +19,7 @@ class LazyClientTest extends TestCase /** @var MockObject */ private $loop; - /** @var LazyClient */ + /** @var RedisClient */ private $redis; public function setUp(): void @@ -27,7 +27,11 @@ public function setUp(): void $this->factory = $this->createMock(Factory::class); $this->loop = $this->createMock(LoopInterface::class); - $this->redis = new LazyClient('localhost', $this->factory, $this->loop); + $this->redis = new RedisClient('localhost', null, $this->loop); + + $ref = new \ReflectionProperty($this->redis, 'factory'); + $ref->setAccessible(true); + $ref->setValue($this->redis, $this->factory); } public function testPingWillCreateUnderlyingClientAndReturnPendingPromise() @@ -53,7 +57,7 @@ public function testPingTwiceWillCreateOnceUnderlyingClient() public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleTimer() { - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $deferred = new Deferred(); @@ -69,9 +73,13 @@ public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleT public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleTimerWithIdleTimeFromQueryParam() { - $this->redis = new LazyClient('localhost?idle=10', $this->factory, $this->loop); + $this->redis = new RedisClient('localhost?idle=10', null, $this->loop); - $client = $this->createMock(Client::class); + $ref = new \ReflectionProperty($this->redis, 'factory'); + $ref->setAccessible(true); + $ref->setValue($this->redis, $this->factory); + + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $deferred = new Deferred(); @@ -87,9 +95,13 @@ public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleT public function testPingWillResolveWhenUnderlyingClientResolvesPingAndNotStartIdleTimerWhenIdleParamIsNegative() { - $this->redis = new LazyClient('localhost?idle=-1', $this->factory, $this->loop); + $this->redis = new RedisClient('localhost?idle=-1', null, $this->loop); + + $ref = new \ReflectionProperty($this->redis, 'factory'); + $ref->setAccessible(true); + $ref->setValue($this->redis, $this->factory); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $deferred = new Deferred(); @@ -106,7 +118,7 @@ public function testPingWillResolveWhenUnderlyingClientResolvesPingAndNotStartId public function testPingWillRejectWhenUnderlyingClientRejectsPingAndStartIdleTimer() { $error = new \RuntimeException(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\reject($error)); $deferred = new Deferred(); @@ -155,7 +167,7 @@ public function testPingAfterPreviousFactoryRejectsUnderlyingClientWillCreateNew public function testPingAfterPreviousUnderlyingClientAlreadyClosedWillCreateNewUnderlyingConnection() { $closeHandler = null; - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $client->expects($this->any())->method('on')->withConsecutive( ['close', $this->callback(function ($arg) use (&$closeHandler) { @@ -199,7 +211,7 @@ public function testPingAfterCloseWillRejectWithoutCreatingUnderlyingConnection( public function testPingAfterPingWillNotStartIdleTimerWhenFirstPingResolves() { $deferred = new Deferred(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls( $deferred->promise(), new Promise(function () { }) @@ -217,7 +229,7 @@ public function testPingAfterPingWillNotStartIdleTimerWhenFirstPingResolves() public function testPingAfterPingWillStartAndCancelIdleTimerWhenSecondPingStartsAfterFirstResolves() { $deferred = new Deferred(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls( $deferred->promise(), new Promise(function () { }) @@ -236,7 +248,7 @@ public function testPingAfterPingWillStartAndCancelIdleTimerWhenSecondPingStarts public function testPingFollowedByIdleTimerWillCloseUnderlyingConnectionWithoutCloseEvent() { - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $client->expects($this->once())->method('close'); @@ -299,7 +311,7 @@ public function testCloseAfterPingWillEmitCloseWithoutErrorWhenUnderlyingClientC public function testCloseAfterPingWillCloseUnderlyingClientConnectionWhenAlreadyResolved() { - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $client->expects($this->once())->method('close'); @@ -314,7 +326,7 @@ public function testCloseAfterPingWillCloseUnderlyingClientConnectionWhenAlready public function testCloseAfterPingWillCancelIdleTimerWhenPingIsAlreadyResolved() { $deferred = new Deferred(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); $client->expects($this->once())->method('close'); @@ -332,7 +344,7 @@ public function testCloseAfterPingWillCancelIdleTimerWhenPingIsAlreadyResolved() public function testCloseAfterPingRejectsWillEmitClose() { $deferred = new Deferred(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); $client->expects($this->once())->method('close')->willReturnCallback(function () use ($client) { $client->emit('close'); @@ -360,7 +372,7 @@ public function testEndWillCloseClientIfUnderlyingConnectionIsNotPending() public function testEndAfterPingWillEndUnderlyingClient() { - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $client->expects($this->once())->method('end'); @@ -375,7 +387,7 @@ public function testEndAfterPingWillEndUnderlyingClient() public function testEndAfterPingWillCloseClientWhenUnderlyingClientEmitsClose() { $closeHandler = null; - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $client->expects($this->once())->method('end'); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$closeHandler) { @@ -401,7 +413,7 @@ public function testEmitsNoErrorEventWhenUnderlyingClientEmitsError() { $error = new \RuntimeException(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $deferred = new Deferred(); @@ -416,7 +428,7 @@ public function testEmitsNoErrorEventWhenUnderlyingClientEmitsError() public function testEmitsNoCloseEventWhenUnderlyingClientEmitsClose() { - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $deferred = new Deferred(); @@ -432,7 +444,7 @@ public function testEmitsNoCloseEventWhenUnderlyingClientEmitsClose() public function testEmitsNoCloseEventButWillCancelIdleTimerWhenUnderlyingConnectionEmitsCloseAfterPingIsAlreadyResolved() { $closeHandler = null; - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $deferred = new Deferred(); $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); $client->expects($this->any())->method('on')->withConsecutive( @@ -460,7 +472,7 @@ public function testEmitsNoCloseEventButWillCancelIdleTimerWhenUnderlyingConnect public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubChannel() { $messageHandler = null; - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve()); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$messageHandler) { if ($event === 'message') { @@ -482,7 +494,7 @@ public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubCh public function testEmitsUnsubscribeAndPunsubscribeEventsWhenUnderlyingClientClosesWhileUsingPubSubChannel() { $allHandler = null; - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->exactly(6))->method('__call')->willReturn(\React\Promise\resolve()); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$allHandler) { if (!isset($allHandler[$event])) { @@ -527,7 +539,7 @@ public function testSubscribeWillResolveWhenUnderlyingClientResolvesSubscribeAnd { $subscribeHandler = null; $deferred = new Deferred(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('subscribe')->willReturn($deferred->promise()); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$subscribeHandler) { if ($event === 'subscribe' && $subscribeHandler === null) { @@ -553,7 +565,7 @@ public function testUnsubscribeAfterSubscribeWillResolveWhenUnderlyingClientReso $unsubscribeHandler = null; $deferredSubscribe = new Deferred(); $deferredUnsubscribe = new Deferred(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls($deferredSubscribe->promise(), $deferredUnsubscribe->promise()); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$subscribeHandler, &$unsubscribeHandler) { if ($event === 'subscribe' && $subscribeHandler === null) { @@ -585,7 +597,7 @@ public function testBlpopWillRejectWhenUnderlyingClientClosesWhileWaitingForResp { $closeHandler = null; $deferred = new Deferred(); - $client = $this->createMock(Client::class); + $client = $this->createMock(StreamingClient::class); $client->expects($this->once())->method('__call')->with('blpop')->willReturn($deferred->promise()); $client->expects($this->any())->method('on')->withConsecutive( ['close', $this->callback(function ($arg) use (&$closeHandler) {