Skip to content

Commit d354218

Browse files
committed
feat: Removes the pg_triggers extension to just leave pgt_outbox
1 parent ba0ba80 commit d354218

File tree

1 file changed

+13
-340
lines changed

1 file changed

+13
-340
lines changed

lib/sequel/extensions/pg_triggers.rb

Lines changed: 13 additions & 340 deletions
Original file line numberDiff line numberDiff line change
@@ -1,335 +1,14 @@
11
# frozen_string_literal: true
2-
# The pg_triggers extension adds support to the Database instance for easily
3-
# creating triggers and trigger returning functions for common needs.
42

3+
# The pgt_outbox extension adds support to the Database instance for
4+
# implementing the transactional outbox pattern using triggers.
5+
6+
# Top-level Sequel namespace
57
module Sequel
8+
# Postgres namespace
69
module Postgres
7-
PGT_DEFINE = proc do
8-
def pgt_counter_cache(main_table, main_table_id_column, counter_column, counted_table, counted_table_id_column,
9-
opts = {})
10-
trigger_name = opts[:trigger_name] || "pgt_cc_#{pgt_mangled_table_name(main_table)}__#{main_table_id_column}__#{counter_column}__#{counted_table_id_column}"
11-
function_name = opts[:function_name] || "pgt_cc_#{pgt_mangled_table_name(main_table)}__#{main_table_id_column}__#{counter_column}__#{pgt_mangled_table_name(counted_table)}__#{counted_table_id_column}"
12-
13-
table = quote_schema_table(main_table)
14-
id_column = quote_identifier(counted_table_id_column)
15-
main_column = quote_identifier(main_table_id_column)
16-
count_column = quote_identifier(counter_column)
17-
18-
pgt_trigger(counted_table, trigger_name, function_name, %i[insert update delete], <<-SQL, :after => true)
19-
BEGIN
20-
#{pgt_pg_trigger_depth_guard_clause(opts)}
21-
IF (TG_OP = 'UPDATE' AND (NEW.#{id_column} = OLD.#{id_column} OR (OLD.#{id_column} IS NULL AND NEW.#{id_column} IS NULL))) THEN
22-
RETURN NEW;
23-
ELSE
24-
IF ((TG_OP = 'INSERT' OR TG_OP = 'UPDATE') AND NEW.#{id_column} IS NOT NULL) THEN
25-
UPDATE #{table} SET #{count_column} = #{count_column} + 1 WHERE #{main_column} = NEW.#{id_column};
26-
END IF;
27-
IF ((TG_OP = 'DELETE' OR TG_OP = 'UPDATE') AND OLD.#{id_column} IS NOT NULL) THEN
28-
UPDATE #{table} SET #{count_column} = #{count_column} - 1 WHERE #{main_column} = OLD.#{id_column};
29-
END IF;
30-
END IF;
31-
32-
IF (TG_OP = 'DELETE') THEN
33-
RETURN OLD;
34-
END IF;
35-
RETURN NEW;
36-
END;
37-
SQL
38-
end
39-
40-
def pgt_created_at(table, column, opts = {})
41-
trigger_name = opts[:trigger_name] || "pgt_ca_#{column}"
42-
function_name = opts[:function_name] || "pgt_ca_#{pgt_mangled_table_name(table)}__#{column}"
43-
col = quote_identifier(column)
44-
pgt_trigger(table, trigger_name, function_name, %i[insert update], <<-SQL)
45-
BEGIN
46-
IF (TG_OP = 'UPDATE') THEN
47-
NEW.#{col} := OLD.#{col};
48-
ELSIF (TG_OP = 'INSERT') THEN
49-
NEW.#{col} := CURRENT_TIMESTAMP;
50-
END IF;
51-
RETURN NEW;
52-
END;
53-
SQL
54-
end
55-
56-
def pgt_force_defaults(table, defaults, opts = {})
57-
cols = defaults.keys.sort.join('_')
58-
trigger_name = opts[:trigger_name] || "pgt_fd_#{cols}"
59-
function_name = opts[:function_name] || "pgt_fd_#{pgt_mangled_table_name(table)}__#{cols}"
60-
lines = defaults.map do |column, v|
61-
"NEW.#{quote_identifier(column)} = #{literal(v)};"
62-
end
63-
pgt_trigger(table, trigger_name, function_name, [:insert], <<-SQL)
64-
BEGIN
65-
#{lines.join("\n")}
66-
RETURN NEW;
67-
END;
68-
SQL
69-
end
70-
71-
def pgt_immutable(table, *columns)
72-
opts = columns.last.is_a?(Hash) ? columns.pop : {}
73-
trigger_name = opts[:trigger_name] || "pgt_im_#{columns.join("__")}"
74-
function_name = opts[:function_name] || "pgt_im_#{columns.join("__")}"
75-
ifs = columns.map do |c|
76-
old = "OLD.#{quote_identifier(c)}"
77-
new = "NEW.#{quote_identifier(c)}"
78-
<<-END
79-
IF #{new} IS DISTINCT FROM #{old} THEN
80-
RAISE EXCEPTION 'Attempted #{c} update: Old: %, New: %', #{old}, #{new};
81-
END IF;
82-
END
83-
end.join("\n")
84-
pgt_trigger(table, trigger_name, function_name, :update, "BEGIN #{ifs} RETURN NEW; END;")
85-
end
86-
87-
def pgt_json_audit_log_setup(table, opts = {})
88-
function_name = opts[:function_name] || "pgt_jal_#{pgt_mangled_table_name(table)}"
89-
create_table(table) do
90-
Bignum :txid, :null => false, :index => true
91-
DateTime :at, :default => Sequel::CURRENT_TIMESTAMP, :null => false
92-
String :user, :null => false
93-
String :schema, :null => false
94-
String :table, :null => false
95-
String :action, :null => false
96-
jsonb :prior, :null => false
97-
end
98-
create_function(function_name, <<-SQL,
99-
{ :language => :plpgsql, :returns => :trigger, :replace => true }.merge(opts[:function_opts] || {}))
100-
BEGIN
101-
#{pgt_pg_trigger_depth_guard_clause(opts)}
102-
INSERT INTO #{quote_schema_table(table)} (txid, at, "user", "schema", "table", action, prior) VALUES
103-
(txid_current(), CURRENT_TIMESTAMP, CURRENT_USER, TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_OP, to_jsonb(OLD));
104-
IF (TG_OP = 'DELETE') THEN
105-
RETURN OLD;
106-
END IF;
107-
RETURN NEW;
108-
END;
109-
SQL
110-
function_name
111-
end
112-
113-
def pgt_json_audit_log(table, function, opts = {})
114-
create_trigger(table, (opts[:trigger_name] || "pgt_jal_#{pgt_mangled_table_name(table)}"), function,
115-
:events => %i[update delete], :each_row => true, :after => true)
116-
end
117-
118-
def pgt_sum_cache(main_table, main_table_id_column, sum_column, summed_table, summed_table_id_column,
119-
summed_column, opts = {})
120-
trigger_name = opts[:trigger_name] || "pgt_sc_#{pgt_mangled_table_name(main_table)}__#{main_table_id_column}__#{sum_column}__#{summed_table_id_column}"
121-
function_name = opts[:function_name] || "pgt_sc_#{pgt_mangled_table_name(main_table)}__#{main_table_id_column}__#{sum_column}__#{pgt_mangled_table_name(summed_table)}__#{summed_table_id_column}__#{summed_column}"
122-
123-
table = quote_schema_table(main_table)
124-
id_column = quote_identifier(summed_table_id_column)
125-
126-
new_table_summed_column = literal(Sequel.deep_qualify(Sequel.lit('NEW'), summed_column))
127-
old_table_summed_column = literal(Sequel.deep_qualify(Sequel.lit('OLD'), summed_column))
128-
main_column = quote_identifier(main_table_id_column)
129-
sum_column = quote_identifier(sum_column)
130-
131-
pgt_trigger(summed_table, trigger_name, function_name, %i[insert delete update], <<-SQL, :after => true)
132-
BEGIN
133-
#{pgt_pg_trigger_depth_guard_clause(opts)}
134-
IF (TG_OP = 'UPDATE' AND NEW.#{id_column} = OLD.#{id_column}) THEN
135-
UPDATE #{table} SET #{sum_column} = #{sum_column} + #{new_table_summed_column} - #{old_table_summed_column} WHERE #{main_column} = NEW.#{id_column};
136-
ELSE
137-
IF ((TG_OP = 'INSERT' OR TG_OP = 'UPDATE') AND NEW.#{id_column} IS NOT NULL) THEN
138-
UPDATE #{table} SET #{sum_column} = #{sum_column} + #{new_table_summed_column} WHERE #{main_column} = NEW.#{id_column};
139-
END IF;
140-
IF ((TG_OP = 'DELETE' OR TG_OP = 'UPDATE') AND OLD.#{id_column} IS NOT NULL) THEN
141-
UPDATE #{table} SET #{sum_column} = #{sum_column} - #{old_table_summed_column} WHERE #{main_column} = OLD.#{id_column};
142-
END IF;
143-
END IF;
144-
IF (TG_OP = 'DELETE') THEN
145-
RETURN OLD;
146-
END IF;
147-
RETURN NEW;
148-
END;
149-
SQL
150-
end
151-
152-
def pgt_sum_through_many_cache(opts = {})
153-
main_table = opts.fetch(:main_table)
154-
main_table_id_column = opts.fetch(:main_table_id_column, :id)
155-
sum_column = opts.fetch(:sum_column)
156-
summed_table = opts.fetch(:summed_table)
157-
summed_table_id_column = opts.fetch(:summed_table_id_column, :id)
158-
summed_column = opts.fetch(:summed_column)
159-
join_table = opts.fetch(:join_table)
160-
main_table_fk_column = opts.fetch(:main_table_fk_column)
161-
summed_table_fk_column = opts.fetch(:summed_table_fk_column)
162-
163-
summed_column_slug = summed_column.is_a?(String) || summed_column.is_a?(Symbol) ? "__#{summed_column}" : ''
164-
trigger_name = opts[:trigger_name] || "pgt_stmc_#{pgt_mangled_table_name(main_table)}__#{main_table_id_column}__#{sum_column}__#{summed_table_id_column}#{summed_column_slug}__#{main_table_fk_column}__#{summed_table_fk_column}"
165-
function_name = opts[:function_name] || "pgt_stmc_#{pgt_mangled_table_name(main_table)}__#{main_table_id_column}__#{sum_column}__#{pgt_mangled_table_name(summed_table)}__#{summed_table_id_column}#{summed_column_slug}__#{pgt_mangled_table_name(join_table)}__#{main_table_fk_column}__#{summed_table_fk_column}"
166-
join_trigger_name = opts[:join_trigger_name] || "pgt_stmc_join_#{pgt_mangled_table_name(main_table)}__#{main_table_id_column}__#{sum_column}__#{summed_table_id_column}#{summed_column_slug}__#{main_table_fk_column}__#{summed_table_fk_column}"
167-
join_function_name = opts[:join_function_name] || "pgt_stmc_join_#{pgt_mangled_table_name(main_table)}__#{main_table_id_column}__#{sum_column}__#{pgt_mangled_table_name(summed_table)}__#{summed_table_id_column}#{summed_column_slug}__#{pgt_mangled_table_name(join_table)}__#{main_table_fk_column}__#{summed_table_fk_column}"
168-
169-
orig_summed_table = summed_table
170-
orig_join_table = join_table
171-
172-
main_table = quote_schema_table(main_table)
173-
main_table_id_column = quote_schema_table(main_table_id_column)
174-
sum_column = quote_schema_table(sum_column)
175-
176-
general_summed_column = literal(Sequel.deep_qualify(summed_table, summed_column))
177-
new_table_summed_column = literal(Sequel.deep_qualify(Sequel.lit('NEW'), summed_column))
178-
old_table_summed_column = literal(Sequel.deep_qualify(Sequel.lit('OLD'), summed_column))
179-
180-
summed_table = quote_schema_table(summed_table)
181-
summed_table_id_column = quote_schema_table(summed_table_id_column)
182-
join_table = quote_schema_table(join_table)
183-
main_table_fk_column = quote_schema_table(main_table_fk_column)
184-
summed_table_fk_column = quote_schema_table(summed_table_fk_column)
185-
186-
pgt_trigger(orig_summed_table, trigger_name, function_name, %i[insert delete update], <<-SQL, :after => true)
187-
BEGIN
188-
#{pgt_pg_trigger_depth_guard_clause(opts)}
189-
IF (TG_OP = 'UPDATE' AND NEW.#{summed_table_id_column} = OLD.#{summed_table_id_column}) THEN
190-
UPDATE #{main_table} SET #{sum_column} = #{sum_column} + #{new_table_summed_column} - #{old_table_summed_column} WHERE #{main_table_id_column} IN (SELECT #{main_table_fk_column} FROM #{join_table} WHERE #{summed_table_fk_column} = NEW.#{summed_table_id_column});
191-
ELSE
192-
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
193-
UPDATE #{main_table} SET #{sum_column} = #{sum_column} + #{new_table_summed_column} WHERE #{main_table_id_column} IN (SELECT #{main_table_fk_column} FROM #{join_table} WHERE #{summed_table_fk_column} = NEW.#{summed_table_id_column});
194-
END IF;
195-
IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN
196-
UPDATE #{main_table} SET #{sum_column} = #{sum_column} - #{old_table_summed_column} WHERE #{main_table_id_column} IN (SELECT #{main_table_fk_column} FROM #{join_table} WHERE #{summed_table_fk_column} = OLD.#{summed_table_id_column});
197-
END IF;
198-
END IF;
199-
IF (TG_OP = 'DELETE') THEN
200-
RETURN OLD;
201-
END IF;
202-
RETURN NEW;
203-
END;
204-
SQL
205-
206-
pgt_trigger(orig_join_table, join_trigger_name, join_function_name, %i[insert delete update],
207-
<<-SQL, :after => true)
208-
BEGIN
209-
#{pgt_pg_trigger_depth_guard_clause(opts)}
210-
IF (NOT (TG_OP = 'UPDATE' AND NEW.#{main_table_fk_column} = OLD.#{main_table_fk_column} AND NEW.#{summed_table_fk_column} = OLD.#{summed_table_fk_column})) THEN
211-
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
212-
UPDATE #{main_table} SET #{sum_column} = #{sum_column} + (SELECT #{general_summed_column} FROM #{summed_table} WHERE #{summed_table_id_column} = NEW.#{summed_table_fk_column}) WHERE #{main_table_id_column} = NEW.#{main_table_fk_column};
213-
END IF;
214-
IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN
215-
UPDATE #{main_table} SET #{sum_column} = #{sum_column} - (SELECT #{general_summed_column} FROM #{summed_table} WHERE #{summed_table_id_column} = OLD.#{summed_table_fk_column}) WHERE #{main_table_id_column} = OLD.#{main_table_fk_column};
216-
END IF;
217-
END IF;
218-
IF (TG_OP = 'DELETE') THEN
219-
RETURN OLD;
220-
END IF;
221-
RETURN NEW;
222-
END;
223-
SQL
224-
end
225-
226-
def pgt_touch(main_table, touch_table, column, expr, opts = {})
227-
trigger_name = opts[:trigger_name] || "pgt_t_#{pgt_mangled_table_name(main_table)}__#{pgt_mangled_table_name(touch_table)}"
228-
function_name = opts[:function_name] || "pgt_t_#{pgt_mangled_table_name(main_table)}__#{pgt_mangled_table_name(touch_table)}"
229-
cond = lambda {|source| expr.map {|k, v|
230-
"#{quote_identifier(k)} = #{source}.#{quote_identifier(v)}"}.join(' AND ')}
231-
same_id = expr.map {|_k, v| "NEW.#{quote_identifier(v)} = OLD.#{quote_identifier(v)}"}.join(' AND ')
232-
233-
table = quote_schema_table(touch_table)
234-
col = quote_identifier(column)
235-
update = lambda {
236-
|source| " UPDATE #{table} SET #{col} = CURRENT_TIMESTAMP WHERE #{cond[source]} AND ((#{col} <> CURRENT_TIMESTAMP) OR (#{col} IS NULL));"}
237-
238-
sql = <<-SQL
239-
BEGIN
240-
#{pgt_pg_trigger_depth_guard_clause(opts)}
241-
IF (TG_OP = 'UPDATE' AND (#{same_id})) THEN
242-
#{update["NEW"]}
243-
ELSE
244-
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
245-
#{update["NEW"]}
246-
END IF;
247-
IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN
248-
#{update["OLD"]}
249-
END IF;
250-
END IF;
251-
252-
IF (TG_OP = 'DELETE') THEN
253-
RETURN OLD;
254-
END IF;
255-
RETURN NEW;
256-
END;
257-
SQL
258-
pgt_trigger(main_table, trigger_name, function_name, %i[insert delete update], sql, :after => true)
259-
end
260-
261-
def pgt_updated_at(table, column, opts = {})
262-
trigger_name = opts[:trigger_name] || "pgt_ua_#{column}"
263-
function_name = opts[:function_name] || "pgt_ua_#{pgt_mangled_table_name(table)}__#{column}"
264-
pgt_trigger(table, trigger_name, function_name, %i[insert update], <<-SQL)
265-
BEGIN
266-
NEW.#{quote_identifier(column)} := CURRENT_TIMESTAMP;
267-
RETURN NEW;
268-
END;
269-
SQL
270-
end
271-
272-
def pgt_foreign_key_array(opts = {})
273-
table, column, rtable, rcolumn = opts.values_at(:table, :column, :referenced_table, :referenced_column)
274-
trigger_name = opts[:trigger_name] || "pgt_fka_#{column}"
275-
function_name = opts[:function_name] || "pgt_fka_#{pgt_mangled_table_name(table)}__#{column}"
276-
rtrigger_name = opts[:referenced_trigger_name] || "pgt_rfka_#{column}"
277-
rfunction_name = opts[:referenced_function_name] || "pgt_rfka_#{pgt_mangled_table_name(table)}__#{column}"
278-
col = quote_identifier(column)
279-
tab = quote_identifier(table)
280-
rcol = quote_identifier(rcolumn)
281-
rtab = quote_identifier(rtable)
282-
283-
pgt_trigger(table, trigger_name, function_name, %i[insert update], <<-SQL)
284-
DECLARE
285-
arr #{tab}.#{col}%TYPE;
286-
temp_count1 int;
287-
temp_count2 int;
288-
BEGIN
289-
arr := NEW.#{col};
290-
temp_count1 := array_ndims(arr);
291-
IF arr IS NULL OR temp_count1 IS NULL THEN
292-
RETURN NEW;
293-
END IF;
294-
295-
IF temp_count1 IS DISTINCT FROM 1 THEN
296-
RAISE EXCEPTION 'Foreign key array #{tab}.#{col} has more than 1 dimension: %, dimensions: %', arr, temp_count1;
297-
END IF;
298-
299-
SELECT count(*) INTO temp_count1 FROM unnest(arr);
300-
SELECT count(*) INTO temp_count2 FROM (SELECT DISTINCT * FROM unnest(arr)) AS t;
301-
IF temp_count1 IS DISTINCT FROM temp_count2 THEN
302-
RAISE EXCEPTION 'Duplicate entry in foreign key array #{tab}.#{col}: %', arr;
303-
END IF;
304-
305-
SELECT COUNT(*) INTO temp_count1 FROM #{rtab} WHERE #{rcol} = ANY(arr);
306-
temp_count2 := array_length(arr, 1);
307-
IF temp_count1 IS DISTINCT FROM temp_count2 THEN
308-
RAISE EXCEPTION 'Entry in foreign key array #{tab}.#{col} not in referenced column #{rtab}.#{rcol}: %', arr;
309-
END IF;
310-
311-
RETURN NEW;
312-
END;
313-
SQL
314-
315-
pgt_trigger(rtable, rtrigger_name, rfunction_name, %i[delete update], <<-SQL)
316-
DECLARE
317-
val #{rtab}.#{rcol}%TYPE;
318-
temp_count int;
319-
BEGIN
320-
val := OLD.#{rcol};
321-
IF (TG_OP = 'DELETE') OR val IS DISTINCT FROM NEW.#{rcol} THEN
322-
SELECT COUNT(*) INTO temp_count FROM #{tab} WHERE #{col} @> ARRAY[val];
323-
IF temp_count IS DISTINCT FROM 0 THEN
324-
RAISE EXCEPTION 'Entry in referenced column #{rtab}.#{rcol} still in foreign key array #{tab}.#{col}: %, count: %', val, temp_count;
325-
END IF;
326-
END IF;
327-
RETURN NEW;
328-
END;
329-
SQL
330-
end
331-
332-
def pgt_outbox_setup(table, opts = {})
10+
Rubyists::PgtOutbox.definition = proc do # rubocop:disable Metrics/BlockLength
11+
def pgt_outbox_setup(table, opts = {}) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
33312
function_name = opts.fetch(:function_name, "pgt_outbox_#{pgt_mangled_table_name(table)}")
33413
outbox_table = opts.fetch(:outbox_table, "#{table}_outbox")
33514
quoted_outbox = quote_schema_table(outbox_table)
@@ -397,20 +76,13 @@ def pgt_outbox_setup(table, opts = {})
39776

39877
def pgt_outbox_events(table, function, opts = {})
39978
events = opts.fetch(:events, %i[insert update delete])
79+
where = opts.fetch(:when, nil)
40080
trigger_name = opts.fetch(:trigger_name, "pgt_outbox_#{pgt_mangled_table_name(table)}")
401-
create_trigger(table, trigger_name, function, events: events, replace: true, each_row: true, after: true,
402-
when: opts[:when])
81+
create_trigger(table, trigger_name, function, events:, replace: true, each_row: true, after: true, when: where)
40382
end
40483

40584
private
40685

407-
# Add or replace a function that returns trigger to handle the action,
408-
# and add a trigger that calls the function.
409-
def pgt_trigger(table, trigger_name, function_name, events, definition, opts = {})
410-
create_function(function_name, definition, :language => :plpgsql, :returns => :trigger, :replace => true)
411-
create_trigger(table, trigger_name, function_name, :events => events, :each_row => true, :after => opts[:after])
412-
end
413-
41486
# Mangle the schema name so it can be used in an unquoted_identifier
41587
def pgt_mangled_table_name(table)
41688
quote_schema_table(table).gsub('"', '').gsub(/[^A-Za-z0-9]/, '_').gsub(/_+/, '_')
@@ -431,10 +103,11 @@ def pgt_pg_trigger_depth_guard_clause(opts)
431103
end
432104
end
433105

434-
module PGTMethods
435-
class_eval(&PGT_DEFINE)
106+
# The PgtOutboxMethods module provides methods for creating outbox tables and triggers
107+
module PgtOutBoxMethods
108+
class_eval(&Rubyists::PgtOutbox.definition)
436109
end
437110
end
438111

439-
Database.register_extension(:pg_triggers, Postgres::PGTMethods)
112+
Database.register_extension(:pgt_outbox, Postgres::PgtOutboxMethods)
440113
end

0 commit comments

Comments
 (0)