Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Redis as a replication backend for scalability #140

Merged
merged 32 commits into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b584d0c
Update with pub sub replication and redis driver
snellingio Dec 17, 2018
c203d24
Clean up some typos, add some type hints, StyleCI fixes
francislavoie Mar 24, 2019
e454f53
Initial implementation of Redis as a pub/sub backend, WIP
francislavoie Mar 25, 2019
668cd29
Fix style issues reported by StyleCI
francislavoie Mar 25, 2019
eca8c7b
Scope pub/sub channels in Redis by appId to avoid crosstalk between apps
francislavoie Mar 29, 2019
87c00fb
app() -> $this->laravel in StartWebSocketServer
francislavoie Mar 29, 2019
4baac7e
Implement presence channel storage in Redis
francislavoie Mar 29, 2019
b7ae9ba
Add tests for replication, fix bugs in the implementation
francislavoie Apr 5, 2019
faf2c75
Fix redis-pusher broadcast driver, wrong params for extend() callable
francislavoie Apr 22, 2019
ed55034
Fix mistake during rebase
francislavoie May 15, 2019
d7c30f3
cleanup & refactor of pubsub code
anthonyvancauwenberghe Jul 28, 2019
3c909b9
remove predis from require-dev
anthonyvancauwenberghe Jul 28, 2019
b5fcc13
remove redis host
anthonyvancauwenberghe Jul 28, 2019
6e68d3d
one line var doc
anthonyvancauwenberghe Jul 28, 2019
d43ac82
remove test code
anthonyvancauwenberghe Jul 28, 2019
11e1f89
Merge pull request #1 from deviouspk/analysis-z3nD5L
anthonyvancauwenberghe Jul 28, 2019
060b986
resolve app from local variables
anthonyvancauwenberghe Jul 28, 2019
f2b3347
resolve app from local variables in console
anthonyvancauwenberghe Jul 28, 2019
373b993
rename emptyclient to localclient
anthonyvancauwenberghe Jul 28, 2019
00e8f3e
Add channel storage to LocalDriver to simplify PresenceChannel logic
francislavoie Jul 29, 2019
990a075
Avoid calls to app()
francislavoie Jul 29, 2019
091f56e
Simplify controller logic due to PresenceChannel logic changes
francislavoie Jul 29, 2019
ef86f86
Attempt at making TriggerEventController also publish to other servers
francislavoie Jul 29, 2019
e259cac
Remove duplicate client mock client, simplify test trait
francislavoie Sep 3, 2019
5979f63
StyleCI fixes
francislavoie Sep 3, 2019
e3c0cea
Fix tests failing on older versions of Laravel
francislavoie Sep 3, 2019
db58378
Fix test warnings due to usage of deprecated assertArraySubset()
francislavoie Sep 23, 2019
aa1c11e
Merge branch 'master' into redis-replication
rennokki Aug 13, 2020
6e85197
Update WebSocketsServiceProvider.php
rennokki Aug 13, 2020
1bfbbdb
Merge branch '2.x' into redis-replication
rennokki Aug 13, 2020
3a0bcea
wip
rennokki Aug 13, 2020
ce84e8c
wip
rennokki Aug 13, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ build
composer.lock
vendor
coverage
.phpunit.result.cache
.phpunit.result.cache
.idea/
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"ext-json": "*",
"cboden/ratchet": "^0.4.1",
"clue/buzz-react": "^2.5",
"clue/redis-react": "^2.3",
"facade/ignition-contracts": "^1.0",
"guzzlehttp/psr7": "^1.5",
"illuminate/broadcasting": "5.8.*|^6.0|^7.0",
Expand Down
28 changes: 28 additions & 0 deletions config/websockets.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,34 @@

],

/*
|--------------------------------------------------------------------------
| Broadcasting Replication
|--------------------------------------------------------------------------
|
| You can enable replication to publish and subscribe to
| messages across the driver.
|
| By default, it is disabled, but you can configure it to use drivers
| like Redis to ensure connection between multiple instances of
| WebSocket servers.
|
*/

'replication' => [

'enabled' => false,

'driver' => 'redis',

'redis' => [

'connection' => 'default',

],

],

