Skip to content

Commit d990017

Browse files
authored
Add query timeout option on ReplicationConnection (#748)
1 parent 412b555 commit d990017

File tree

3 files changed

+72
-24
lines changed

3 files changed

+72
-24
lines changed

lib/postgrex/protocol.ex

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,36 +1153,37 @@ defmodule Postgrex.Protocol do
11531153

11541154
## replication/notifications
11551155

1156-
@spec handle_simple(String.t() | iolist(), state) ::
1156+
@spec handle_simple(String.t() | iolist(), Keyword.t(), state) ::
11571157
{:ok, [Postgrex.Result.t()], state}
11581158
| {:error, Postgrex.Error.t(), state}
11591159
| {:disconnect, %DBConnection.ConnectionError{}, state}
11601160
def handle_simple(statement, opts \\ [], %{buffer: buffer} = s) do
11611161
status = new_status(opts, mode: :transaction)
11621162
msgs = [msg_query(statement: statement)]
1163+
timeout = Keyword.get(opts, :timeout, :infinity)
11631164

11641165
case msg_send(%{s | buffer: nil}, msgs, buffer) do
11651166
:ok ->
1166-
recv_simple(s, status, [], [], [], buffer)
1167+
recv_simple(s, status, [], [], [], buffer, timeout)
11671168

11681169
{:disconnect, err, s} ->
11691170
{:disconnect, err, s}
11701171
end
11711172
end
11721173

1173-
defp recv_simple(s, status, results, columns, rows, buffer) do
1174-
case msg_recv(s, :infinity, buffer) do
1174+
defp recv_simple(s, status, results, columns, rows, buffer, timeout) do
1175+
case msg_recv(s, timeout, buffer) do
11751176
{:ok, msg_row_desc(fields: fields), buffer} ->
11761177
columns = column_names(fields)
1177-
recv_simple(s, status, results, columns, rows, buffer)
1178+
recv_simple(s, status, results, columns, rows, buffer, timeout)
11781179

11791180
{:ok, msg_data_row(values: values), buffer} ->
11801181
row = Types.decode_simple(values, s.types)
1181-
recv_simple(s, status, results, columns, [row | rows], buffer)
1182+
recv_simple(s, status, results, columns, [row | rows], buffer, timeout)
11821183

11831184
{:ok, msg_command_complete(tag: tag), buffer} ->
11841185
{result, s} = done(s, columns, Enum.reverse(rows), [tag])
1185-
recv_simple(s, status, [result | results], [], [], buffer)
1186+
recv_simple(s, status, [result | results], [], [], buffer, timeout)
11861187

11871188
{:ok, msg_error(fields: fields), buffer} ->
11881189
err = Postgrex.Error.exception(postgres: fields)
@@ -1194,7 +1195,7 @@ defmodule Postgrex.Protocol do
11941195

11951196
{:ok, msg, buffer} ->
11961197
s = handle_msg(s, status, msg)
1197-
recv_simple(s, status, results, columns, rows, buffer)
1198+
recv_simple(s, status, results, columns, rows, buffer, timeout)
11981199

11991200
{:disconnect, _, _} = dis ->
12001201
dis

lib/postgrex/replication_connection.ex

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,17 @@ defmodule Postgrex.ReplicationConnection do
172172
173173
"""
174174
@type stream_opts :: [max_messages: pos_integer]
175+
176+
@query_timeout :infinity
177+
178+
@typedoc """
179+
The following options configure querying:
180+
181+
* `:timeout` - Query request timeout (default: `#{@query_timeout}`);
182+
183+
"""
184+
@type query_opts :: [timeout: timeout]
185+
175186
@max_lsn_component_size 8
176187
@max_uint64 18_446_744_073_709_551_615
177188
@max_messages 500
@@ -192,6 +203,7 @@ defmodule Postgrex.ReplicationConnection do
192203
{:noreply, state}
193204
| {:noreply, ack, state}
194205
| {:query, query, state}
206+
| {:query, query, query_opts, state}
195207
| {:stream, query, stream_opts, state}
196208
| {:disconnect, reason}
197209

@@ -220,6 +232,7 @@ defmodule Postgrex.ReplicationConnection do
220232
{:noreply, state}
221233
| {:noreply, ack, state}
222234
| {:query, query, state}
235+
| {:query, query, query_opts, state}
223236
| {:stream, query, stream_opts, state}
224237
| {:disconnect, reason}
225238

@@ -230,6 +243,7 @@ defmodule Postgrex.ReplicationConnection do
230243
{:noreply, state}
231244
| {:noreply, ack, state}
232245
| {:query, query, state}
246+
| {:query, query, query_opts, state}
233247
| {:stream, query, stream_opts, state}
234248
| {:disconnect, reason}
235249

@@ -249,17 +263,19 @@ defmodule Postgrex.ReplicationConnection do
249263
{:noreply, state}
250264
| {:noreply, ack, state}
251265
| {:query, query, state}
266+
| {:query, query, query_opts, state}
252267
| {:stream, query, stream_opts, state}
253268
| {:disconnect, reason}
254269

255270
@doc """
256271
Callback for `:query` outputs.
257272
258-
If any callback returns `{:query, iodata, state}`,
259-
then this callback will be immediately called with
260-
the result of the query. Please note that even though
261-
replication connections use the simple query protocol,
262-
Postgres currently limits them to single command queries.
273+
If any callback returns `{:query, iodata, state}` or
274+
`{:query, iodata, opts, state}`, then this callback will
275+
be immediately called with the result of the query.
276+
Please note that even though replication connections use
277+
the simple query protocol, Postgres currently limits them to
278+
single command queries.
263279
Due to this constraint, this callback will be passed
264280
either a list with a single successful query result or
265281
an error.
@@ -268,6 +284,7 @@ defmodule Postgrex.ReplicationConnection do
268284
{:noreply, state}
269285
| {:noreply, ack, state}
270286
| {:query, query, state}
287+
| {:query, query, query_opts, state}
271288
| {:stream, query, stream_opts, state}
272289
| {:disconnect, reason}
273290

@@ -586,25 +603,35 @@ defmodule Postgrex.ReplicationConnection do
586603
stream_in_progress(:stream, mod, mod_state, from, s)
587604

588605
{:query, query, mod_state} when streaming == nil ->
589-
case Protocol.handle_simple(query, [], s.protocol) do
590-
{:ok, results, protocol} when is_list(results) ->
591-
handle(mod, :handle_result, [results, mod_state], from, %{s | protocol: protocol})
592-
593-
{:error, %Postgrex.Error{} = error, protocol} ->
594-
handle(mod, :handle_result, [error, mod_state], from, %{s | protocol: protocol})
606+
handle_query(query, mod, from, s, mod_state, timeout: @query_timeout)
595607

596-
{:disconnect, reason, protocol} ->
597-
reconnect_or_stop(:disconnect, reason, protocol, %{s | state: {mod, mod_state}})
598-
end
608+
{:query, query, opts, mod_state} when streaming == nil ->
609+
handle_query(query, mod, from, s, mod_state, opts)
599610

600611
{:query, _query, mod_state} ->
601612
stream_in_progress(:query, mod, mod_state, from, s)
602613

614+
{:query, _query, _opts, mod_state} ->
615+
stream_in_progress(:query, mod, mod_state, from, s)
616+
603617
{:disconnect, reason} ->
604618
reconnect_or_stop(:disconnect, reason, s.protocol, s)
605619
end
606620
end
607621

622+
defp handle_query(query, mod, from, s, mod_state, opts) do
623+
case Protocol.handle_simple(query, opts, s.protocol) do
624+
{:ok, results, protocol} when is_list(results) ->
625+
handle(mod, :handle_result, [results, mod_state], from, %{s | protocol: protocol})
626+
627+
{:error, %Postgrex.Error{} = error, protocol} ->
628+
handle(mod, :handle_result, [error, mod_state], from, %{s | protocol: protocol})
629+
630+
{:disconnect, reason, protocol} ->
631+
reconnect_or_stop(:disconnect, reason, protocol, %{s | state: {mod, mod_state}})
632+
end
633+
end
634+
608635
defp stream_in_progress(command, mod, mod_state, from, s) do
609636
Logger.warning("received #{command} while stream is already in progress")
610637
from && reply(from, {__MODULE__, :stream_in_progress})

test/replication_connection_test.exs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,14 @@ defmodule ReplicationTest do
9393
{:noreply, pid}
9494
end
9595

96-
@impl true
9796
def handle_call({:query, query}, from, pid) do
9897
{:query, query, {from, pid}}
9998
end
10099

100+
def handle_call({:query, query, %{timeout: timeout}}, from, pid) do
101+
{:query, query, [timeout: timeout], {from, pid}}
102+
end
103+
101104
# This is part of the "stream_continuation" test and handles call that
102105
# triggers that chain of events.
103106
def handle_call(
@@ -108,7 +111,6 @@ defmodule ReplicationTest do
108111
{:query, query, Map.merge(opts, %{from: from, pid: pid})}
109112
end
110113

111-
@impl true
112114
def handle_call({:disconnect, reason}, _, _) do
113115
{:disconnect, reason}
114116
end
@@ -167,6 +169,24 @@ defmodule ReplicationTest do
167169
assert {:error, %Postgrex.Error{}} = PR.call(context.repl, {:query, "SELCT"})
168170
end
169171

172+
@tag :capture_log
173+
test "on timeout", context do
174+
Process.flag(:trap_exit, true)
175+
repl_ref = Process.monitor(context.repl)
176+
177+
{_pid, call_ref} =
178+
spawn_monitor(fn ->
179+
assert {:error, %Postgrex.Error{}} =
180+
PR.call(context.repl, {:query, "SELECT pg_sleep(0.1);", %{timeout: 10}})
181+
end)
182+
183+
assert_receive {:DOWN, ^call_ref, _, _,
184+
{%DBConnection.ConnectionError{message: "tcp async_recv: timeout"}, _}}
185+
186+
assert_receive {:DOWN, ^repl_ref, _, _,
187+
%DBConnection.ConnectionError{message: "tcp async_recv: timeout"}}
188+
end
189+
170190
@tag :capture_log
171191
test "on disconnect", context do
172192
Process.flag(:trap_exit, true)

0 commit comments

Comments
 (0)