-
Notifications
You must be signed in to change notification settings - Fork 659
Update with pub sub replication and Redis driver #61
Update with pub sub replication and Redis driver #61
Conversation
This references #6 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A quick overview of what is going on in #6
|
||
use React\EventLoop\LoopInterface; | ||
|
||
interface PubSubInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We add a new interface, so that we can implement other drivers such as zeroMQ or something similar. I don't need this functionality, but it will be asked for.
class RedisClient implements PubSubInterface | ||
{ | ||
|
||
const REDIS_KEY = ':websockets:replication:'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could make this configurable. Not sure how to keep this uncluttered in the config however.
public function __construct() | ||
{ | ||
$this->apps = collect(config('websockets.apps')); | ||
$this->serverId = Str::uuid()->toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set a unique server id, so that we don't get double messages on the current server.
public function subscribe(LoopInterface $loop): PubSubInterface | ||
{ | ||
$this->loop = $loop; | ||
[$this->publishClient, $this->subscribeClient] = Block\awaitAll([$this->publishConnection(), $this->subscribeConnection()], $this->loop); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may wonder why we need both a publish connection and a subscribe connection.
clue/reactphp-redis#39 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest I'm more wondering why you are blocking here. Is this bit ran inside the websocket server, or somewhere outside it in blocking PHP land?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't await, you will end up returning a promise. Which isn't okay if you want the publish client to end up in the container. So what we do here is block until the promises resolve on the loop, and then push the actual instances on the variables instead of promises. Hopefully that makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So how are you restarting the loop? Or is this at the point where the loop hasn't been started yet by the server? Since await*
functions stop the loop once the promises resolves/rejects.
Another option would be to do something like https://github.com/PHP-DI-Definitions/clue-redis-client/blob/master/src/WaitingClient.php (would pull out that client into another package if you wish to go that road) and fake the connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is going to be slightly confusing, because I don't fully understand Ratchet. But my understanding is that there is an event loop that is always running that you can push processes onto. The loop is started from the console command here:
$this->loop = LoopFactory::create(); |
You don't need to restart the loop. Just block for a split second to get the redis connection promise to resolve, and then the loop continues.
Since this subscribe command only runs once, it will just connect when you boot up the websocket server, and then continue along in the background.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Waiting client stand alone package is up at https://packagist.org/packages/wyrihaximus/react-redis-waiting-client I'll be adding integration tests for subscribing to that over the next few days
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to clarify, because it's clear the blocking approach was taken from my comment here: simonhamp/laravel-echo-ratchet-server#2 (comment), the reason I did it that way initially I think is just to lower the startup time by doing both connections at once. It's not strictly necessary. And like @snellingio said, it only ever happens right before the server is started, so there's no concern of the loop stopping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Waiting client stand alone package is up at packagist.org/packages/wyrihaximus/react-redis-waiting-client I'll be adding integration tests for subscribing to that over the next few days
A better alternative for that package is up: clue/reactphp-redis#82
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lazy client release is out. That should make some thing simpler: https://github.com/clue/reactphp-redis/releases/tag/v2.3.0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @WyriHaximus
I'll try to pick up this PR in a bit, might make a new PR based on this one to finish it up.
); | ||
} | ||
|
||
protected function getConnectionUri() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs a test (and maybe a fallback), but it just pulls from the laravel app's config.
"clue/buzz-react": "^2.5", | ||
"clue/redis-react": "^2.2", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, we can't use the default redis driver. We could probably use it for publishing, but not for subscribing. Reason is that it is blocking :(
@@ -112,6 +112,20 @@ | |||
'passphrase' => null, | |||
], | |||
|
|||
/* | |||
* You can enable replication to publish and subscribe to messages across the driver |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried to keep the config light, but this is something we should probably expand on.
|
||
protected function configurePubSubReplication() | ||
{ | ||
if (config('websockets.replication.enabled') !== true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably allow for cli integration here as well.
} | ||
|
||
app()->singleton(PubSubInterface::class, function () use ($connection) { | ||
return $connection; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can break if the driver isn't set to redis and replication is enabled. Maybe we should throw an exception if the connection isn't set.
} | ||
|
||
public function broadcastToEveryoneExcept($payload, ?string $socketId = null) | ||
public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, I don't love adding the appId here, but I didn't see another easy way to get it into the class.
Oh, and since this is my PR, my biggest feature request for this is figuring out how to use the cli logger in the class. I'd like to have a lot more logging in the class. We can easily do this with the |
I will not be able to pick this back up until after the new year. While I'll be around to communicate, I just won't have the time to advance this pull request during the holidays. Hope everyone has a great holiday. |
@snellingio any chance you'll be picking this back up soon? I can probably give you a hand in finishing this PR soon, I'm working on a new project which will be using this lib. |
@francislavoie I'd love some help finishing the PR. Especially around tests for this change. |
Closing in favor of #140 by @francislavoie |
Still needing tests, and handling edge cases such as statistics, etc.