diff --git a/src/HttpApi/Controllers/FetchUsersController.php b/src/HttpApi/Controllers/FetchUsersController.php index d59da7c271..81f3dd0a2b 100644 --- a/src/HttpApi/Controllers/FetchUsersController.php +++ b/src/HttpApi/Controllers/FetchUsersController.php @@ -22,8 +22,8 @@ public function __invoke(Request $request) } return [ - 'users' => Collection::make($channel->getUsers())->map(function ($user) { - return ['id' => $user->user_id]; + 'users' => Collection::make($channel->getUsers())->keys()->map(function ($userId) { + return ['id' => $userId]; })->values(), ]; } diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index a4de1570ce..ac13bcfc66 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -5,26 +5,44 @@ use Ratchet\ConnectionInterface; use stdClass; +/** + * @link https://pusher.com/docs/pusher_protocol#presence-channel-events + */ class PresenceChannel extends Channel { + /** + * List of users in the channel keyed by their user ID with their info as value. + * + * @var array + */ protected $users = []; - public function getUsers(): array - { - return $this->users; - } - - /* - * @link https://pusher.com/docs/pusher_protocol#presence-channel-events + /** + * List of sockets keyed by their ID with the value pointing to a user ID. + * + * @var array */ + protected $sockets = []; + public function subscribe(ConnectionInterface $connection, stdClass $payload) { $this->verifySignature($connection, $payload); $this->saveConnection($connection); - $channelData = json_decode($payload->channel_data); - $this->users[$connection->socketId] = $channelData; + $channelData = json_decode($payload->channel_data, true); + + // The ID of the user connecting + $userId = (string) $channelData['user_id']; + + // Check if the user was already connected to the channel before storing the connection in the state + $userFirstConnection = ! isset($this->users[$userId]); + + // Add or replace the user info in the state + $this->users[$userId] = $channelData['user_info'] ?? []; + + // Add the socket ID to user ID map in the state + $this->sockets[$connection->socketId] = $userId; // Send the success event $connection->send(json_encode([ @@ -33,72 +51,74 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) 'data' => json_encode($this->getChannelData()), ])); - $this->broadcastToOthers($connection, [ - 'event' => 'pusher_internal:member_added', - 'channel' => $this->channelName, - 'data' => json_encode($channelData), - ]); + // The `pusher_internal:member_added` event is triggered when a user joins a channel. + // It's quite possible that a user can have multiple connections to the same channel + // (for example by having multiple browser tabs open) + // and in this case the events will only be triggered when the first tab is opened. + if ($userFirstConnection) { + $this->broadcastToOthers($connection, [ + 'event' => 'pusher_internal:member_added', + 'channel' => $this->channelName, + 'data' => json_encode($channelData), + ]); + } } public function unsubscribe(ConnectionInterface $connection) { parent::unsubscribe($connection); - if (! isset($this->users[$connection->socketId])) { + if (! isset($this->sockets[$connection->socketId])) { return; } - $this->broadcastToOthers($connection, [ - 'event' => 'pusher_internal:member_removed', - 'channel' => $this->channelName, - 'data' => json_encode([ - 'user_id' => $this->users[$connection->socketId]->user_id, - ]), - ]); - - unset($this->users[$connection->socketId]); + // Find the user ID belonging to this socket + $userId = $this->sockets[$connection->socketId]; + + // Remove the socket from the state + unset($this->sockets[$connection->socketId]); + + // Test if the user still has open sockets to this channel + $userHasOpenConnections = (array_flip($this->sockets)[$userId] ?? null) !== null; + + // The `pusher_internal:member_removed` is triggered when a user leaves a channel. + // It's quite possible that a user can have multiple connections to the same channel + // (for example by having multiple browser tabs open) + // and in this case the events will only be triggered when the last one is closed. + if (! $userHasOpenConnections) { + $this->broadcastToOthers($connection, [ + 'event' => 'pusher_internal:member_removed', + 'channel' => $this->channelName, + 'data' => json_encode([ + 'user_id' => $userId, + ]), + ]); + + // Remove the user info from the state + unset($this->users[$userId]); + } } protected function getChannelData(): array { return [ 'presence' => [ - 'ids' => $userIds = $this->getUserIds(), - 'hash' => $this->getHash(), - 'count' => count($userIds), + 'ids' => array_keys($this->users), + 'hash' => $this->users, + 'count' => count($this->users), ], ]; } - public function toArray(): array - { - return array_merge(parent::toArray(), [ - 'user_count' => count($this->getUserIds()), - ]); - } - - protected function getUserIds(): array + public function getUsers(): array { - $userIds = array_map(function ($channelData) { - return (string) $channelData->user_id; - }, $this->users); - - return array_values(array_unique($userIds)); + return $this->users; } - /** - * Compute the hash for the presence channel integrity. - * - * @return array - */ - protected function getHash(): array + public function toArray(): array { - $hash = []; - - foreach ($this->users as $socketId => $channelData) { - $hash[$channelData->user_id] = $channelData->user_info ?? []; - } - - return $hash; + return array_merge(parent::toArray(), [ + 'user_count' => count($this->users), + ]); } } diff --git a/tests/Channels/PresenceChannelTest.php b/tests/Channels/PresenceChannelTest.php index 5df163f980..ac5bc45b7f 100644 --- a/tests/Channels/PresenceChannelTest.php +++ b/tests/Channels/PresenceChannelTest.php @@ -43,7 +43,7 @@ public function clients_with_valid_auth_signatures_can_join_presence_channels() ], ]; - $message = $this->getSignedMessage($connection, 'presence-channel', $channelData); + $message = $this->getSignedSubscribeMessage($connection, 'presence-channel', $channelData); $this->pusherServer->onMessage($connection, $message); @@ -63,7 +63,7 @@ public function clients_with_no_user_info_can_join_presence_channels() 'user_id' => 1, ]; - $message = $this->getSignedMessage($connection, 'presence-channel', $channelData); + $message = $this->getSignedSubscribeMessage($connection, 'presence-channel', $channelData); $this->pusherServer->onMessage($connection, $message); @@ -80,19 +80,19 @@ public function multiple_clients_with_same_user_id_are_counted_once() $channelName = 'presence-channel'; $channelData = [ - 'user_id' => $userId = 1, + 'user_id' => $userId = 'user:1', ]; - $this->pusherServer->onMessage($connection, $this->getSignedMessage($connection, $channelName, $channelData)); - $this->pusherServer->onMessage($connection2, $this->getSignedMessage($connection2, $channelName, $channelData)); + $this->pusherServer->onMessage($connection, $this->getSignedSubscribeMessage($connection, $channelName, $channelData)); + $this->pusherServer->onMessage($connection2, $this->getSignedSubscribeMessage($connection2, $channelName, $channelData)); $connection2->assertSentEvent('pusher_internal:subscription_succeeded', [ 'channel' => $channelName, 'data' => json_encode([ 'presence' => [ - 'ids' => [(string) $userId], + 'ids' => [$userId], 'hash' => [ - (string) $userId => [], + $userId => [], ], 'count' => 1, ], @@ -100,7 +100,44 @@ public function multiple_clients_with_same_user_id_are_counted_once() ]); } - private function getSignedMessage(Connection $connection, string $channelName, array $channelData): Message + /** @test */ + public function multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection() + { + $channelName = 'presence-channel'; + + // Connect the `observer` user to the server + $this->pusherServer->onOpen($observerConnection = $this->getWebSocketConnection()); + $this->pusherServer->onMessage($observerConnection, $this->getSignedSubscribeMessage($observerConnection, $channelName, ['user_id' => 'observer'])); + + // Connect the first socket for user `user:1` to the server + $this->pusherServer->onOpen($firstConnection = $this->getWebSocketConnection()); + $this->pusherServer->onMessage($firstConnection, $this->getSignedSubscribeMessage($firstConnection, $channelName, ['user_id' => 'user:1'])); + + // Make sure the observer sees a `member_added` event for `user:1` + $observerConnection->assertSentEvent('pusher_internal:member_added'); + $observerConnection->resetEvents(); + + // Connect the second socket for user `user:1` to the server + $this->pusherServer->onOpen($secondConnection = $this->getWebSocketConnection()); + $this->pusherServer->onMessage($secondConnection, $this->getSignedSubscribeMessage($secondConnection, $channelName, ['user_id' => 'user:1'])); + + // Make sure the observer was not notified of a `member_added` event (user was already connected) + $observerConnection->assertNotSentEvent('pusher_internal:member_added'); + + // Disconnect the first socket for user `user:1` on the server + $this->pusherServer->onClose($firstConnection); + + // Make sure the observer was not notified of a `member_removed` event (user still connected on another socket) + $observerConnection->assertNotSentEvent('pusher_internal:member_removed'); + + // Disconnect the second (and last) socket for user `user:1` on the server + $this->pusherServer->onClose($secondConnection); + + // Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected) + $observerConnection->assertSentEvent('pusher_internal:member_removed'); + } + + private function getSignedSubscribeMessage(Connection $connection, string $channelName, array $channelData): Message { $signature = "{$connection->socketId}:{$channelName}:".json_encode($channelData); diff --git a/tests/HttpApi/FetchChannelTest.php b/tests/HttpApi/FetchChannelTest.php index 262f93cffe..8324d9e24f 100644 --- a/tests/HttpApi/FetchChannelTest.php +++ b/tests/HttpApi/FetchChannelTest.php @@ -66,6 +66,39 @@ public function it_returns_the_channel_information() ], json_decode($response->getContent(), true)); } + /** @test */ + public function it_returns_the_channel_information_for_presence_channel() + { + $this->joinPresenceChannel('presence-global', 'user:1'); + $this->joinPresenceChannel('presence-global', 'user:2'); + $this->joinPresenceChannel('presence-global', 'user:2'); + + $connection = new Connection(); + + $requestPath = '/apps/1234/channel/presence-global'; + $routeParams = [ + 'appId' => '1234', + 'channelName' => 'presence-global', + ]; + + $queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath); + + $request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams)); + + $controller = app(FetchChannelController::class); + + $controller->onOpen($connection, $request); + + /** @var JsonResponse $response */ + $response = array_pop($connection->sentRawData); + + $this->assertSame([ + 'occupied' => true, + 'subscription_count' => 3, + 'user_count' => 2, + ], json_decode($response->getContent(), true)); + } + /** @test */ public function it_returns_404_for_invalid_channels() { diff --git a/tests/HttpApi/FetchChannelsTest.php b/tests/HttpApi/FetchChannelsTest.php index 8dcc1fe2ee..0cf5a55e40 100644 --- a/tests/HttpApi/FetchChannelsTest.php +++ b/tests/HttpApi/FetchChannelsTest.php @@ -103,10 +103,10 @@ public function it_returns_the_channel_information_for_prefix() /** @test */ public function it_returns_the_channel_information_for_prefix_with_user_count() { - $this->joinPresenceChannel('presence-global.1'); - $this->joinPresenceChannel('presence-global.1'); - $this->joinPresenceChannel('presence-global.2'); - $this->joinPresenceChannel('presence-notglobal.2'); + $this->joinPresenceChannel('presence-global.1', 'user:1'); + $this->joinPresenceChannel('presence-global.1', 'user:2'); + $this->joinPresenceChannel('presence-global.2', 'user:3'); + $this->joinPresenceChannel('presence-notglobal.2', 'user:4'); $connection = new Connection(); diff --git a/tests/Mocks/Connection.php b/tests/Mocks/Connection.php index 2e9c60669d..b7c812d8c6 100644 --- a/tests/Mocks/Connection.php +++ b/tests/Mocks/Connection.php @@ -28,6 +28,12 @@ public function close() $this->closed = true; } + public function resetEvents() + { + $this->sentData = []; + $this->sentRawData = []; + } + public function assertSentEvent(string $name, array $additionalParameters = []) { $event = collect($this->sentData)->firstWhere('event', '=', $name); diff --git a/tests/TestCase.php b/tests/TestCase.php index c52e83b20e..ea5168febd 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -89,14 +89,14 @@ protected function getConnectedWebSocketConnection(array $channelsToJoin = [], s return $connection; } - protected function joinPresenceChannel($channel): Connection + protected function joinPresenceChannel($channel, $userId = null): Connection { $connection = $this->getWebSocketConnection(); $this->pusherServer->onOpen($connection); $channelData = [ - 'user_id' => 1, + 'user_id' => $userId ?? 1, 'user_info' => [ 'name' => 'Marcel', ],