Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Fix presence channels emitting too much events #530

Merged
merged 2 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/HttpApi/Controllers/FetchUsersController.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public function __invoke(Request $request)
}

return [
'users' => Collection::make($channel->getUsers())->map(function ($user) {
return ['id' => $user->user_id];
'users' => Collection::make($channel->getUsers())->keys()->map(function ($userId) {
return ['id' => $userId];
})->values(),
];
}
Expand Down
126 changes: 73 additions & 53 deletions src/WebSockets/Channels/PresenceChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,44 @@
use Ratchet\ConnectionInterface;
use stdClass;

/**
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
*/
class PresenceChannel extends Channel
{
/**
* List of users in the channel keyed by their user ID with their info as value.
*
* @var array<string, array>
*/
protected $users = [];

public function getUsers(): array
{
return $this->users;
}

/*
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
/**
* List of sockets keyed by their ID with the value pointing to a user ID.
*
* @var array<string, string>
*/
protected $sockets = [];

public function subscribe(ConnectionInterface $connection, stdClass $payload)
{
$this->verifySignature($connection, $payload);

$this->saveConnection($connection);

$channelData = json_decode($payload->channel_data);
$this->users[$connection->socketId] = $channelData;
$channelData = json_decode($payload->channel_data, true);

// The ID of the user connecting
$userId = (string) $channelData['user_id'];

// Check if the user was already connected to the channel before storing the connection in the state
$userFirstConnection = ! isset($this->users[$userId]);

// Add or replace the user info in the state
$this->users[$userId] = $channelData['user_info'] ?? [];

// Add the socket ID to user ID map in the state
$this->sockets[$connection->socketId] = $userId;

// Send the success event
$connection->send(json_encode([
Expand All @@ -33,72 +51,74 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload)
'data' => json_encode($this->getChannelData()),
]));

$this->broadcastToOthers($connection, [
'event' => 'pusher_internal:member_added',
'channel' => $this->channelName,
'data' => json_encode($channelData),
]);
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the first tab is opened.
if ($userFirstConnection) {
$this->broadcastToOthers($connection, [
'event' => 'pusher_internal:member_added',
'channel' => $this->channelName,
'data' => json_encode($channelData),
]);
}
}

public function unsubscribe(ConnectionInterface $connection)
{
parent::unsubscribe($connection);

if (! isset($this->users[$connection->socketId])) {
if (! isset($this->sockets[$connection->socketId])) {
return;
}

$this->broadcastToOthers($connection, [
'event' => 'pusher_internal:member_removed',
'channel' => $this->channelName,
'data' => json_encode([
'user_id' => $this->users[$connection->socketId]->user_id,
]),
]);

unset($this->users[$connection->socketId]);
// Find the user ID belonging to this socket
$userId = $this->sockets[$connection->socketId];

// Remove the socket from the state
unset($this->sockets[$connection->socketId]);

// Test if the user still has open sockets to this channel
$userHasOpenConnections = (array_flip($this->sockets)[$userId] ?? null) !== null;

// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the last one is closed.
if (! $userHasOpenConnections) {
$this->broadcastToOthers($connection, [
'event' => 'pusher_internal:member_removed',
'channel' => $this->channelName,
'data' => json_encode([
'user_id' => $userId,
]),
]);

// Remove the user info from the state
unset($this->users[$userId]);
}
}

protected function getChannelData(): array
{
return [
'presence' => [
'ids' => $userIds = $this->getUserIds(),
'hash' => $this->getHash(),
'count' => count($userIds),
'ids' => array_keys($this->users),
'hash' => $this->users,
'count' => count($this->users),
],
];
}

public function toArray(): array
{
return array_merge(parent::toArray(), [
'user_count' => count($this->getUserIds()),
]);
}

protected function getUserIds(): array
public function getUsers(): array
{
$userIds = array_map(function ($channelData) {
return (string) $channelData->user_id;
}, $this->users);

return array_values(array_unique($userIds));
return $this->users;
}

/**
* Compute the hash for the presence channel integrity.
*
* @return array
*/
protected function getHash(): array
public function toArray(): array
{
$hash = [];

foreach ($this->users as $socketId => $channelData) {
$hash[$channelData->user_id] = $channelData->user_info ?? [];
}

return $hash;
return array_merge(parent::toArray(), [
'user_count' => count($this->users),
]);
}
}
53 changes: 45 additions & 8 deletions tests/Channels/PresenceChannelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public function clients_with_valid_auth_signatures_can_join_presence_channels()
],
];

$message = $this->getSignedMessage($connection, 'presence-channel', $channelData);
$message = $this->getSignedSubscribeMessage($connection, 'presence-channel', $channelData);

$this->pusherServer->onMessage($connection, $message);

Expand All @@ -63,7 +63,7 @@ public function clients_with_no_user_info_can_join_presence_channels()
'user_id' => 1,
];

$message = $this->getSignedMessage($connection, 'presence-channel', $channelData);
$message = $this->getSignedSubscribeMessage($connection, 'presence-channel', $channelData);

$this->pusherServer->onMessage($connection, $message);

Expand All @@ -80,27 +80,64 @@ public function multiple_clients_with_same_user_id_are_counted_once()

$channelName = 'presence-channel';
$channelData = [
'user_id' => $userId = 1,
'user_id' => $userId = 'user:1',
];

