diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index ffe02f86c8..c29f7ff9f9 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager */ protected $users = []; + /** + * The list of users by socket and their attached id. + * + * @var array + */ + protected $userSockets = []; + /** * Wether the current instance accepts new connections. * @@ -273,6 +280,7 @@ public function broadcastAcrossServers($appId, string $channel, stdClass $payloa public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload) { $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user); + $this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId; } /** @@ -287,6 +295,19 @@ public function userJoinedPresenceChannel(ConnectionInterface $connection, stdCl public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel) { unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]); + + $deletableSocketKey = array_search( + $connection->socketId, + $this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"] + ); + + if ($deletableSocketKey !== false) { + unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][$deletableSocketKey]); + + if (count($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]) === 0) { + unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]); + } + } } /** @@ -342,6 +363,21 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt return new FulfilledPromise($results); } + /** + * Get the socket IDs for a presence channel member. + * + * @param string|int $userId + * @param string|int $appId + * @param string $channelName + * @return \React\Promise\PromiseInterface + */ + public function getMemberSockets($userId, $appId, $channelName): PromiseInterface + { + return new FulfilledPromise( + $this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? [] + ); + } + /** * Keep tracking the connections availability when they pong. * diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 3071a8a13f..ac8847ff43 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -313,6 +313,10 @@ public function userJoinedPresenceChannel(ConnectionInterface $connection, stdCl $this->storeUserData( $connection->app->id, $channel, $connection->socketId, json_encode($user) ); + + $this->addUserSocket( + $connection->app->id, $channel, $user, $connection->socketId + ); } /** @@ -329,6 +333,10 @@ public function userLeftPresenceChannel(ConnectionInterface $connection, stdClas $this->removeUserData( $connection->app->id, $channel, $connection->socketId ); + + $this->removeUserSocket( + $connection->app->id, $channel, $user, $connection->socketId + ); } /** @@ -389,6 +397,21 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt }); } + /** + * Get the socket IDs for a presence channel member. + * + * @param string|int $userId + * @param string|int $appId + * @param string $channelName + * @return \React\Promise\PromiseInterface + */ + public function getMemberSockets($userId, $appId, $channelName): PromiseInterface + { + return $this->publishClient->smembers( + $this->getRedisKey($appId, $channelName, [$userId, 'userSockets']) + ); + } + /** * Keep tracking the connections availability when they pong. * @@ -628,7 +651,7 @@ public function removeChannelFromSet($appId, string $channel) * @param string|int $appId * @param string|null $channel * @param string $key - * @param mixed $data + * @param string $data * @return PromiseInterface */ public function storeUserData($appId, string $channel = null, string $key, $data) @@ -681,6 +704,40 @@ public function unsubscribeFromTopic($appId, string $channel = null) ); } + /** + * Add the Presence Channel's User's Socket ID to a list. + * + * @param string|int $appId + * @param string $channel + * @param stdClass $user + * @param string $socketId + * @return void + */ + protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId) + { + $this->publishClient->sadd( + $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), + $socketId + ); + } + + /** + * Remove the Presence Channel's User's Socket ID from the list. + * + * @param string|int $appId + * @param string $channel + * @param stdClass $user + * @param string $socketId + * @return void + */ + protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId) + { + $this->publishClient->srem( + $this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']), + $socketId + ); + } + /** * Get the Redis Keyspace name to handle subscriptions * and other key-value sets. diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index 2bfa8ef926..eb81f35299 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -55,21 +55,32 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) ])); }); - $memberAddedPayload = [ - 'event' => 'pusher_internal:member_added', - 'channel' => $this->getName(), - 'data' => $payload->channel_data, - ]; - - $this->broadcastToEveryoneExcept( - (object) $memberAddedPayload, $connection->socketId, - $connection->app->id - ); + // The `pusher_internal:member_added` event is triggered when a user joins a channel. + // It's quite possible that a user can have multiple connections to the same channel + // (for example by having multiple browser tabs open) + // and in this case the events will only be triggered when the first tab is opened. + $this->channelManager + ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) + ->then(function ($sockets) use ($payload, $connection) { + if (count($sockets) === 1) { + $memberAddedPayload = [ + 'event' => 'pusher_internal:member_added', + 'channel' => $this->getName(), + 'data' => $payload->channel_data, + ]; + + $this->broadcastToEveryoneExcept( + (object) $memberAddedPayload, $connection->socketId, + $connection->app->id + ); + } - DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ - 'socketId' => $connection->socketId, - 'channel' => $this->getName(), - ]); + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ + 'socketId' => $connection->socketId, + 'channel' => $this->getName(), + 'duplicate-connection' => count($sockets) > 1, + ]); + }); } /** @@ -95,18 +106,28 @@ public function unsubscribe(ConnectionInterface $connection) $connection, $user, $this->getName() ); - $memberRemovedPayload = [ - 'event' => 'pusher_internal:member_removed', - 'channel' => $this->getName(), - 'data' => json_encode([ - 'user_id' => $user->user_id, - ]), - ]; - - $this->broadcastToEveryoneExcept( - (object) $memberRemovedPayload, $connection->socketId, - $connection->app->id - ); + // The `pusher_internal:member_removed` is triggered when a user leaves a channel. + // It's quite possible that a user can have multiple connections to the same channel + // (for example by having multiple browser tabs open) + // and in this case the events will only be triggered when the last one is closed. + $this->channelManager + ->getMemberSockets($user->user_id, $connection->app->id, $this->getName()) + ->then(function ($sockets) use ($connection, $user) { + if (count($sockets) === 0) { + $memberRemovedPayload = [ + 'event' => 'pusher_internal:member_removed', + 'channel' => $this->getName(), + 'data' => json_encode([ + 'user_id' => $user->user_id, + ]), + ]; + + $this->broadcastToEveryoneExcept( + (object) $memberRemovedPayload, $connection->socketId, + $connection->app->id + ); + } + }); }); } } diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index 35d5baf6c6..5f1f358591 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -186,6 +186,16 @@ public function getChannelMember(ConnectionInterface $connection, string $channe */ public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface; + /** + * Get the socket IDs for a presence channel member. + * + * @param string|int $userId + * @param string|int $appId + * @param string $channelName + * @return \React\Promise\PromiseInterface + */ + public function getMemberSockets($userId, $appId, $channelName): PromiseInterface; + /** * Keep tracking the connections availability when they pong. * diff --git a/tests/Mocks/Connection.php b/tests/Mocks/Connection.php index 42d02c0732..e4d6e1fca1 100644 --- a/tests/Mocks/Connection.php +++ b/tests/Mocks/Connection.php @@ -58,6 +58,32 @@ public function close() $this->closed = true; } + /** + * Reset the events for assertions. + * + * @return $this + */ + public function resetEvents() + { + $this->sentData = []; + $this->sentRawData = []; + + return $this; + } + + /** + * Dump & stop execution. + * + * @return void + */ + public function dd() + { + dd([ + 'sentData' => $this->sentData, + 'sentRawData' => $this->sentRawData, + ]); + } + /** * Assert that an event got sent. * diff --git a/tests/PresenceChannelTest.php b/tests/PresenceChannelTest.php index 2bd54e2f34..755f895447 100644 --- a/tests/PresenceChannelTest.php +++ b/tests/PresenceChannelTest.php @@ -61,6 +61,31 @@ public function test_connect_to_presence_channel_with_valid_signature() }); } + public function test_connect_to_presence_channel_when_user_with_same_ids_is_already_joined() + { + $rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); + $morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]); + $pickleRick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); + + foreach ([$rick, $morty, $pickleRick] as $connection) { + $connection->assertSentEvent('pusher_internal:subscription_succeeded', [ + 'channel' => 'presence-channel', + ]); + } + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'presence-channel') + ->then(function ($total) { + $this->assertEquals(3, $total); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(2, $members); + }); + } + public function test_presence_channel_broadcast_member_events() { $rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]); @@ -200,4 +225,56 @@ public function test_local_connections_for_presence_channels() } }); } + + public function test_multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection() + { + // Connect the `observer` user to the server + $observerConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 'observer']); + + // Connect the first socket for user `1` to the server + $firstConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']); + + // Make sure the observer sees a `member_added` event for `user:1` + $observerConnection->assertSentEvent('pusher_internal:member_added', [ + 'event' => 'pusher_internal:member_added', + 'channel' => 'presence-channel', + 'data' => json_encode(['user_id' => '1']), + ])->resetEvents(); + + // Connect the second socket for user `1` to the server + $secondConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']); + + // Make sure the observer was not notified of a `member_added` event (user was already connected) + $observerConnection->assertNotSentEvent('pusher_internal:member_added'); + + // Disconnect the first socket for user `1` on the server + $this->pusherServer->onClose($firstConnection); + + // Make sure the observer was not notified of a `member_removed` event (user still connected on another socket) + $observerConnection->assertNotSentEvent('pusher_internal:member_removed'); + + // Disconnect the second (and last) socket for user `1` on the server + $this->pusherServer->onClose($secondConnection); + + // Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected) + $observerConnection->assertSentEvent('pusher_internal:member_removed'); + + $this->channelManager + ->getMemberSockets('1', '1234', 'presence-channel') + ->then(function ($sockets) { + $this->assertCount(0, $sockets); + }); + + $this->channelManager + ->getMemberSockets('2', '1234', 'presence-channel') + ->then(function ($sockets) { + $this->assertCount(0, $sockets); + }); + + $this->channelManager + ->getMemberSockets('observer', '1234', 'presence-channel') + ->then(function ($sockets) { + $this->assertCount(1, $sockets); + }); + } }