'statistics' => [

/*
Expand Down
10 changes: 5 additions & 5 deletions src/Apps/ConfigAppProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function all(): array
{
return $this->apps
->map(function (array $appAttributes) {
return $this->instanciate($appAttributes);
return $this->instantiate($appAttributes);
})
->toArray();
}
Expand All @@ -30,7 +30,7 @@ public function findById($appId): ?App
->apps
->firstWhere('id', $appId);

return $this->instanciate($appAttributes);
return $this->instantiate($appAttributes);
}

public function findByKey(string $appKey): ?App
Expand All @@ -39,7 +39,7 @@ public function findByKey(string $appKey): ?App
->apps
->firstWhere('key', $appKey);

return $this->instanciate($appAttributes);
return $this->instantiate($appAttributes);
}

public function findBySecret(string $appSecret): ?App
Expand All @@ -48,10 +48,10 @@ public function findBySecret(string $appSecret): ?App
->apps
->firstWhere('secret', $appSecret);

return $this->instanciate($appAttributes);
return $this->instantiate($appAttributes);
}

protected function instanciate(?array $appAttributes): ?App
protected function instantiate(?array $appAttributes): ?App
{
if (! $appAttributes) {
return null;
Expand Down
17 changes: 13 additions & 4 deletions src/Console/StartWebSocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\Server\Logger\ConnectionLogger;
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
use BeyondCode\LaravelWebSockets\Server\Logger\WebsocketsLogger;
Expand Down Expand Up @@ -49,6 +50,7 @@ public function handle()
->configureRestartTimer()
->registerEchoRoutes()
->registerCustomRoutes()
->configurePubSubReplication()
->startWebSocketServer();
}

Expand All @@ -61,7 +63,7 @@ protected function configureStatisticsLogger()

$browser = new Browser($this->loop, $connector);

app()->singleton(StatisticsLoggerInterface::class, function () use ($browser) {
$this->laravel->singleton(StatisticsLoggerInterface::class, function () use ($browser) {
$class = config('websockets.statistics.logger', \BeyondCode\LaravelWebSockets\Statistics\Logger::class);

return new $class(app(ChannelManager::class), $browser);
Expand All @@ -76,7 +78,7 @@ protected function configureStatisticsLogger()

protected function configureHttpLogger()
{
app()->singleton(HttpLogger::class, function () {
$this->laravel->singleton(HttpLogger::class, function () {
return (new HttpLogger($this->output))
->enable($this->option('debug') ?: config('app.debug'))
->verbose($this->output->isVerbose());
Expand All @@ -87,7 +89,7 @@ protected function configureHttpLogger()

protected function configureMessageLogger()
{
app()->singleton(WebsocketsLogger::class, function () {
$this->laravel->singleton(WebsocketsLogger::class, function () {
return (new WebsocketsLogger($this->output))
->enable($this->option('debug') ?: config('app.debug'))
->verbose($this->output->isVerbose());
Expand All @@ -98,7 +100,7 @@ protected function configureMessageLogger()

protected function configureConnectionLogger()
{
app()->bind(ConnectionLogger::class, function () {
$this->laravel->bind(ConnectionLogger::class, function () {
return (new ConnectionLogger($this->output))
->enable(config('app.debug'))
->verbose($this->output->isVerbose());
Expand Down Expand Up @@ -151,6 +153,13 @@ protected function startWebSocketServer()
->run();
}

protected function configurePubSubReplication()
{
$this->laravel->get(ReplicationInterface::class)->boot($this->loop);

return $this;
}

protected function getDnsResolver(): ResolverInterface
{
if (! config('websockets.statistics.perform_dns_lookup')) {
Expand Down
5 changes: 4 additions & 1 deletion src/Facades/StatisticsLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
use BeyondCode\LaravelWebSockets\Statistics\Logger\StatisticsLogger as StatisticsLoggerInterface;
use Illuminate\Support\Facades\Facade;

/** @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger */
/**
* @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger
* @mixin \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger
*/
class StatisticsLogger extends Facade
{
protected static function getFacadeAccessor()
Expand Down
5 changes: 4 additions & 1 deletion src/Facades/WebSocketsRouter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

use Illuminate\Support\Facades\Facade;

/** @see \BeyondCode\LaravelWebSockets\Server\Router */
/**
* @see \BeyondCode\LaravelWebSockets\Server\Router
* @mixin \BeyondCode\LaravelWebSockets\Server\Router
*/
class WebSocketsRouter extends Facade
{
protected static function getFacadeAccessor()
Expand Down
67 changes: 47 additions & 20 deletions src/HttpApi/Controllers/Controller.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Pusher\Pusher;
use Ratchet\ConnectionInterface;
use Ratchet\Http\HttpServerInterface;
use React\Promise\PromiseInterface;
use Symfony\Bridge\PsrHttpMessage\Factory\HttpFoundationFactory;
use Symfony\Component\HttpKernel\Exception\HttpException;

Expand All @@ -30,7 +31,7 @@ abstract class Controller implements HttpServerInterface
/** @var int */
protected $contentLength;

/** @var \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager */
/** @var ChannelManager */
protected $channelManager;

public function __construct(ChannelManager $channelManager)
Expand All @@ -46,7 +47,11 @@ public function onOpen(ConnectionInterface $connection, RequestInterface $reques

$this->requestBuffer = (string) $request->getBody();

$this->checkContentLength($connection);
if (! $this->verifyContentLength()) {
return;
}

$this->handleRequest($connection);
}

protected function findContentLength(array $headers): int
Expand All @@ -60,31 +65,53 @@ public function onMessage(ConnectionInterface $from, $msg)
{
$this->requestBuffer .= $msg;

$this->checkContentLength($from);
if (! $this->verifyContentLength()) {
return;
}

$this->handleRequest($from);
}

protected function verifyContentLength()
{
return strlen($this->requestBuffer) === $this->contentLength;
}

protected function checkContentLength(ConnectionInterface $connection)
protected function handleRequest(ConnectionInterface $connection)
{
if (strlen($this->requestBuffer) === $this->contentLength) {
$serverRequest = (new ServerRequest(
$this->request->getMethod(),
$this->request->getUri(),
$this->request->getHeaders(),
$this->requestBuffer,
$this->request->getProtocolVersion()
))->withQueryParams(QueryParameters::create($this->request)->all());
$serverRequest = (new ServerRequest(
$this->request->getMethod(),
$this->request->getUri(),
$this->request->getHeaders(),
$this->requestBuffer,
$this->request->getProtocolVersion()
))->withQueryParams(QueryParameters::create($this->request)->all());

$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));

$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));
$this
->ensureValidAppId($laravelRequest->appId)
->ensureValidSignature($laravelRequest);

$this
->ensureValidAppId($laravelRequest->appId)
->ensureValidSignature($laravelRequest);
// Invoke the controller action
$response = $this($laravelRequest);

$response = $this($laravelRequest);
// Allow for async IO in the controller action
if ($response instanceof PromiseInterface) {
$response->then(function ($response) use ($connection) {
$this->sendAndClose($connection, $response);
});

$connection->send(JsonResponse::create($response));
$connection->close();
return;
}

$this->sendAndClose($connection, $response);
}