$this->pusherServer->onMessage($connection, $this->getSignedMessage($connection, $channelName, $channelData));
$this->pusherServer->onMessage($connection2, $this->getSignedMessage($connection2, $channelName, $channelData));
$this->pusherServer->onMessage($connection, $this->getSignedSubscribeMessage($connection, $channelName, $channelData));
$this->pusherServer->onMessage($connection2, $this->getSignedSubscribeMessage($connection2, $channelName, $channelData));

$connection2->assertSentEvent('pusher_internal:subscription_succeeded', [
'channel' => $channelName,
'data' => json_encode([
'presence' => [
'ids' => [(string) $userId],
'ids' => [$userId],
'hash' => [
(string) $userId => [],
$userId => [],
],
'count' => 1,
],
]),
]);
}

private function getSignedMessage(Connection $connection, string $channelName, array $channelData): Message
/** @test */
public function multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection()
{
$channelName = 'presence-channel';

// Connect the `observer` user to the server
$this->pusherServer->onOpen($observerConnection = $this->getWebSocketConnection());
$this->pusherServer->onMessage($observerConnection, $this->getSignedSubscribeMessage($observerConnection, $channelName, ['user_id' => 'observer']));

// Connect the first socket for user `user:1` to the server
$this->pusherServer->onOpen($firstConnection = $this->getWebSocketConnection());
$this->pusherServer->onMessage($firstConnection, $this->getSignedSubscribeMessage($firstConnection, $channelName, ['user_id' => 'user:1']));

// Make sure the observer sees a `member_added` event for `user:1`
$observerConnection->assertSentEvent('pusher_internal:member_added');
$observerConnection->resetEvents();

// Connect the second socket for user `user:1` to the server
$this->pusherServer->onOpen($secondConnection = $this->getWebSocketConnection());
$this->pusherServer->onMessage($secondConnection, $this->getSignedSubscribeMessage($secondConnection, $channelName, ['user_id' => 'user:1']));

// Make sure the observer was not notified of a `member_added` event (user was already connected)
$observerConnection->assertNotSentEvent('pusher_internal:member_added');

// Disconnect the first socket for user `user:1` on the server
$this->pusherServer->onClose($firstConnection);

// Make sure the observer was not notified of a `member_removed` event (user still connected on another socket)
$observerConnection->assertNotSentEvent('pusher_internal:member_removed');

// Disconnect the second (and last) socket for user `user:1` on the server
$this->pusherServer->onClose($secondConnection);

// Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected)
$observerConnection->assertSentEvent('pusher_internal:member_removed');
}

private function getSignedSubscribeMessage(Connection $connection, string $channelName, array $channelData): Message
{
$signature = "{$connection->socketId}:{$channelName}:".json_encode($channelData);

Expand Down
33 changes: 33 additions & 0 deletions tests/HttpApi/FetchChannelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,39 @@ public function it_returns_the_channel_information()
], json_decode($response->getContent(), true));
}

/** @test */
public function it_returns_the_channel_information_for_presence_channel()
{
$this->joinPresenceChannel('presence-global', 'user:1');
$this->joinPresenceChannel('presence-global', 'user:2');
$this->joinPresenceChannel('presence-global', 'user:2');

$connection = new Connection();

$requestPath = '/apps/1234/channel/presence-global';
$routeParams = [
'appId' => '1234',
'channelName' => 'presence-global',
];

$queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath);

$request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams));

$controller = app(FetchChannelController::class);

$controller->onOpen($connection, $request);

/** @var JsonResponse $response */
$response = array_pop($connection->sentRawData);

$this->assertSame([
'occupied' => true,
'subscription_count' => 3,
'user_count' => 2,
], json_decode($response->getContent(), true));
}

/** @test */
public function it_returns_404_for_invalid_channels()
{
Expand Down
8 changes: 4 additions & 4 deletions tests/HttpApi/FetchChannelsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ public function it_returns_the_channel_information_for_prefix()
/** @test */
public function it_returns_the_channel_information_for_prefix_with_user_count()
{
$this->joinPresenceChannel('presence-global.1');
$this->joinPresenceChannel('presence-global.1');
$this->joinPresenceChannel('presence-global.2');
$this->joinPresenceChannel('presence-notglobal.2');
$this->joinPresenceChannel('presence-global.1', 'user:1');
$this->joinPresenceChannel('presence-global.1', 'user:2');
$this->joinPresenceChannel('presence-global.2', 'user:3');
$this->joinPresenceChannel('presence-notglobal.2', 'user:4');

$connection = new Connection();

Expand Down
6 changes: 6 additions & 0 deletions tests/Mocks/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public function close()
$this->closed = true;
}

public function resetEvents()
{
$this->sentData = [];
$this->sentRawData = [];
}

public function assertSentEvent(string $name, array $additionalParameters = [])
{
$event = collect($this->sentData)->firstWhere('event', '=', $name);
Expand Down
4 changes: 2 additions & 2 deletions tests/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ protected function getConnectedWebSocketConnection(array $channelsToJoin = [], s
return $connection;
}

protected function joinPresenceChannel($channel): Connection
protected function joinPresenceChannel($channel, $userId = null): Connection
{
$connection = $this->getWebSocketConnection();

$this->pusherServer->onOpen($connection);

$channelData = [
'user_id' => 1,
'user_id' => $userId ?? 1,
'user_info' => [
'name' => 'Marcel',
],
Expand Down