Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 8dc2856

Browse files
authored
Merge pull request #140 from francislavoie/redis-replication
Redis as a replication backend for scalability
2 parents 4a5f354 + ce84e8c commit 8dc2856

35 files changed

+1198
-97
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ build
22
composer.lock
33
vendor
44
coverage
5-
.phpunit.result.cache
5+
.phpunit.result.cache
6+
.idea/

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
"ext-json": "*",
2727
"cboden/ratchet": "^0.4.1",
2828
"clue/buzz-react": "^2.5",
29+
"clue/redis-react": "^2.3",
2930
"facade/ignition-contracts": "^1.0",
3031
"guzzlehttp/psr7": "^1.5",
3132
"illuminate/broadcasting": "5.8.*|^6.0|^7.0",

config/websockets.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,34 @@
141141

142142
],
143143

144+
/*
145+
|--------------------------------------------------------------------------
146+
| Broadcasting Replication
147+
|--------------------------------------------------------------------------
148+
|
149+
| You can enable replication to publish and subscribe to
150+
| messages across the driver.
151+
|
152+
| By default, it is disabled, but you can configure it to use drivers
153+
| like Redis to ensure connection between multiple instances of
154+
| WebSocket servers.
155+
|
156+
*/
157+
158+
'replication' => [
159+
160+
'enabled' => false,
161+
162+
'driver' => 'redis',
163+
164+
'redis' => [
165+
166+
'connection' => 'default',
167+
168+
],
169+
170+
],
171+
144172
'statistics' => [
145173

146174
/*

src/Apps/ConfigAppProvider.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public function all(): array
1919
{
2020
return $this->apps
2121
->map(function (array $appAttributes) {
22-
return $this->instanciate($appAttributes);
22+
return $this->instantiate($appAttributes);
2323
})
2424
->toArray();
2525
}
@@ -30,7 +30,7 @@ public function findById($appId): ?App
3030
->apps
3131
->firstWhere('id', $appId);
3232

33-
return $this->instanciate($appAttributes);
33+
return $this->instantiate($appAttributes);
3434
}
3535

3636
public function findByKey(string $appKey): ?App
@@ -39,7 +39,7 @@ public function findByKey(string $appKey): ?App
3939
->apps
4040
->firstWhere('key', $appKey);
4141

42-
return $this->instanciate($appAttributes);
42+
return $this->instantiate($appAttributes);
4343
}
4444

4545
public function findBySecret(string $appSecret): ?App
@@ -48,10 +48,10 @@ public function findBySecret(string $appSecret): ?App
4848
->apps
4949
->firstWhere('secret', $appSecret);
5050

51-
return $this->instanciate($appAttributes);
51+
return $this->instantiate($appAttributes);
5252
}
5353

54-
protected function instanciate(?array $appAttributes): ?App
54+
protected function instantiate(?array $appAttributes): ?App
5555
{
5656
if (! $appAttributes) {
5757
return null;

src/Console/StartWebSocketServer.php

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
66
use BeyondCode\LaravelWebSockets\Facades\WebSocketsRouter;
7+
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
78
use BeyondCode\LaravelWebSockets\Server\Logger\ConnectionLogger;
89
use BeyondCode\LaravelWebSockets\Server\Logger\HttpLogger;
910
use BeyondCode\LaravelWebSockets\Server\Logger\WebsocketsLogger;
@@ -49,6 +50,7 @@ public function handle()
4950
->configureRestartTimer()
5051
->registerEchoRoutes()
5152
->registerCustomRoutes()
53+
->configurePubSubReplication()
5254
->startWebSocketServer();
5355
}
5456

@@ -61,7 +63,7 @@ protected function configureStatisticsLogger()
6163

6264
$browser = new Browser($this->loop, $connector);
6365

64-
app()->singleton(StatisticsLoggerInterface::class, function () use ($browser) {
66+
$this->laravel->singleton(StatisticsLoggerInterface::class, function () use ($browser) {
6567
$class = config('websockets.statistics.logger', \BeyondCode\LaravelWebSockets\Statistics\Logger::class);
6668

6769
return new $class(app(ChannelManager::class), $browser);
@@ -76,7 +78,7 @@ protected function configureStatisticsLogger()
7678

7779
protected function configureHttpLogger()
7880
{
79-
app()->singleton(HttpLogger::class, function () {
81+
$this->laravel->singleton(HttpLogger::class, function () {
8082
return (new HttpLogger($this->output))
8183
->enable($this->option('debug') ?: config('app.debug'))
8284
->verbose($this->output->isVerbose());
@@ -87,7 +89,7 @@ protected function configureHttpLogger()
8789

8890
protected function configureMessageLogger()
8991
{
90-
app()->singleton(WebsocketsLogger::class, function () {
92+
$this->laravel->singleton(WebsocketsLogger::class, function () {
9193
return (new WebsocketsLogger($this->output))
9294
->enable($this->option('debug') ?: config('app.debug'))
9395
->verbose($this->output->isVerbose());
@@ -98,7 +100,7 @@ protected function configureMessageLogger()
98100

99101
protected function configureConnectionLogger()
100102
{
101-
app()->bind(ConnectionLogger::class, function () {
103+
$this->laravel->bind(ConnectionLogger::class, function () {
102104
return (new ConnectionLogger($this->output))
103105
->enable(config('app.debug'))
104106
->verbose($this->output->isVerbose());
@@ -151,6 +153,13 @@ protected function startWebSocketServer()
151153
->run();
152154
}
153155

156+
protected function configurePubSubReplication()
157+
{
158+
$this->laravel->get(ReplicationInterface::class)->boot($this->loop);
159+
160+
return $this;
161+
}
162+
154163
protected function getDnsResolver(): ResolverInterface
155164
{
156165
if (! config('websockets.statistics.perform_dns_lookup')) {

src/Facades/StatisticsLogger.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
use BeyondCode\LaravelWebSockets\Statistics\Logger\StatisticsLogger as StatisticsLoggerInterface;
66
use Illuminate\Support\Facades\Facade;
77

8-
/** @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger */
8+
/**
9+
* @see \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger
10+
* @mixin \BeyondCode\LaravelWebSockets\Statistics\Logger\HttpStatisticsLogger
11+
*/
912
class StatisticsLogger extends Facade
1013
{
1114
protected static function getFacadeAccessor()

src/Facades/WebSocketsRouter.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
use Illuminate\Support\Facades\Facade;
66

7-
/** @see \BeyondCode\LaravelWebSockets\Server\Router */
7+
/**
8+
* @see \BeyondCode\LaravelWebSockets\Server\Router
9+
* @mixin \BeyondCode\LaravelWebSockets\Server\Router
10+
*/
811
class WebSocketsRouter extends Facade
912
{
1013
protected static function getFacadeAccessor()

src/HttpApi/Controllers/Controller.php

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Pusher\Pusher;
1717
use Ratchet\ConnectionInterface;
1818
use Ratchet\Http\HttpServerInterface;
19+
use React\Promise\PromiseInterface;
1920
use Symfony\Bridge\PsrHttpMessage\Factory\HttpFoundationFactory;
2021
use Symfony\Component\HttpKernel\Exception\HttpException;
2122

@@ -30,7 +31,7 @@ abstract class Controller implements HttpServerInterface
3031
/** @var int */
3132
protected $contentLength;
3233

33-
/** @var \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager */
34+
/** @var ChannelManager */
3435
protected $channelManager;
3536

3637
public function __construct(ChannelManager $channelManager)
@@ -46,7 +47,11 @@ public function onOpen(ConnectionInterface $connection, RequestInterface $reques
4647

4748
$this->requestBuffer = (string) $request->getBody();
4849

49-
$this->checkContentLength($connection);
50+
if (! $this->verifyContentLength()) {
51+
return;
52+
}
53+
54+
$this->handleRequest($connection);
5055
}
5156

5257
protected function findContentLength(array $headers): int
@@ -60,31 +65,53 @@ public function onMessage(ConnectionInterface $from, $msg)
6065
{
6166
$this->requestBuffer .= $msg;
6267

63-
$this->checkContentLength($from);
68+
if (! $this->verifyContentLength()) {
69+
return;
70+
}
71+
72+
$this->handleRequest($from);
73+
}
74+
75+
protected function verifyContentLength()
76+
{
77+
return strlen($this->requestBuffer) === $this->contentLength;
6478
}
6579

66-
protected function checkContentLength(ConnectionInterface $connection)
80+
protected function handleRequest(ConnectionInterface $connection)
6781
{
68-
if (strlen($this->requestBuffer) === $this->contentLength) {
69-
$serverRequest = (new ServerRequest(
70-
$this->request->getMethod(),
71-
$this->request->getUri(),
72-
$this->request->getHeaders(),
73-
$this->requestBuffer,
74-
$this->request->getProtocolVersion()
75-
))->withQueryParams(QueryParameters::create($this->request)->all());
82+
$serverRequest = (new ServerRequest(
83+
$this->request->getMethod(),
84+
$this->request->getUri(),
85+
$this->request->getHeaders(),
86+
$this->requestBuffer,
87+
$this->request->getProtocolVersion()
88+
))->withQueryParams(QueryParameters::create($this->request)->all());
89+
90+
$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));
7691

77-
$laravelRequest = Request::createFromBase((new HttpFoundationFactory)->createRequest($serverRequest));
92+
$this
93+
->ensureValidAppId($laravelRequest->appId)
94+
->ensureValidSignature($laravelRequest);
7895

79-
$this
80-
->ensureValidAppId($laravelRequest->appId)
81-
->ensureValidSignature($laravelRequest);
96+
// Invoke the controller action
97+
$response = $this($laravelRequest);
8298

83-
$response = $this($laravelRequest);
99+
// Allow for async IO in the controller action
100+
if ($response instanceof PromiseInterface) {
101+
$response->then(function ($response) use ($connection) {
102+
$this->sendAndClose($connection, $response);
103+
});
84104

85-
$connection->send(JsonResponse::create($response));
86-
$connection->close();
105+
return;
87106
}
107+
108+
$this->sendAndClose($connection, $response);
109+
}
110+
111+
protected function sendAndClose(ConnectionInterface $connection, $response)
112+
{
113+
$connection->send(JsonResponse::create($response));
114+
$connection->close();
88115
}
89116

90117
public function onClose(ConnectionInterface $connection)
@@ -122,7 +149,7 @@ protected function ensureValidSignature(Request $request)
122149
/*
123150
* The `auth_signature` & `body_md5` parameters are not included when calculating the `auth_signature` value.
124151
*
125-
* The `appId`, `appKey` & `channelName` parameters are actually route paramaters and are never supplied by the client.
152+
* The `appId`, `appKey` & `channelName` parameters are actually route parameters and are never supplied by the client.
126153
*/
127154
$params = Arr::except($request->query(), ['auth_signature', 'body_md5', 'appId', 'appKey', 'channelName']);
128155

src/HttpApi/Controllers/FetchChannelController.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ public function __invoke(Request $request)
1515
throw new HttpException(404, "Unknown channel `{$request->channelName}`.");
1616
}
1717

18-
return $channel->toArray();
18+
return $channel->toArray($request->appId);
1919
}
2020
}

