diff --git a/README.md b/README.md index b04b423..b1c24fe 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ It enables you to set and query its data or use its PubSub topics to react to in * [RedisClient](#redisclient) * [__construct()](#__construct) * [__call()](#__call) + * [callAsync()](#callasync) * [end()](#end) * [close()](#close) * [error event](#error-event) @@ -124,7 +125,8 @@ Each method call matches the respective [Redis command](https://redis.io/command For example, the `$redis->get()` method will invoke the [`GET` command](https://redis.io/commands/get). All [Redis commands](https://redis.io/commands) are automatically available as -public methods via the magic [`__call()` method](#__call). +public methods via the magic [`__call()` method](#__call) or through the more +explicit [`callAsync()` method]. Listing all available commands is out of scope here, please refer to the [Redis command reference](https://redis.io/commands). @@ -432,6 +434,8 @@ $redis->get($key)->then(function (?string $value) { All [Redis commands](https://redis.io/commands) are automatically available as public methods via this magic `__call()` method. +Note that some static analysis tools may not understand this magic method, so +you may also the [`callAsync()` method](#callasync) as a more explicit alternative. Listing all available commands is out of scope here, please refer to the [Redis command reference](https://redis.io/commands). @@ -445,6 +449,43 @@ Each of these commands supports async operation and returns a [Promise](#promise that eventually *fulfills* with its *results* on success or *rejects* with an `Exception` on error. See also [promises](#promises) for more details. +#### callAsync() + +The `callAsync(string $command, string ...$args): PromiseInterface` method can be used to +invoke a Redis command. + +```php +$redis->callAsync('GET', 'name')->then(function (?string $name): void { + echo 'Name: ' . ($name ?? 'Unknown') . PHP_EOL; +}, function (Throwable $e): void { + echo 'Error: ' . $e->getMessage() . PHP_EOL; +}); +``` + +The `string $command` parameter can be any valid Redis command. All +[Redis commands](https://redis.io/commands/) are available through this +method. As an alternative, you may also use the magic +[`__call()` method](#__call), but note that not all static analysis tools +may understand this magic method. Listing all available commands is out +of scope here, please refer to the +[Redis command reference](https://redis.io/commands). + +The optional `string ...$args` parameter can be used to pass any +additional arguments to the Redis command. Some commands may require or +support additional arguments that this method will simply forward as is. +Internally, Redis requires all arguments to be coerced to `string` values, +but you may also rely on PHP's type-juggling semantics and pass `int` or +`float` values: + +```php +$redis->callAsync('SET', 'name', 'Alice', 'EX', 600); +``` + +This method supports async operation and returns a [Promise](#promises) +that eventually *fulfills* with its *results* on success or *rejects* +with an `Exception` on error. See also [promises](#promises) for more +details. + #### end() The `end():void` method can be used to diff --git a/examples/cli.php b/examples/cli.php index 23c9e55..195c737 100644 --- a/examples/cli.php +++ b/examples/cli.php @@ -23,20 +23,20 @@ return; } - $params = explode(' ', $line); - $method = array_shift($params); - - assert(is_callable([$redis, $method])); - $promise = $redis->$method(...$params); + $args = explode(' ', $line); + $command = strtolower(array_shift($args)); // special method such as end() / close() called - if (!$promise instanceof React\Promise\PromiseInterface) { + if (in_array($command, ['end', 'close'])) { + $redis->$command(); return; } - $promise->then(function ($data) { + $promise = $redis->callAsync($command, ...$args); + + $promise->then(function ($data): void { echo '# reply: ' . json_encode($data) . PHP_EOL; - }, function ($e) { + }, function (Throwable $e): void { echo '# error reply: ' . $e->getMessage() . PHP_EOL; }); }); diff --git a/phpstan.neon.dist b/phpstan.neon.dist index be4a66a..f61b9e6 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -9,4 +9,3 @@ parameters: ignoreErrors: # ignore undefined methods due to magic `__call()` method - '/^Call to an undefined method Clue\\React\\Redis\\RedisClient::.+\(\)\.$/' - - '/^Call to an undefined method Clue\\React\\Redis\\Io\\StreamingClient::.+\(\)\.$/' diff --git a/src/Io/Factory.php b/src/Io/Factory.php index b66e30d..5a3d692 100644 --- a/src/Io/Factory.php +++ b/src/Io/Factory.php @@ -96,12 +96,13 @@ public function createClient(string $uri): PromiseInterface // use `?password=secret` query or `user:secret@host` password form URL if (isset($args['password']) || isset($parts['pass'])) { $pass = $args['password'] ?? rawurldecode($parts['pass']); // @phpstan-ignore-line + \assert(\is_string($pass)); $promise = $promise->then(function (StreamingClient $redis) use ($pass, $uri) { - return $redis->auth($pass)->then( + return $redis->callAsync('auth', $pass)->then( function () use ($redis) { return $redis; }, - function (\Exception $e) use ($redis, $uri) { + function (\Throwable $e) use ($redis, $uri) { $redis->close(); $const = ''; @@ -124,12 +125,13 @@ function (\Exception $e) use ($redis, $uri) { // use `?db=1` query or `/1` path (skip first slash) if (isset($args['db']) || (isset($parts['path']) && $parts['path'] !== '/')) { $db = $args['db'] ?? substr($parts['path'], 1); // @phpstan-ignore-line + \assert(\is_string($db)); $promise = $promise->then(function (StreamingClient $redis) use ($db, $uri) { - return $redis->select($db)->then( + return $redis->callAsync('select', $db)->then( function () use ($redis) { return $redis; }, - function (\Exception $e) use ($redis, $uri) { + function (\Throwable $e) use ($redis, $uri) { $redis->close(); $const = ''; diff --git a/src/Io/StreamingClient.php b/src/Io/StreamingClient.php index bb7689f..0d5b2f7 100644 --- a/src/Io/StreamingClient.php +++ b/src/Io/StreamingClient.php @@ -72,15 +72,14 @@ public function __construct(DuplexStreamInterface $stream, ParserInterface $pars } /** - * @param string[] $args * @return PromiseInterface */ - public function __call(string $name, array $args): PromiseInterface + public function callAsync(string $command, string ...$args): PromiseInterface { $request = new Deferred(); $promise = $request->promise(); - $name = strtolower($name); + $command = strtolower($command); // special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied static $pubsubs = ['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe']; @@ -90,22 +89,22 @@ public function __call(string $name, array $args): PromiseInterface 'Connection ' . ($this->closed ? 'closed' : 'closing'). ' (ENOTCONN)', defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107 )); - } elseif (count($args) !== 1 && in_array($name, $pubsubs)) { + } elseif (count($args) !== 1 && in_array($command, $pubsubs)) { $request->reject(new \InvalidArgumentException( 'PubSub commands limited to single argument (EINVAL)', defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22 )); - } elseif ($name === 'monitor') { + } elseif ($command === 'monitor') { $request->reject(new \BadMethodCallException( 'MONITOR command explicitly not supported (ENOTSUP)', defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95) )); } else { - $this->stream->write($this->serializer->getRequestMessage($name, $args)); + $this->stream->write($this->serializer->getRequestMessage($command, $args)); $this->requests []= $request; } - if (in_array($name, $pubsubs)) { + if (in_array($command, $pubsubs)) { $promise->then(function (array $array) { $first = array_shift($array); diff --git a/src/RedisClient.php b/src/RedisClient.php index df4ebf5..29d4854 100644 --- a/src/RedisClient.php +++ b/src/RedisClient.php @@ -160,16 +160,65 @@ private function client(): PromiseInterface } /** - * Invoke the given command and return a Promise that will be resolved when the request has been replied to + * Invoke the given command and return a Promise that will be resolved when the command has been replied to * * This is a magic method that will be invoked when calling any redis - * command on this instance. + * command on this instance. See also `RedisClient::callAsync()`. * * @param string $name * @param string[] $args * @return PromiseInterface + * @see self::callAsync() */ public function __call(string $name, array $args): PromiseInterface + { + return $this->callAsync($name, ...$args); + } + + /** + * Invoke a Redis command. + * + * For example, the [`GET` command](https://redis.io/commands/get) can be invoked + * like this: + * + * ```php + * $redis->callAsync('GET', 'name')->then(function (?string $name): void { + * echo 'Name: ' . ($name ?? 'Unknown') . PHP_EOL; + * }, function (Throwable $e): void { + * echo 'Error: ' . $e->getMessage() . PHP_EOL; + * }); + * ``` + * + * The `string $command` parameter can be any valid Redis command. All + * [Redis commands](https://redis.io/commands/) are available through this + * method. As an alternative, you may also use the magic + * [`__call()` method](#__call), but note that not all static analysis tools + * may understand this magic method. Listing all available commands is out + * of scope here, please refer to the + * [Redis command reference](https://redis.io/commands). + * + * The optional `string ...$args` parameter can be used to pass any + * additional arguments to the Redis command. Some commands may require or + * support additional arguments that this method will simply forward as is. + * Internally, Redis requires all arguments to be coerced to `string` values, + * but you may also rely on PHP's type-juggling semantics and pass `int` or + * `float` values: + * + * ```php + * $redis->callAsync('SET', 'name', 'Alice', 'EX', 600); + * ``` + * + * This method supports async operation and returns a [Promise](#promises) + * that eventually *fulfills* with its *results* on success or *rejects* + * with an `Exception` on error. See also [promises](#promises) for more + * details. + * + * @param string $command + * @param string ...$args + * @return PromiseInterface + * @throws void + */ + public function callAsync(string $command, string ...$args): PromiseInterface { if ($this->closed) { return reject(new \RuntimeException( @@ -178,17 +227,17 @@ public function __call(string $name, array $args): PromiseInterface )); } - return $this->client()->then(function (StreamingClient $redis) use ($name, $args) { + return $this->client()->then(function (StreamingClient $redis) use ($command, $args): PromiseInterface { $this->awake(); - assert(\is_callable([$redis, $name])); // @phpstan-ignore-next-line - return \call_user_func_array([$redis, $name], $args)->then( + return $redis->callAsync($command, ...$args)->then( function ($result) { $this->idle(); return $result; }, - function (\Exception $error) { + function (\Throwable $e) { + \assert($e instanceof \Exception); $this->idle(); - throw $error; + throw $e; } ); }); diff --git a/tests/Io/StreamingClientTest.php b/tests/Io/StreamingClientTest.php index 241a76d..3c550eb 100644 --- a/tests/Io/StreamingClientTest.php +++ b/tests/Io/StreamingClientTest.php @@ -46,7 +46,7 @@ public function testSending(): void $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message')); $this->stream->expects($this->once())->method('write')->with($this->equalTo('message')); - $this->redis->ping(); + $this->redis->callAsync('ping'); } public function testClosingClientEmitsEvent(): void @@ -121,7 +121,7 @@ public function testPingPong(): void { $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping')); - $promise = $this->redis->ping(); + $promise = $this->redis->callAsync('ping'); $this->redis->handleMessage(new BulkReply('PONG')); @@ -131,7 +131,7 @@ public function testPingPong(): void public function testMonitorCommandIsNotSupported(): void { - $promise = $this->redis->monitor(); + $promise = $this->redis->callAsync('monitor'); $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( @@ -148,7 +148,7 @@ public function testMonitorCommandIsNotSupported(): void public function testErrorReply(): void { - $promise = $this->redis->invalid(); + $promise = $this->redis->callAsync('invalid'); $err = new ErrorReply("ERR unknown command 'invalid'"); $this->redis->handleMessage($err); @@ -158,7 +158,7 @@ public function testErrorReply(): void public function testClosingClientRejectsAllRemainingRequests(): void { - $promise = $this->redis->ping(); + $promise = $this->redis->callAsync('ping'); $this->redis->close(); $promise->then(null, $this->expectCallableOnceWith( @@ -183,7 +183,7 @@ public function testClosingStreamRejectsAllRemainingRequests(): void assert($this->serializer instanceof SerializerInterface); $this->redis = new StreamingClient($stream, $this->parser, $this->serializer); - $promise = $this->redis->ping(); + $promise = $this->redis->callAsync('ping'); $stream->close(); $promise->then(null, $this->expectCallableOnceWith( @@ -201,9 +201,9 @@ public function testClosingStreamRejectsAllRemainingRequests(): void public function testEndingClientRejectsAllNewRequests(): void { - $this->redis->ping(); + $this->redis->callAsync('ping'); $this->redis->end(); - $promise = $this->redis->ping(); + $promise = $this->redis->callAsync('ping'); $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( @@ -221,7 +221,7 @@ public function testEndingClientRejectsAllNewRequests(): void public function testClosedClientRejectsAllNewRequests(): void { $this->redis->close(); - $promise = $this->redis->ping(); + $promise = $this->redis->callAsync('ping'); $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( @@ -250,7 +250,7 @@ public function testEndingBusyClosesClientWhenNotBusyAnymore(): void ++$closed; }); - $promise = $this->redis->ping(); + $promise = $this->redis->callAsync('ping'); $this->assertEquals(0, $closed); $this->redis->end(); @@ -277,7 +277,7 @@ public function testReceivingUnexpectedMessageThrowsException(): void public function testPubsubSubscribe(): StreamingClient { - $promise = $this->redis->subscribe('test'); + $promise = $this->redis->callAsync('subscribe', 'test'); $this->expectPromiseResolve($promise); $this->redis->on('subscribe', $this->expectCallableOnce()); @@ -291,7 +291,7 @@ public function testPubsubSubscribe(): StreamingClient */ public function testPubsubPatternSubscribe(StreamingClient $client): StreamingClient { - $promise = $client->psubscribe('demo_*'); + $promise = $client->callAsync('psubscribe', 'demo_*'); $this->expectPromiseResolve($promise); $client->on('psubscribe', $this->expectCallableOnce()); @@ -311,7 +311,7 @@ public function testPubsubMessage(StreamingClient $client): void public function testSubscribeWithMultipleArgumentsRejects(): void { - $promise = $this->redis->subscribe('a', 'b'); + $promise = $this->redis->callAsync('subscribe', 'a', 'b'); $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( @@ -328,7 +328,7 @@ public function testSubscribeWithMultipleArgumentsRejects(): void public function testUnsubscribeWithoutArgumentsRejects(): void { - $promise = $this->redis->unsubscribe(); + $promise = $this->redis->callAsync('unsubscribe'); $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( diff --git a/tests/RedisClientTest.php b/tests/RedisClientTest.php index 273c655..4fd77a6 100644 --- a/tests/RedisClientTest.php +++ b/tests/RedisClientTest.php @@ -152,7 +152,7 @@ public function testPingTwiceWillCreateOnceUnderlyingClient(): void public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleTimer(): void { $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('callAsync')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); @@ -177,7 +177,7 @@ public function testPingWillResolveWhenUnderlyingClientResolvesPingAndStartIdleT $ref->setValue($this->redis, $this->factory); $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('callAsync')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); @@ -202,7 +202,7 @@ public function testPingWillResolveWhenUnderlyingClientResolvesPingAndNotStartId $ref->setValue($this->redis, $this->factory); $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('callAsync')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); @@ -222,7 +222,7 @@ public function testPingWillRejectWhenUnderlyingClientRejectsPingAndStartIdleTim { $error = new \RuntimeException(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\reject($error)); + $client->expects($this->once())->method('callAsync')->with('ping')->willReturn(\React\Promise\reject($error)); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); @@ -277,7 +277,7 @@ public function testPingAfterPreviousUnderlyingClientAlreadyClosedWillCreateNewU { $closeHandler = null; $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('callAsync')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $client->expects($this->any())->method('on')->withConsecutive( ['close', $this->callback(function ($arg) use (&$closeHandler) { $closeHandler = $arg; @@ -321,7 +321,7 @@ public function testPingAfterPingWillNotStartIdleTimerWhenFirstPingResolves(): v { $deferred = new Deferred(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls( + $client->expects($this->exactly(2))->method('callAsync')->willReturnOnConsecutiveCalls( $deferred->promise(), new Promise(function () { }) ); @@ -342,7 +342,7 @@ public function testPingAfterPingWillStartAndCancelIdleTimerWhenSecondPingStarts { $deferred = new Deferred(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls( + $client->expects($this->exactly(2))->method('callAsync')->willReturnOnConsecutiveCalls( $deferred->promise(), new Promise(function () { }) ); @@ -364,7 +364,7 @@ public function testPingAfterPingWillStartAndCancelIdleTimerWhenSecondPingStarts public function testPingFollowedByIdleTimerWillCloseUnderlyingConnectionWithoutCloseEvent(): void { $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve(null)); + $client->expects($this->once())->method('callAsync')->willReturn(\React\Promise\resolve(null)); $client->expects($this->once())->method('close'); $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); @@ -433,7 +433,7 @@ public function testCloseAfterPingWillEmitCloseWithoutErrorWhenUnderlyingClientC public function testCloseAfterPingWillCloseUnderlyingClientConnectionWhenAlreadyResolved(): void { $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve(null)); + $client->expects($this->once())->method('callAsync')->willReturn(\React\Promise\resolve(null)); $client->expects($this->once())->method('close'); $deferred = new Deferred(); @@ -448,7 +448,7 @@ public function testCloseAfterPingWillCancelIdleTimerWhenPingIsAlreadyResolved() { $deferred = new Deferred(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); + $client->expects($this->once())->method('callAsync')->willReturn($deferred->promise()); $client->expects($this->once())->method('close'); $this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client)); @@ -469,7 +469,7 @@ public function testCloseAfterPingRejectsWillEmitClose(): void { $deferred = new Deferred(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); + $client->expects($this->once())->method('callAsync')->willReturn($deferred->promise()); $client->expects($this->once())->method('close')->willReturnCallback(function () use ($client) { assert($client instanceof StreamingClient); $client->emit('close'); @@ -500,7 +500,7 @@ public function testEndWillCloseClientIfUnderlyingConnectionIsNotPending(): void public function testEndAfterPingWillEndUnderlyingClient(): void { $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('callAsync')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $client->expects($this->once())->method('end'); $deferred = new Deferred(); @@ -515,7 +515,7 @@ public function testEndAfterPingWillCloseClientWhenUnderlyingClientEmitsClose(): { $closeHandler = null; $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG')); + $client->expects($this->once())->method('callAsync')->with('ping')->willReturn(\React\Promise\resolve('PONG')); $client->expects($this->once())->method('end'); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$closeHandler) { if ($event === 'close') { @@ -541,7 +541,7 @@ public function testEmitsNoErrorEventWhenUnderlyingClientEmitsError(): void $error = new \RuntimeException(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve(null)); + $client->expects($this->once())->method('callAsync')->willReturn(\React\Promise\resolve(null)); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); @@ -557,7 +557,7 @@ public function testEmitsNoErrorEventWhenUnderlyingClientEmitsError(): void public function testEmitsNoCloseEventWhenUnderlyingClientEmitsClose(): void { $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve(null)); + $client->expects($this->once())->method('callAsync')->willReturn(\React\Promise\resolve(null)); $deferred = new Deferred(); $this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise()); @@ -575,7 +575,7 @@ public function testEmitsNoCloseEventButWillCancelIdleTimerWhenUnderlyingConnect $closeHandler = null; $client = $this->createMock(StreamingClient::class); $deferred = new Deferred(); - $client->expects($this->once())->method('__call')->willReturn($deferred->promise()); + $client->expects($this->once())->method('callAsync')->willReturn($deferred->promise()); $client->expects($this->any())->method('on')->withConsecutive( ['close', $this->callback(function ($arg) use (&$closeHandler) { $closeHandler = $arg; @@ -605,7 +605,7 @@ public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubCh { $messageHandler = null; $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve(null)); + $client->expects($this->once())->method('callAsync')->willReturn(\React\Promise\resolve(null)); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$messageHandler) { if ($event === 'message') { $messageHandler = $callback; @@ -627,7 +627,7 @@ public function testEmitsUnsubscribeAndPunsubscribeEventsWhenUnderlyingClientClo { $allHandler = null; $client = $this->createMock(StreamingClient::class); - $client->expects($this->exactly(6))->method('__call')->willReturn(\React\Promise\resolve(null)); + $client->expects($this->exactly(6))->method('callAsync')->willReturn(\React\Promise\resolve(null)); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$allHandler) { if (!isset($allHandler[$event])) { $allHandler[$event] = $callback; @@ -670,7 +670,7 @@ public function testSubscribeWillResolveWhenUnderlyingClientResolvesSubscribeAnd $subscribeHandler = null; $deferred = new Deferred(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('subscribe')->willReturn($deferred->promise()); + $client->expects($this->once())->method('callAsync')->with('subscribe')->willReturn($deferred->promise()); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$subscribeHandler) { if ($event === 'subscribe' && $subscribeHandler === null) { $subscribeHandler = $callback; @@ -699,7 +699,7 @@ public function testUnsubscribeAfterSubscribeWillResolveWhenUnderlyingClientReso $deferredSubscribe = new Deferred(); $deferredUnsubscribe = new Deferred(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls($deferredSubscribe->promise(), $deferredUnsubscribe->promise()); + $client->expects($this->exactly(2))->method('callAsync')->willReturnOnConsecutiveCalls($deferredSubscribe->promise(), $deferredUnsubscribe->promise()); $client->expects($this->any())->method('on')->willReturnCallback(function ($event, $callback) use (&$subscribeHandler, &$unsubscribeHandler) { if ($event === 'subscribe' && $subscribeHandler === null) { $subscribeHandler = $callback; @@ -734,7 +734,7 @@ public function testBlpopWillRejectWhenUnderlyingClientClosesWhileWaitingForResp $closeHandler = null; $deferred = new Deferred(); $client = $this->createMock(StreamingClient::class); - $client->expects($this->once())->method('__call')->with('blpop')->willReturn($deferred->promise()); + $client->expects($this->once())->method('callAsync')->with('blpop')->willReturn($deferred->promise()); $client->expects($this->any())->method('on')->withConsecutive( ['close', $this->callback(function ($arg) use (&$closeHandler) { $closeHandler = $arg;