Skip to content

Commit 1676436

Browse files
committed
Check dolphin leadership after upsert query
MySQL always returns the number of entries attempted, even when nothing was added. The previous match caused all nodes to believe they were the leader. This uses a secondary query within the same transaction to detect if the current instance is the leader. Closes #1250
1 parent 56be721 commit 1676436

File tree

2 files changed

+73
-30
lines changed

2 files changed

+73
-30
lines changed

lib/oban/peers/database.ex

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ defmodule Oban.Peers.Database do
102102
fun = fn ->
103103
state
104104
|> delete_expired_peers()
105-
|> upsert_peer()
105+
|> attempt_leadership()
106106
end
107107

108108
case Repo.transaction(state.conf, fun, retry: 1) do
@@ -163,14 +163,7 @@ defmodule Oban.Peers.Database do
163163
end
164164

165165
def handle_call(:get_leader, _from, %State{conf: conf} = state) do
166-
query =
167-
"oban_peers"
168-
|> where([p], p.name == ^inspect(conf.name))
169-
|> select([p], p.node)
170-
171-
leader = Repo.one(conf, query)
172-
173-
{:reply, leader, state}
166+
{:reply, query_leader(conf), state}
174167
end
175168

176169
# Helpers
@@ -199,11 +192,16 @@ defmodule Oban.Peers.Database do
199192
Repo.delete_all(conf, query)
200193
end
201194

202-
defp notify_down(%State{conf: conf}) do
203-
Notifier.notify(conf, :leader, %{down: inspect(conf.name)})
195+
defp query_leader(conf) do
196+
query =
197+
"oban_peers"
198+
|> where([p], p.name == ^inspect(conf.name))
199+
|> select([p], p.node)
200+
201+
Repo.one(conf, query)
204202
end
205203

206-
defp upsert_peer(%State{conf: conf} = state) do
204+
defp attempt_leadership(%State{conf: conf} = state) do
207205
started_at = DateTime.utc_now()
208206
expires_at = DateTime.add(started_at, state.interval, :millisecond)
209207

@@ -214,24 +212,46 @@ defmodule Oban.Peers.Database do
214212
expires_at: expires_at
215213
}
216214

217-
# MySQL only supports auto-inference and not conflict_target.
218-
base_opts =
215+
leader? =
219216
if state.conf.engine == Oban.Engines.Dolphin do
220-
[]
217+
dolphin_insert(peer_data, state)
221218
else
222-
[conflict_target: :name]
219+
regular_upsert(peer_data, state)
223220
end
224221

222+
%{state | leader?: leader?}
223+
end
224+
225+
defp regular_upsert(peer_data, state) do
225226
repo_opts =
226227
if state.leader? do
227-
Keyword.put(base_opts, :on_conflict, set: [expires_at: expires_at])
228+
[conflict_target: :name, on_conflict: [set: [expires_at: peer_data.expires_at]]]
228229
else
229-
[on_conflict: :nothing]
230+
[conflict_target: :name, on_conflict: :nothing]
230231
end
231232

232-
case Repo.insert_all(conf, "oban_peers", [peer_data], repo_opts) do
233-
{0, nil} -> %{state | leader?: false}
234-
{_, nil} -> %{state | leader?: true}
233+
case Repo.insert_all(state.conf, "oban_peers", [peer_data], repo_opts) do
234+
{0, nil} -> false
235+
{_, nil} -> true
235236
end
236237
end
238+
239+
defp dolphin_insert(peer_data, state) do
240+
repo_opts =
241+
if state.leader? do
242+
[on_conflict: [set: [expires_at: peer_data.expires_at]]]
243+
else
244+
[on_conflict: :nothing]
245+
end
246+
247+
# MySQL always returns the number of entries attempted, even when nothing was added. We have
248+
# to check the leader after update.
249+
Repo.insert_all(state.conf, "oban_peers", [peer_data], repo_opts)
250+
251+
query_leader(state.conf) == peer_data.node
252+
end
253+
254+
defp notify_down(%State{conf: conf}) do
255+
Notifier.notify(conf, :leader, %{down: inspect(conf.name)})
256+
end
237257
end

test/oban/peers/database_test.exs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,46 @@ defmodule Oban.Peers.DatabaseTest do
66
alias Oban.Peer
77
alias Oban.Peers.Database
88
alias Oban.TelemetryHandler
9+
alias Oban.Test.DolphinRepo
910

10-
test "only a single peer is leader" do
11-
TelemetryHandler.attach_events()
12-
11+
test "enforcing a single leader with the Basic engine" do
1312
name = start_supervised_oban!(peer: false)
14-
conf = %{Oban.config(name) | peer: {Database, []}}
13+
conf = Oban.config(name)
14+
15+
assert [_leader] =
16+
[A, B, C]
17+
|> Enum.map(&start_supervised_peer!(conf, &1))
18+
|> Enum.filter(&Database.leader?/1)
19+
end
20+
21+
@tag :dolphin
22+
test "enforcing a single leader with the Dolphin engine" do
23+
name = start_supervised_oban!(engine: Oban.Engines.Dolphin, peer: false, repo: DolphinRepo)
24+
conf = Oban.config(name)
1525

1626
assert [_leader] =
1727
[A, B, C]
18-
|> Enum.map(&start_supervised!({Peer, conf: conf, name: &1}))
28+
|> Enum.map(&start_supervised_peer!(conf, &1))
1929
|> Enum.filter(&Database.leader?/1)
30+
end
31+
32+
defp start_supervised_peer!(conf, name) do
33+
node = "web.#{name}"
34+
conf = %{conf | node: node, peer: {Database, []}}
35+
36+
start_supervised!({Peer, conf: conf, name: name})
37+
end
38+
39+
test "dispatching leadership election events" do
40+
TelemetryHandler.attach_events()
41+
42+
start_supervised_oban!(peer: Database)
2043

21-
assert_received {:event, [:election, :start], _measure,
22-
%{leader: _, peer: Database, was_leader: nil}}
44+
assert_receive {:event, [:election, :start], _measure,
45+
%{leader: false, peer: Database, was_leader: nil}}
2346

24-
assert_received {:event, [:election, :stop], _measure,
25-
%{leader: _, peer: Database, was_leader: false}}
47+
assert_receive {:event, [:election, :stop], _measure,
48+
%{leader: true, peer: Database, was_leader: false}}
2649
end
2750

2851
test "gracefully handling a missing oban_peers table" do

0 commit comments

Comments
 (0)