src/HttpApi/Controllers/FetchChannelsController.php

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,22 @@
66
use Illuminate\Support\Collection;
77
use Illuminate\Support\Str;
88
use Symfony\Component\HttpKernel\Exception\HttpException;
9+
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
10+
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
11+
use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel;
912

1013
class FetchChannelsController extends Controller
1114
{
15+
/** @var ReplicationInterface */
16+
protected $replication;
17+
18+
public function __construct(ChannelManager $channelManager, ReplicationInterface $replication)
19+
{
20+
parent::__construct($channelManager);
21+
22+
$this->replication = $replication;
23+
}
24+
1225
public function __invoke(Request $request)
1326
{
1427
$attributes = [];
@@ -29,15 +42,28 @@ public function __invoke(Request $request)
2942
});
3043
}
3144

32-
return [
33-
'channels' => $channels->map(function ($channel) use ($attributes) {
34-
$info = new \stdClass;
35-
if (in_array('user_count', $attributes)) {
36-
$info->user_count = count($channel->getUsers());
37-
}
45+
// We want to get the channel user count all in one shot when
46+
// using a replication backend rather than doing individual queries.
47+
// To do so, we first collect the list of channel names.
48+
$channelNames = $channels->map(function (PresenceChannel $channel) use ($request) {
49+
return $channel->getChannelName();
50+
})->toArray();
51+
52+
// We ask the replication backend to get us the member count per channel.
53+
// We get $counts back as a key-value array of channel names and their member count.
54+
return $this->replication
55+
->channelMemberCounts($request->appId, $channelNames)
56+
->then(function (array $counts) use ($channels, $attributes) {
57+
return [
58+
'channels' => $channels->map(function (PresenceChannel $channel) use ($counts, $attributes) {
59+
$info = new \stdClass;
60+
if (in_array('user_count', $attributes)) {
61+
$info->user_count = $counts[$channel->getChannelName()];
62+
}
3863

39-
return $info;
40-
})->toArray() ?: new \stdClass,
41-
];
64+
return $info;
65+
})->toArray() ?: new \stdClass,
66+
];
67+
});
4268
}
4369
}

src/HttpApi/Controllers/FetchUsersController.php

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ public function __invoke(Request $request)
2121
throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"');
2222
}
2323

24-
return [
25-
'users' => Collection::make($channel->getUsers())->map(function ($user) {
26-
return ['id' => $user->user_id];
27-
})->values(),
28-
];
24+
return $channel
25+
->getUsers($request->appId)
26+
->then(function (array $users) {
27+
return [
28+
'users' => Collection::make($users)->map(function ($user) {
29+
return ['id' => $user->user_id];
30+
})->values(),
31+
];
32+
});
2933
}
3034
}

src/HttpApi/Controllers/TriggerEventController.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public function __invoke(Request $request)
1919
'channel' => $channelName,
2020
'event' => $request->json()->get('name'),
2121
'data' => $request->json()->get('data'),
22-
], $request->json()->get('socket_id'));
22+
], $request->json()->get('socket_id'), $request->appId);
2323

2424
DashboardLogger::apiMessage(
2525
$request->appId,

0 commit comments

Comments
 (0)