protected function sendAndClose(ConnectionInterface $connection, $response)
{
$connection->send(JsonResponse::create($response));
$connection->close();
}

public function onClose(ConnectionInterface $connection)
Expand Down Expand Up @@ -122,7 +149,7 @@ protected function ensureValidSignature(Request $request)
/*
* The `auth_signature` & `body_md5` parameters are not included when calculating the `auth_signature` value.
*
* The `appId`, `appKey` & `channelName` parameters are actually route paramaters and are never supplied by the client.
* The `appId`, `appKey` & `channelName` parameters are actually route parameters and are never supplied by the client.
*/
$params = Arr::except($request->query(), ['auth_signature', 'body_md5', 'appId', 'appKey', 'channelName']);

Expand Down
2 changes: 1 addition & 1 deletion src/HttpApi/Controllers/FetchChannelController.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ public function __invoke(Request $request)
throw new HttpException(404, "Unknown channel `{$request->channelName}`.");
}

return $channel->toArray();
return $channel->toArray($request->appId);
}
}
44 changes: 35 additions & 9 deletions src/HttpApi/Controllers/FetchChannelsController.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,22 @@
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use Symfony\Component\HttpKernel\Exception\HttpException;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel;

class FetchChannelsController extends Controller
{
/** @var ReplicationInterface */
protected $replication;

public function __construct(ChannelManager $channelManager, ReplicationInterface $replication)
{
parent::__construct($channelManager);

$this->replication = $replication;
}

public function __invoke(Request $request)
{
$attributes = [];
Expand All @@ -29,15 +42,28 @@ public function __invoke(Request $request)
});
}

return [
'channels' => $channels->map(function ($channel) use ($attributes) {
$info = new \stdClass;
if (in_array('user_count', $attributes)) {
$info->user_count = count($channel->getUsers());
}
// We want to get the channel user count all in one shot when
// using a replication backend rather than doing individual queries.
// To do so, we first collect the list of channel names.
$channelNames = $channels->map(function (PresenceChannel $channel) use ($request) {
return $channel->getChannelName();
})->toArray();

// We ask the replication backend to get us the member count per channel.
// We get $counts back as a key-value array of channel names and their member count.
return $this->replication
->channelMemberCounts($request->appId, $channelNames)
->then(function (array $counts) use ($channels, $attributes) {
return [
'channels' => $channels->map(function (PresenceChannel $channel) use ($counts, $attributes) {
$info = new \stdClass;
if (in_array('user_count', $attributes)) {
$info->user_count = $counts[$channel->getChannelName()];
}

return $info;
})->toArray() ?: new \stdClass,
];
return $info;
})->toArray() ?: new \stdClass,
];
});
}
}
14 changes: 9 additions & 5 deletions src/HttpApi/Controllers/FetchUsersController.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ public function __invoke(Request $request)
throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"');
}

return [
'users' => Collection::make($channel->getUsers())->map(function ($user) {
return ['id' => $user->user_id];
})->values(),
];
return $channel
->getUsers($request->appId)
->then(function (array $users) {
return [
'users' => Collection::make($users)->map(function ($user) {
return ['id' => $user->user_id];
})->values(),
];
});
}
}
2 changes: 1 addition & 1 deletion src/HttpApi/Controllers/TriggerEventController.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function __invoke(Request $request)
'channel' => $channelName,
'event' => $request->json()->get('name'),
'data' => $request->json()->get('data'),
], $request->json()->get('socket_id'));
], $request->json()->get('socket_id'), $request->appId);

DashboardLogger::apiMessage(
$request->appId,
Expand Down
Loading