diff --git a/.gitignore b/.gitignore index 461ba139fd..f423e5bf06 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ build composer.lock vendor coverage -.phpunit.result.cache \ No newline at end of file +.phpunit.result.cache +.idea/ diff --git a/composer.json b/composer.json index 2dfd9ae0ba..b96518f99c 100644 --- a/composer.json +++ b/composer.json @@ -26,6 +26,7 @@ "ext-json": "*", "cboden/ratchet": "^0.4.1", "clue/buzz-react": "^2.5", + "clue/redis-react": "^2.3", "facade/ignition-contracts": "^1.0", "guzzlehttp/psr7": "^1.5", "illuminate/broadcasting": "5.8.*|^6.0|^7.0", diff --git a/config/websockets.php b/config/websockets.php index 2f3f3212f6..a2ca845400 100644 --- a/config/websockets.php +++ b/config/websockets.php @@ -141,6 +141,34 @@ ], + /* + |-------------------------------------------------------------------------- + | Broadcasting Replication + |-------------------------------------------------------------------------- + | + | You can enable replication to publish and subscribe to + | messages across the driver. + | + | By default, it is disabled, but you can configure it to use drivers + | like Redis to ensure connection between multiple instances of + | WebSocket servers. + | + */ + + 'replication' => [ + + 'enabled' => false, + + 'driver' => 'redis', + + 'redis' => [ + + 'connection' => 'default', + + ], + + ], + 'statistics' => [ /* diff --git a/src/Apps/ConfigAppProvider.php b/src/Apps/ConfigAppProvider.php index 69d8bfeb2c..211bb83326 100644 --- a/src/Apps/ConfigAppProvider.php +++ b/src/Apps/ConfigAppProvider.php @@ -19,7 +19,7 @@ public function all(): array { return $this->apps ->map(function (array $appAttributes) { - return $this->instanciate($appAttributes); + return $this->instantiate($appAttributes); }) ->toArray(); } @@ -30,7 +30,7 @@ public function findById($appId): ?App ->apps ->firstWhere('id', $appId); - return $this->instanciate($appAttributes); + return $this->instantiate($appAttributes); } public function findByKey(string $appKey): ?App @@ -39,7 +39,7 @@ public function findByKey(string $appKey): ?App ->apps ->firstWhere('key', $appKey); - return $this->instanciate($appAttributes); + return $this->instantiate($appAttributes); } public function findBySecret(string $appSecret): ?App @@ -48,10 +48,10 @@ public function findBySecret(string $appSecret): ?App ->apps ->firstWhere('secret', $appSecret); - return $this->instanciate($appAttributes); + return $this->instantiate($appAttributes); } - protected function instanciate(?array $appAttributes): ?App + protected function instantiate(?array $appAttributes): ?App { if (! $appAttributes) { return null; diff --git a/src/Console/StartWebSocketServer.php b/src/Console/StartWebSocketServer.php index 91a5d8c8b5..2223e8a991 100644 --- a/src/Console/StartWebSocketServer.php +++ b/src/Console/StartWebSocketServer.php @@ -4,6 +4,7 @@ use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger; use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Server\Logger\ConnectionLogger; use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger; use BeyondCode\LaravelWebSockets\Server\Logger\WebsocketsLogger; @@ -49,6 +50,7 @@ public function handle() ->configureRestartTimer() ->registerEchoRoutes() ->registerCustomRoutes() + ->configurePubSubReplication() ->startWebSocketServer(); } @@ -61,7 +63,7 @@ protected function configureStatisticsLogger() $browser = new Browser($this->loop, $connector); - app()->singleton(StatisticsLoggerInterface::class, function () use ($browser) { + $this->laravel->singleton(StatisticsLoggerInterface::class, function () use ($browser) { $class = config('websockets.statistics.logger', \BeyondCode\LaravelWebSockets\Statistics\Logger::class); return new $class(app(ChannelManager::class), $browser); @@ -76,7 +78,7 @@ protected function configureStatisticsLogger() protected function configureHttpLogger() { - app()->singleton(HttpLogger::class, function () { + $this->laravel->singleton(HttpLogger::class, function () { return (new HttpLogger($this->output)) ->enable($this->option('debug') ?: config('app.debug')) ->verbose($this->output->isVerbose()); @@ -87,7 +89,7 @@ protected function configureHttpLogger() protected function configureMessageLogger() { - app()->singleton(WebsocketsLogger::class, function () { + $this->laravel->singleton(WebsocketsLogger::class, function () { return (new WebsocketsLogger($this->output)) ->enable($this->option('debug') ?: config('app.debug')) ->verbose($this->output->isVerbose()); @@ -98,7 +100,7 @@ protected function configureMessageLogger() protected function configureConnectionLogger() { - app()->bind(ConnectionLogger::class, function () { + $this->laravel->bind(ConnectionLogger::class, function () { return (new ConnectionLogger($this->output)) ->enable(config('app.debug')) ->verbose($this->output->isVerbose()); @@ -151,6 +153,13 @@ protected function startWebSocketServer() ->run(); } + protected function configurePubSubReplication() + { + $this->laravel->get(ReplicationInterface::class)->boot($this->loop); + + return $this; + } + protected function getDnsResolver(): ResolverInterface { if (! config('websockets.statistics.perform_dns_lookup')) { diff --git a/src/Facades/StatisticsLogger.php b/src/Facades/StatisticsLogger.php index ba8f70bc27..9aadfa7425 100644 --- a/src/Facades/StatisticsLogger.php +++ b/src/Facades/StatisticsLogger.php @@ -5,7 +5,10 @@ use BeyondCode\LaravelWebSockets\Statistics\Logger\StatisticsLogger as StatisticsLoggerInterface; use Illuminate\Support\Facades\Facade; -/** @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger */ +/** + * @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger + * @mixin \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger + */ class StatisticsLogger extends Facade { protected static function getFacadeAccessor() diff --git a/src/Facades/WebSocketsRouter.php b/src/Facades/WebSocketsRouter.php index 2c7b75a423..925f6856e7 100644 --- a/src/Facades/WebSocketsRouter.php +++ b/src/Facades/WebSocketsRouter.php @@ -4,7 +4,10 @@ use Illuminate\Support\Facades\Facade; -/** @see \BeyondCode\LaravelWebSockets\Server\Router */ +/** + * @see \BeyondCode\LaravelWebSockets\Server\Router + * @mixin \BeyondCode\LaravelWebSockets\Server\Router + */ class WebSocketsRouter extends Facade { protected static function getFacadeAccessor() diff --git a/src/HttpApi/Controllers/Controller.php b/src/HttpApi/Controllers/Controller.php index 492b3eb800..6e8b449f50 100644 --- a/src/HttpApi/Controllers/Controller.php +++ b/src/HttpApi/Controllers/Controller.php @@ -16,6 +16,7 @@ use Pusher\Pusher; use Ratchet\ConnectionInterface; use Ratchet\Http\HttpServerInterface; +use React\Promise\PromiseInterface; use Symfony\Bridge\PsrHttpMessage\Factory\HttpFoundationFactory; use Symfony\Component\HttpKernel\Exception\HttpException; @@ -30,7 +31,7 @@ abstract class Controller implements HttpServerInterface /** @var int */ protected $contentLength; - /** @var \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager */ + /** @var ChannelManager */ protected $channelManager; public function __construct(ChannelManager $channelManager) @@ -46,7 +47,11 @@ public function onOpen(ConnectionInterface $connection, RequestInterface $reques $this->requestBuffer = (string) $request->getBody(); - $this->checkContentLength($connection); + if (! $this->verifyContentLength()) { + return; + } + + $this->handleRequest($connection); } protected function findContentLength(array $headers): int @@ -60,31 +65,53 @@ public function onMessage(ConnectionInterface $from, $msg) { $this->requestBuffer .= $msg; - $this->checkContentLength($from); + if (! $this->verifyContentLength()) { + return; + } + + $this->handleRequest($from); + } + + protected function verifyContentLength() + { + return strlen($this->requestBuffer) === $this->contentLength; } - protected function checkContentLength(ConnectionInterface $connection) + protected function handleRequest(ConnectionInterface $connection) { - if (strlen($this->requestBuffer) === $this->contentLength) { - $serverRequest = (new ServerRequest( - $this->request->getMethod(), - $this->request->getUri(), - $this->request->getHeaders(), - $this->requestBuffer, - $this->request->getProtocolVersion() - ))->withQueryParams(QueryParameters::create($this->request)->all()); + $serverRequest = (new ServerRequest( + $this->request->getMethod(), + $this->request->getUri(), + $this->request->getHeaders(), + $this->requestBuffer, + $this->request->getProtocolVersion() + ))->withQueryParams(QueryParameters::create($this->request)->all()); + + $laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest)); - $laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest)); + $this + ->ensureValidAppId($laravelRequest->appId) + ->ensureValidSignature($laravelRequest); - $this - ->ensureValidAppId($laravelRequest->appId) - ->ensureValidSignature($laravelRequest); + // Invoke the controller action + $response = $this($laravelRequest); - $response = $this($laravelRequest); + // Allow for async IO in the controller action + if ($response instanceof PromiseInterface) { + $response->then(function ($response) use ($connection) { + $this->sendAndClose($connection, $response); + }); - $connection->send(JsonResponse::create($response)); - $connection->close(); + return; } + + $this->sendAndClose($connection, $response); + } + + protected function sendAndClose(ConnectionInterface $connection, $response) + { + $connection->send(JsonResponse::create($response)); + $connection->close(); } public function onClose(ConnectionInterface $connection) @@ -122,7 +149,7 @@ protected function ensureValidSignature(Request $request) /* * The `auth_signature` & `body_md5` parameters are not included when calculating the `auth_signature` value. * - * The `appId`, `appKey` & `channelName` parameters are actually route paramaters and are never supplied by the client. + * The `appId`, `appKey` & `channelName` parameters are actually route parameters and are never supplied by the client. */ $params = Arr::except($request->query(), ['auth_signature', 'body_md5', 'appId', 'appKey', 'channelName']); diff --git a/src/HttpApi/Controllers/FetchChannelController.php b/src/HttpApi/Controllers/FetchChannelController.php index 6a24fd5e2c..188e08cc4e 100644 --- a/src/HttpApi/Controllers/FetchChannelController.php +++ b/src/HttpApi/Controllers/FetchChannelController.php @@ -15,6 +15,6 @@ public function __invoke(Request $request) throw new HttpException(404, "Unknown channel `{$request->channelName}`."); } - return $channel->toArray(); + return $channel->toArray($request->appId); } } diff --git a/src/HttpApi/Controllers/FetchChannelsController.php b/src/HttpApi/Controllers/FetchChannelsController.php index 89dab3373b..fdf02b2836 100644 --- a/src/HttpApi/Controllers/FetchChannelsController.php +++ b/src/HttpApi/Controllers/FetchChannelsController.php @@ -6,9 +6,22 @@ use Illuminate\Support\Collection; use Illuminate\Support\Str; use Symfony\Component\HttpKernel\Exception\HttpException; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; +use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; +use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel; class FetchChannelsController extends Controller { + /** @var ReplicationInterface */ + protected $replication; + + public function __construct(ChannelManager $channelManager, ReplicationInterface $replication) + { + parent::__construct($channelManager); + + $this->replication = $replication; + } + public function __invoke(Request $request) { $attributes = []; @@ -29,15 +42,28 @@ public function __invoke(Request $request) }); } - return [ - 'channels' => $channels->map(function ($channel) use ($attributes) { - $info = new \stdClass; - if (in_array('user_count', $attributes)) { - $info->user_count = count($channel->getUsers()); - } + // We want to get the channel user count all in one shot when + // using a replication backend rather than doing individual queries. + // To do so, we first collect the list of channel names. + $channelNames = $channels->map(function (PresenceChannel $channel) use ($request) { + return $channel->getChannelName(); + })->toArray(); + + // We ask the replication backend to get us the member count per channel. + // We get $counts back as a key-value array of channel names and their member count. + return $this->replication + ->channelMemberCounts($request->appId, $channelNames) + ->then(function (array $counts) use ($channels, $attributes) { + return [ + 'channels' => $channels->map(function (PresenceChannel $channel) use ($counts, $attributes) { + $info = new \stdClass; + if (in_array('user_count', $attributes)) { + $info->user_count = $counts[$channel->getChannelName()]; + } - return $info; - })->toArray() ?: new \stdClass, - ]; + return $info; + })->toArray() ?: new \stdClass, + ]; + }); } } diff --git a/src/HttpApi/Controllers/FetchUsersController.php b/src/HttpApi/Controllers/FetchUsersController.php index d59da7c271..efb712f5cd 100644 --- a/src/HttpApi/Controllers/FetchUsersController.php +++ b/src/HttpApi/Controllers/FetchUsersController.php @@ -21,10 +21,14 @@ public function __invoke(Request $request) throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"'); } - return [ - 'users' => Collection::make($channel->getUsers())->map(function ($user) { - return ['id' => $user->user_id]; - })->values(), - ]; + return $channel + ->getUsers($request->appId) + ->then(function (array $users) { + return [ + 'users' => Collection::make($users)->map(function ($user) { + return ['id' => $user->user_id]; + })->values(), + ]; + }); } } diff --git a/src/HttpApi/Controllers/TriggerEventController.php b/src/HttpApi/Controllers/TriggerEventController.php index 7f0000569b..bc921e4f1f 100644 --- a/src/HttpApi/Controllers/TriggerEventController.php +++ b/src/HttpApi/Controllers/TriggerEventController.php @@ -19,7 +19,7 @@ public function __invoke(Request $request) 'channel' => $channelName, 'event' => $request->json()->get('name'), 'data' => $request->json()->get('data'), - ], $request->json()->get('socket_id')); + ], $request->json()->get('socket_id'), $request->appId); DashboardLogger::apiMessage( $request->appId, diff --git a/src/PubSub/Broadcasters/RedisPusherBroadcaster.php b/src/PubSub/Broadcasters/RedisPusherBroadcaster.php new file mode 100644 index 0000000000..f1be3a5ece --- /dev/null +++ b/src/PubSub/Broadcasters/RedisPusherBroadcaster.php @@ -0,0 +1,150 @@ +pusher = $pusher; + $this->appId = $appId; + $this->redis = $redis; + $this->connection = $connection; + } + + /** + * Authenticate the incoming request for a given channel. + * + * @param \Illuminate\Http\Request $request + * @return mixed + * + * @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException + */ + public function auth($request) + { + $channelName = $this->normalizeChannelName($request->channel_name); + + if ($this->isGuardedChannel($request->channel_name) && + ! $this->retrieveUser($request, $channelName)) { + throw new AccessDeniedHttpException; + } + + return parent::verifyUserCanAccessChannel( + $request, $channelName + ); + } + + /** + * Return the valid authentication response. + * + * @param \Illuminate\Http\Request $request + * @param mixed $result + * @return mixed + * @throws \Pusher\PusherException + */ + public function validAuthenticationResponse($request, $result) + { + if (Str::startsWith($request->channel_name, 'private')) { + return $this->decodePusherResponse( + $request, $this->pusher->socket_auth($request->channel_name, $request->socket_id) + ); + } + + $channelName = $this->normalizeChannelName($request->channel_name); + + return $this->decodePusherResponse( + $request, + $this->pusher->presence_auth( + $request->channel_name, $request->socket_id, + $this->retrieveUser($request, $channelName)->getAuthIdentifier(), $result + ) + ); + } + + /** + * Decode the given Pusher response. + * + * @param \Illuminate\Http\Request $request + * @param mixed $response + * @return array + */ + protected function decodePusherResponse($request, $response) + { + if (! $request->input('callback', false)) { + return json_decode($response, true); + } + + return response()->json(json_decode($response, true)) + ->withCallback($request->callback); + } + + /** + * Broadcast the given event. + * + * @param array $channels + * @param string $event + * @param array $payload + * @return void + */ + public function broadcast(array $channels, $event, array $payload = []) + { + $connection = $this->redis->connection($this->connection); + + $payload = json_encode([ + 'appId' => $this->appId, + 'event' => $event, + 'data' => $payload, + 'socket' => Arr::pull($payload, 'socket'), + ]); + + foreach ($this->formatChannels($channels) as $channel) { + $connection->publish("{$this->appId}:$channel", $payload); + } + } +} diff --git a/src/PubSub/Drivers/LocalClient.php b/src/PubSub/Drivers/LocalClient.php new file mode 100644 index 0000000000..9d5c5e20f7 --- /dev/null +++ b/src/PubSub/Drivers/LocalClient.php @@ -0,0 +1,138 @@ +channelData["$appId:$channel"][$socketId] = $data; + } + + /** + * Remove a member from the channel. To be called when they have + * unsubscribed from the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + */ + public function leaveChannel(string $appId, string $channel, string $socketId) + { + unset($this->channelData["$appId:$channel"][$socketId]); + if (empty($this->channelData["$appId:$channel"])) { + unset($this->channelData["$appId:$channel"]); + } + } + + /** + * Retrieve the full information about the members in a presence channel. + * + * @param string $appId + * @param string $channel + * @return PromiseInterface + */ + public function channelMembers(string $appId, string $channel) : PromiseInterface + { + $members = $this->channelData["$appId:$channel"] ?? []; + + // The data is expected as objects, so we need to JSON decode + $members = array_map(function ($user) { + return json_decode($user); + }, $members); + + return new FulfilledPromise($members); + } + + /** + * Get the amount of users subscribed for each presence channel. + * + * @param string $appId + * @param array $channelNames + * @return PromiseInterface + */ + public function channelMemberCounts(string $appId, array $channelNames) : PromiseInterface + { + $results = []; + + // Count the number of users per channel + foreach ($channelNames as $channel) { + $results[$channel] = isset($this->channelData["$appId:$channel"]) + ? count($this->channelData["$appId:$channel"]) + : 0; + } + + return new FulfilledPromise($results); + } +} diff --git a/src/PubSub/Drivers/RedisClient.php b/src/PubSub/Drivers/RedisClient.php new file mode 100644 index 0000000000..2c8d916d9e --- /dev/null +++ b/src/PubSub/Drivers/RedisClient.php @@ -0,0 +1,276 @@ +serverId = Str::uuid()->toString(); + } + + /** + * Boot the RedisClient, initializing the connections. + * + * @param LoopInterface $loop + * @return ReplicationInterface + */ + public function boot(LoopInterface $loop): ReplicationInterface + { + $this->loop = $loop; + + $connectionUri = $this->getConnectionUri(); + $factory = new Factory($this->loop); + + $this->publishClient = $factory->createLazyClient($connectionUri); + $this->subscribeClient = $factory->createLazyClient($connectionUri); + + $this->subscribeClient->on('message', function ($channel, $payload) { + $this->onMessage($channel, $payload); + }); + + return $this; + } + + /** + * Handle a message received from Redis on a specific channel. + * + * @param string $redisChannel + * @param string $payload + */ + protected function onMessage(string $redisChannel, string $payload) + { + $payload = json_decode($payload); + + // Ignore messages sent by ourselves + if (isset($payload->serverId) && $this->serverId === $payload->serverId) { + return; + } + + // Pull out the app ID. See RedisPusherBroadcaster + $appId = $payload->appId; + + // We need to put the channel name in the payload. + // We strip the app ID from the channel name, websocket clients + // expect the channel name to not include the app ID. + $payload->channel = Str::after($redisChannel, "$appId:"); + + /* @var ChannelManager $channelManager */ + $channelManager = app(ChannelManager::class); + + // Load the Channel instance, if any + $channel = $channelManager->find($appId, $payload->channel); + + // If no channel is found, none of our connections want to + // receive this message, so we ignore it. + if (! $channel) { + return; + } + + $socket = $payload->socket ?? null; + + // Remove fields intended for internal use from the payload + unset($payload->socket); + unset($payload->serverId); + unset($payload->appId); + + // Push the message out to connected websocket clients + $channel->broadcastToEveryoneExcept($payload, $socket, $appId, false); + } + + /** + * Subscribe to a channel on behalf of websocket user. + * + * @param string $appId + * @param string $channel + * @return bool + */ + public function subscribe(string $appId, string $channel): bool + { + if (! isset($this->subscribedChannels["$appId:$channel"])) { + // We're not subscribed to the channel yet, subscribe and set the count to 1 + $this->subscribeClient->__call('subscribe', ["$appId:$channel"]); + $this->subscribedChannels["$appId:$channel"] = 1; + } else { + // Increment the subscribe count if we've already subscribed + $this->subscribedChannels["$appId:$channel"]++; + } + + return true; + } + + /** + * Unsubscribe from a channel on behalf of a websocket user. + * + * @param string $appId + * @param string $channel + * @return bool + */ + public function unsubscribe(string $appId, string $channel): bool + { + if (! isset($this->subscribedChannels["$appId:$channel"])) { + return false; + } + + // Decrement the subscription count for this channel + $this->subscribedChannels["$appId:$channel"]--; + + // If we no longer have subscriptions to that channel, unsubscribe + if ($this->subscribedChannels["$appId:$channel"] < 1) { + $this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]); + unset($this->subscribedChannels["$appId:$channel"]); + } + + return true; + } + + /** + * Publish a message to a channel on behalf of a websocket user. + * + * @param string $appId + * @param string $channel + * @param stdClass $payload + * @return bool + */ + public function publish(string $appId, string $channel, stdClass $payload): bool + { + $payload->appId = $appId; + $payload->serverId = $this->serverId; + + $this->publishClient->__call('publish', ["$appId:$channel", json_encode($payload)]); + + return true; + } + + /** + * Add a member to a channel. To be called when they have + * subscribed to the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + * @param string $data + */ + public function joinChannel(string $appId, string $channel, string $socketId, string $data) + { + $this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]); + } + + /** + * Remove a member from the channel. To be called when they have + * unsubscribed from the channel. + * + * @param string $appId + * @param string $channel + * @param string $socketId + */ + public function leaveChannel(string $appId, string $channel, string $socketId) + { + $this->publishClient->__call('hdel', ["$appId:$channel", $socketId]); + } + + /** + * Retrieve the full information about the members in a presence channel. + * + * @param string $appId + * @param string $channel + * @return PromiseInterface + */ + public function channelMembers(string $appId, string $channel): PromiseInterface + { + return $this->publishClient->__call('hgetall', ["$appId:$channel"]) + ->then(function ($members) { + // The data is expected as objects, so we need to JSON decode + return array_map(function ($user) { + return json_decode($user); + }, $members); + }); + } + + /** + * Get the amount of users subscribed for each presence channel. + * + * @param string $appId + * @param array $channelNames + * @return PromiseInterface + */ + public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface + { + $this->publishClient->__call('multi', []); + + foreach ($channelNames as $channel) { + $this->publishClient->__call('hlen', ["$appId:$channel"]); + } + + return $this->publishClient->__call('exec', []) + ->then(function ($data) use ($channelNames) { + return array_combine($channelNames, $data); + }); + } + + /** + * Build the Redis connection URL from Laravel database config. + * + * @return string + */ + protected function getConnectionUri() + { + $name = config('websockets.replication.connection') ?? 'default'; + $config = config("database.redis.$name"); + $host = $config['host']; + $port = $config['port'] ? (':'.$config['port']) : ':6379'; + + $query = []; + if ($config['password']) { + $query['password'] = $config['password']; + } + if ($config['database']) { + $query['database'] = $config['database']; + } + $query = http_build_query($query); + + return "redis://$host$port".($query ? '?'.$query : ''); + } +} diff --git a/src/PubSub/ReplicationInterface.php b/src/PubSub/ReplicationInterface.php new file mode 100644 index 0000000000..3e120af528 --- /dev/null +++ b/src/PubSub/ReplicationInterface.php @@ -0,0 +1,85 @@ +createWebSocketsServer($action) diff --git a/src/WebSockets/Channels/Channel.php b/src/WebSockets/Channels/Channel.php index 0c9ca49a23..75a9791962 100644 --- a/src/WebSockets/Channels/Channel.php +++ b/src/WebSockets/Channels/Channel.php @@ -3,6 +3,7 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; @@ -13,12 +14,21 @@ class Channel /** @var string */ protected $channelName; + /** @var ReplicationInterface */ + protected $replication; + /** @var \Ratchet\ConnectionInterface[] */ protected $subscribedConnections = []; public function __construct(string $channelName) { $this->channelName = $channelName; + $this->replication = app(ReplicationInterface::class); + } + + public function getChannelName(): string + { + return $this->channelName; } public function hasConnections(): bool @@ -31,6 +41,9 @@ public function getSubscribedConnections(): array return $this->subscribedConnections; } + /** + * @throws InvalidSignature + */ protected function verifySignature(ConnectionInterface $connection, stdClass $payload) { $signature = "{$connection->socketId}:{$this->channelName}"; @@ -39,18 +52,24 @@ protected function verifySignature(ConnectionInterface $connection, stdClass $pa $signature .= ":{$payload->channel_data}"; } - if (Str::after($payload->auth, ':') !== hash_hmac('sha256', $signature, $connection->app->secret)) { + if (! hash_equals( + hash_hmac('sha256', $signature, $connection->app->secret), + Str::after($payload->auth, ':')) + ) { throw new InvalidSignature(); } } - /* + /** * @link https://pusher.com/docs/pusher_protocol#presence-channel-events */ public function subscribe(ConnectionInterface $connection, stdClass $payload) { $this->saveConnection($connection); + // Subscribe to broadcasted messages from the pub/sub backend + $this->replication->subscribe($connection->app->id, $this->channelName); + $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->channelName, @@ -61,6 +80,9 @@ public function unsubscribe(ConnectionInterface $connection) { unset($this->subscribedConnections[$connection->socketId]); + // Unsubscribe from the pub/sub backend + $this->replication->unsubscribe($connection->app->id, $this->channelName); + if (! $this->hasConnections()) { DashboardLogger::vacated($connection, $this->channelName); } @@ -88,13 +110,26 @@ public function broadcast($payload) public function broadcastToOthers(ConnectionInterface $connection, $payload) { - $this->broadcastToEveryoneExcept($payload, $connection->socketId); + $this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id); } - public function broadcastToEveryoneExcept($payload, ?string $socketId = null) + public function broadcastToEveryoneExcept($payload, ?string $socketId, string $appId, bool $publish = true) { + // Also broadcast via the other websocket server instances. + // This is set false in the Redis client because we don't want to cause a loop + // in this case. If this came from TriggerEventController, then we still want + // to publish to get the message out to other server instances. + if ($publish) { + $this->replication->publish($appId, $this->channelName, $payload); + } + + // Performance optimization, if we don't have a socket ID, + // then we avoid running the if condition in the foreach loop below + // by calling broadcast() instead. if (is_null($socketId)) { - return $this->broadcast($payload); + $this->broadcast($payload); + + return; } foreach ($this->subscribedConnections as $connection) { @@ -104,7 +139,7 @@ public function broadcastToEveryoneExcept($payload, ?string $socketId = null) } } - public function toArray(): array + public function toArray(string $appId = null) { return [ 'occupied' => count($this->subscribedConnections) > 0, diff --git a/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php b/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php index 8d0d91c827..f9c6c20c42 100644 --- a/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php +++ b/src/WebSockets/Channels/ChannelManagers/ArrayChannelManager.php @@ -15,7 +15,7 @@ class ArrayChannelManager implements ChannelManager /** @var string */ protected $appId; - /** @var array */ + /** @var Channel[][] */ protected $channels = []; public function findOrCreate(string $appId, string $channelName): Channel diff --git a/src/WebSockets/Channels/PresenceChannel.php b/src/WebSockets/Channels/PresenceChannel.php index 9bbe1ec231..94a1426097 100644 --- a/src/WebSockets/Channels/PresenceChannel.php +++ b/src/WebSockets/Channels/PresenceChannel.php @@ -2,20 +2,42 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; +use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; use Ratchet\ConnectionInterface; +use React\Promise\PromiseInterface; use stdClass; class PresenceChannel extends Channel { + /** + * Data for the users connected to this channel. + * + * Note: If replication is enabled, this will only contain entries + * for the users directly connected to this server instance. Requests + * for data for all users in the channel should be routed through + * ReplicationInterface. + * + * @var string[] + */ protected $users = []; - public function getUsers(): array + /** + * @param string $appId + * @return PromiseInterface + */ + public function getUsers(string $appId) { - return $this->users; + // Get the members list from the replication backend + return $this->replication + ->channelMembers($appId, $this->channelName); } - /* + /** * @link https://pusher.com/docs/pusher_protocol#presence-channel-events + * + * @param ConnectionInterface $connection + * @param stdClass $payload + * @throws InvalidSignature */ public function subscribe(ConnectionInterface $connection, stdClass $payload) { @@ -26,14 +48,29 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload) $channelData = json_decode($payload->channel_data); $this->users[$connection->socketId] = $channelData; - // Send the success event - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->channelName, - 'data' => json_encode($this->getChannelData()), - ])); - - $this->broadcastToOthers($connection, [ + // Add the connection as a member of the channel + $this->replication + ->joinChannel( + $connection->app->id, + $this->channelName, + $connection->socketId, + json_encode($channelData) + ); + + // We need to pull the channel data from the replication backend, + // otherwise we won't be sending the full details of the channel + $this->replication + ->channelMembers($connection->app->id, $this->channelName) + ->then(function ($users) use ($connection) { + // Send the success event + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->channelName, + 'data' => json_encode($this->getChannelData($users)), + ])); + }); + + $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_added', 'channel' => $this->channelName, 'data' => json_encode($channelData), @@ -48,7 +85,15 @@ public function unsubscribe(ConnectionInterface $connection) return; } - $this->broadcastToOthers($connection, [ + // Remove the connection as a member of the channel + $this->replication + ->leaveChannel( + $connection->app->id, + $this->channelName, + $connection->socketId + ); + + $this->broadcastToOthers($connection, (object) [ 'event' => 'pusher_internal:member_removed', 'channel' => $this->channelName, 'data' => json_encode([ @@ -59,38 +104,46 @@ public function unsubscribe(ConnectionInterface $connection) unset($this->users[$connection->socketId]); } - protected function getChannelData(): array + /** + * @param string|null $appId + * @return PromiseInterface + */ + public function toArray(string $appId = null) + { + return $this->replication + ->channelMembers($appId, $this->channelName) + ->then(function ($users) { + return array_merge(parent::toArray(), [ + 'user_count' => count($users), + ]); + }); + } + + protected function getChannelData(array $users): array { return [ 'presence' => [ - 'ids' => $this->getUserIds(), - 'hash' => $this->getHash(), - 'count' => count($this->users), + 'ids' => $this->getUserIds($users), + 'hash' => $this->getHash($users), + 'count' => count($users), ], ]; } - public function toArray(): array - { - return array_merge(parent::toArray(), [ - 'user_count' => count($this->users), - ]); - } - - protected function getUserIds(): array + protected function getUserIds(array $users): array { $userIds = array_map(function ($channelData) { return (string) $channelData->user_id; - }, $this->users); + }, $users); return array_values($userIds); } - protected function getHash(): array + protected function getHash(array $users): array { $hash = []; - foreach ($this->users as $socketId => $channelData) { + foreach ($users as $socketId => $channelData) { $hash[$channelData->user_id] = $channelData->user_info; } diff --git a/src/WebSockets/Channels/PrivateChannel.php b/src/WebSockets/Channels/PrivateChannel.php index a4d40c356e..d68b90b652 100644 --- a/src/WebSockets/Channels/PrivateChannel.php +++ b/src/WebSockets/Channels/PrivateChannel.php @@ -2,11 +2,15 @@ namespace BeyondCode\LaravelWebSockets\WebSockets\Channels; +use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\InvalidSignature; use Ratchet\ConnectionInterface; use stdClass; class PrivateChannel extends Channel { + /** + * @throws InvalidSignature + */ public function subscribe(ConnectionInterface $connection, stdClass $payload) { $this->verifySignature($connection, $payload); diff --git a/src/WebSocketsServiceProvider.php b/src/WebSocketsServiceProvider.php index c68379a55d..bc107f16ad 100644 --- a/src/WebSocketsServiceProvider.php +++ b/src/WebSocketsServiceProvider.php @@ -8,15 +8,22 @@ use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\SendMessage; use BeyondCode\LaravelWebSockets\Dashboard\Http\Controllers\ShowDashboard; use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard; +use BeyondCode\LaravelWebSockets\PubSub\Broadcasters\RedisPusherBroadcaster; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\LocalClient; +use BeyondCode\LaravelWebSockets\PubSub\Drivers\RedisClient; +use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface; use BeyondCode\LaravelWebSockets\Server\Router; use BeyondCode\LaravelWebSockets\Statistics\Http\Controllers\WebSocketStatisticsEntriesController; use BeyondCode\LaravelWebSockets\Statistics\Http\Middleware\Authorize as AuthorizeStatistics; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager; use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager; +use Pusher\Pusher; +use Psr\Log\LoggerInterface; +use Illuminate\Broadcasting\BroadcastManager; use Illuminate\Support\Facades\Gate; use Illuminate\Support\Facades\Route; -use Illuminate\Support\Facades\Schema; use Illuminate\Support\ServiceProvider; +use Illuminate\Support\Facades\Schema; class WebSocketsServiceProvider extends ServiceProvider { @@ -43,6 +50,41 @@ public function boot() Console\CleanStatistics::class, Console\RestartWebSocketServer::class, ]); + + $this->configurePubSub(); + } + + protected function configurePubSub() + { + if (config('websockets.replication.enabled') !== true || config('websockets.replication.driver') !== 'redis') { + $this->app->singleton(ReplicationInterface::class, function () { + return new LocalClient(); + }); + + return; + } + + $this->app->singleton(ReplicationInterface::class, function () { + return (new RedisClient())->boot($this->loop); + }); + + $this->app->get(BroadcastManager::class)->extend('redis-pusher', function ($app, array $config) { + $pusher = new Pusher( + $config['key'], $config['secret'], + $config['app_id'], $config['options'] ?? [] + ); + + if ($config['log'] ?? false) { + $pusher->setLogger($this->app->make(LoggerInterface::class)); + } + + return new RedisPusherBroadcaster( + $pusher, + $config['app_id'], + $this->app->make('redis'), + $config['connection'] ?? null + ); + }); } public function register() @@ -60,7 +102,7 @@ public function register() }); $this->app->singleton(AppProvider::class, function () { - return app(config('websockets.managers.app')); + return $this->app->make(config('websockets.managers.app')); }); } @@ -69,7 +111,7 @@ protected function registerRoutes() Route::prefix(config('websockets.dashboard.path'))->group(function () { Route::middleware(config('websockets.dashboard.middleware', [AuthorizeDashboard::class]))->group(function () { Route::get('/', ShowDashboard::class); - Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']); + Route::get('/api/{appId}/statistics', [DashboardApiController::class, 'getStatistics']); Route::post('auth', AuthenticateDashboard::class); Route::post('event', SendMessage::class); }); @@ -85,7 +127,7 @@ protected function registerRoutes() protected function registerDashboardGate() { Gate::define('viewWebSocketsDashboard', function ($user = null) { - return app()->environment('local'); + return $this->app->environment('local'); }); return $this; diff --git a/tests/Channels/ChannelReplicationTest.php b/tests/Channels/ChannelReplicationTest.php new file mode 100644 index 0000000000..f8e08727f2 --- /dev/null +++ b/tests/Channels/ChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/Channels/ChannelTest.php b/tests/Channels/ChannelTest.php index 56c6a7bc44..a16a83d200 100644 --- a/tests/Channels/ChannelTest.php +++ b/tests/Channels/ChannelTest.php @@ -123,7 +123,7 @@ public function channels_can_broadcast_messages_to_all_connections_except_the_gi $channel = $this->getChannel($connection1, 'test-channel'); - $channel->broadcastToOthers($connection1, [ + $channel->broadcastToOthers($connection1, (object) [ 'event' => 'broadcasted-event', 'channel' => 'test-channel', ]); diff --git a/tests/Channels/PresenceChannelReplicationTest.php b/tests/Channels/PresenceChannelReplicationTest.php new file mode 100644 index 0000000000..70702715b0 --- /dev/null +++ b/tests/Channels/PresenceChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/Channels/PresenceChannelTest.php b/tests/Channels/PresenceChannelTest.php index c2ae4a48c8..3f3bb0ad87 100644 --- a/tests/Channels/PresenceChannelTest.php +++ b/tests/Channels/PresenceChannelTest.php @@ -59,4 +59,75 @@ public function clients_with_valid_auth_signatures_can_join_presence_channels() 'channel' => 'presence-channel', ]); } + + /** @test */ + public function clients_with_valid_auth_signatures_can_leave_presence_channels() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $channelData = [ + 'user_id' => 1, + 'user_info' => [ + 'name' => 'Marcel', + ], + ]; + + $signature = "{$connection->socketId}:presence-channel:".json_encode($channelData); + + $message = new Message(json_encode([ + 'event' => 'pusher:subscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + 'channel_data' => json_encode($channelData), + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + + $connection->assertSentEvent('pusher_internal:subscription_succeeded', [ + 'channel' => 'presence-channel', + ]); + + $message = new Message(json_encode([ + 'event' => 'pusher:unsubscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + } + + /** @test */ + public function clients_with_valid_auth_signatures_cannot_leave_channels_they_are_not_in() + { + $connection = $this->getWebSocketConnection(); + + $this->pusherServer->onOpen($connection); + + $channelData = [ + 'user_id' => 1, + 'user_info' => [ + 'name' => 'Marcel', + ], + ]; + + $signature = "{$connection->socketId}:presence-channel:".json_encode($channelData); + + $message = new Message(json_encode([ + 'event' => 'pusher:unsubscribe', + 'data' => [ + 'auth' => $connection->app->key.':'.hash_hmac('sha256', $signature, $connection->app->secret), + 'channel' => 'presence-channel', + ], + ])); + + $this->pusherServer->onMessage($connection, $message); + + $this->markTestAsPassed(); + } } diff --git a/tests/ClientProviders/AppTest.php b/tests/ClientProviders/AppTest.php index 72e8e9d5bb..683a805c92 100644 --- a/tests/ClientProviders/AppTest.php +++ b/tests/ClientProviders/AppTest.php @@ -11,7 +11,7 @@ class AppTest extends TestCase /** @test */ public function it_can_create_a_client() { - new App(1, 'appKey', 'appSecret', 'new'); + new App(1, 'appKey', 'appSecret'); $this->markTestAsPassed(); } @@ -21,7 +21,7 @@ public function it_will_not_accept_an_empty_appKey() { $this->expectException(InvalidApp::class); - new App(1, '', 'appSecret', 'new'); + new App(1, '', 'appSecret'); } /** @test */ @@ -29,6 +29,6 @@ public function it_will_not_accept_an_empty_appSecret() { $this->expectException(InvalidApp::class); - new App(1, 'appKey', '', 'new'); + new App(1, 'appKey', ''); } } diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index 89db69413a..81f4ac0e65 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -46,7 +46,7 @@ public function successful_connections_have_the_app_attached() $this->pusherServer->onOpen($connection); $this->assertInstanceOf(App::class, $connection->app); - $this->assertSame(1234, $connection->app->id); + $this->assertSame('1234', $connection->app->id); $this->assertSame('TestKey', $connection->app->key); $this->assertSame('TestSecret', $connection->app->secret); $this->assertSame('Test App', $connection->app->name); diff --git a/tests/HttpApi/FetchChannelReplicationTest.php b/tests/HttpApi/FetchChannelReplicationTest.php new file mode 100644 index 0000000000..84f4c51a3a --- /dev/null +++ b/tests/HttpApi/FetchChannelReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/HttpApi/FetchChannelTest.php b/tests/HttpApi/FetchChannelTest.php index 262f93cffe..ed6846c46f 100644 --- a/tests/HttpApi/FetchChannelTest.php +++ b/tests/HttpApi/FetchChannelTest.php @@ -66,6 +66,38 @@ public function it_returns_the_channel_information() ], json_decode($response->getContent(), true)); } + /** @test */ + public function it_returns_presence_channel_information() + { + $this->joinPresenceChannel('presence-channel'); + $this->joinPresenceChannel('presence-channel'); + + $connection = new Connection(); + + $requestPath = '/apps/1234/channel/my-channel'; + $routeParams = [ + 'appId' => '1234', + 'channelName' => 'presence-channel', + ]; + + $queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath); + + $request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams)); + + $controller = app(FetchChannelController::class); + + $controller->onOpen($connection, $request); + + /** @var JsonResponse $response */ + $response = array_pop($connection->sentRawData); + + $this->assertSame([ + 'occupied' => true, + 'subscription_count' => 2, + 'user_count' => 2, + ], json_decode($response->getContent(), true)); + } + /** @test */ public function it_returns_404_for_invalid_channels() { diff --git a/tests/HttpApi/FetchChannelsReplicationTest.php b/tests/HttpApi/FetchChannelsReplicationTest.php new file mode 100644 index 0000000000..24eb9b419a --- /dev/null +++ b/tests/HttpApi/FetchChannelsReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/HttpApi/FetchUsersReplicationTest.php b/tests/HttpApi/FetchUsersReplicationTest.php new file mode 100644 index 0000000000..2d959a8ceb --- /dev/null +++ b/tests/HttpApi/FetchUsersReplicationTest.php @@ -0,0 +1,17 @@ +setupReplication(); + } +} diff --git a/tests/Statistics/Controllers/WebSocketsStatisticsControllerTest.php b/tests/Statistics/Controllers/WebSocketsStatisticsControllerTest.php index 14e4629d62..421795c5e7 100644 --- a/tests/Statistics/Controllers/WebSocketsStatisticsControllerTest.php +++ b/tests/Statistics/Controllers/WebSocketsStatisticsControllerTest.php @@ -22,16 +22,21 @@ public function it_can_store_statistics() $this->assertCount(1, $entries); - $this->assertArrayHasKey('app_id', $entries->first()->attributesToArray()); + $actual = $entries->first()->attributesToArray(); + + foreach ($this->payload() as $key => $value) { + $this->assertArrayHasKey($key, $actual); + $this->assertSame($value, $actual[$key]); + } } protected function payload(): array { return [ 'app_id' => config('websockets.apps.0.id'), - 'peak_connection_count' => 1, - 'websocket_message_count' => 2, - 'api_message_count' => 3, + 'peak_connection_count' => '1', + 'websocket_message_count' => '2', + 'api_message_count' => '3', ]; } } diff --git a/tests/TestCase.php b/tests/TestCase.php index e4b4023460..b209783816 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -46,9 +46,10 @@ protected function getEnvironmentSetUp($app) $app['config']->set('websockets.apps', [ [ 'name' => 'Test App', - 'id' => 1234, + 'id' => '1234', 'key' => 'TestKey', 'secret' => 'TestSecret', + 'host' => 'localhost', 'capacity' => null, 'enable_client_messages' => false, 'enable_statistics' => true, diff --git a/tests/TestsReplication.php b/tests/TestsReplication.php new file mode 100644 index 0000000000..e179ea0370 --- /dev/null +++ b/tests/TestsReplication.php @@ -0,0 +1,22 @@ +singleton(ReplicationInterface::class, function () { + return new LocalClient(); + }); + + Config::set([ + 'websockets.replication.enabled' => true, + 'websockets.replication.driver' => 'redis', + ]); + } +}