Skip to content

Commit 738551b

Browse files
author
Antonin Houska
committed
Fixed processing of the rewrite.max_xlock_time parameter.
1. So far, the value of the parameter was not passed to the worker at all. 2. The worker coding was such that the parameter was only checked during logical decoding, but not when applying the data changes.
1 parent 87a86c4 commit 738551b

File tree

3 files changed

+50
-7
lines changed

3 files changed

+50
-7
lines changed

concurrent.c

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ static void apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
2727
Relation ident_index,
2828
TupleTableSlot *slot_dst_ind,
2929
partitions_hash *partitions,
30-
TupleConversionMapExt *conv_map);
30+
TupleConversionMapExt *conv_map,
31+
struct timeval *must_complete);
3132
static void apply_insert(HeapTuple tup, TupleTableSlot *slot,
3233
EState *estate, ModifyTableState *mtstate,
3334
struct PartitionTupleRouting *proute,
@@ -106,6 +107,20 @@ pg_rewrite_process_concurrent_changes(EState *estate,
106107
partitions && proute));
107108

108109
dstate = (DecodingOutputState *) ctx->output_writer_private;
110+
111+
/*
112+
* If some changes could not be applied due to time constraint, make sure
113+
* the tuplestore is empty before we insert new tuples into it.
114+
*/
115+
if (dstate->nchanges > 0)
116+
apply_concurrent_changes(estate, mtstate, proute,
117+
dstate, ident_key, ident_key_nentries,
118+
ident_index, slot_dst_ind,
119+
partitions, conv_map, must_complete);
120+
/* Ran out of time? */
121+
if (dstate->nchanges > 0)
122+
return false;
123+
109124
done = false;
110125
while (!done)
111126
{
@@ -129,7 +144,10 @@ pg_rewrite_process_concurrent_changes(EState *estate,
129144
apply_concurrent_changes(estate, mtstate, proute,
130145
dstate, ident_key, ident_key_nentries,
131146
ident_index, slot_dst_ind,
132-
partitions, conv_map);
147+
partitions, conv_map, must_complete);
148+
/* Ran out of time? */
149+
if (dstate->nchanges > 0)
150+
return false;
133151
}
134152

135153
return true;
@@ -233,7 +251,8 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
233251
Relation ident_index,
234252
TupleTableSlot *slot_dst_ind,
235253
partitions_hash *partitions,
236-
TupleConversionMapExt *conv_map)
254+
TupleConversionMapExt *conv_map,
255+
struct timeval *must_complete)
237256
{
238257
BulkInsertState bistate = NULL;
239258
HeapTuple tup_old = NULL;
@@ -272,6 +291,9 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
272291
bool isnull[1];
273292
Datum values[1];
274293

294+
Assert(dstate->nchanges > 0);
295+
dstate->nchanges--;
296+
275297
/* Get the change from the single-column tuple. */
276298
tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
277299
heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull);
@@ -320,10 +342,22 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
320342
/* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
321343
Assert(shouldFree);
322344
pfree(tup_change);
345+
346+
/*
347+
* If there is a limit on the time of completion, check it
348+
* now. However, make sure the loop does not break if tup_old was set
349+
* in the previous iteration. In such a case we could not resume the
350+
* processing in the next call.
351+
*/
352+
if (must_complete && tup_old == NULL &&
353+
processing_time_elapsed(must_complete))
354+
/* The next call will process the remaining changes. */
355+
break;
323356
}
324357

325-
tuplestore_clear(dstate->tstore);
326-
dstate->nchanges = 0;
358+
/* If we could not apply all the changes, the next call will do. */
359+
if (dstate->nchanges == 0)
360+
tuplestore_clear(dstate->tstore);
327361

328362
PopActiveSnapshot();
329363

pg_rewrite.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ get_task(int *idx, char *relschema, char *relname, bool nowait)
453453
task->elevel = -1;
454454

455455
task->nowait = nowait;
456+
task->max_xlock_time = rewrite_max_xlock_time;
456457

457458
*idx = i;
458459
return task;
@@ -2395,12 +2396,13 @@ perform_final_merge(EState *estate,
23952396
{
23962397
int64 usec;
23972398
struct timeval t_start;
2399+
int max_xlock_time = MyWorkerTask->max_xlock_time;
23982400

23992401
gettimeofday(&t_start, NULL);
24002402
/* Add the whole seconds. */
2401-
t_end.tv_sec = t_start.tv_sec + rewrite_max_xlock_time / 1000;
2403+
t_end.tv_sec = t_start.tv_sec + max_xlock_time / 1000;
24022404
/* Add the rest, expressed in microseconds. */
2403-
usec = t_start.tv_usec + 1000 * (rewrite_max_xlock_time % 1000);
2405+
usec = t_start.tv_usec + 1000 * (max_xlock_time % 1000);
24042406
/* The number of microseconds could have overflown. */
24052407
t_end.tv_sec += usec / USECS_PER_SEC;
24062408
t_end.tv_usec = usec % USECS_PER_SEC;
@@ -2497,6 +2499,11 @@ perform_final_merge(EState *estate,
24972499
partitions,
24982500
conv_map,
24992501
NULL);
2502+
2503+
/* No time constraint, all changes must have been processed. */
2504+
Assert(((DecodingOutputState *)
2505+
ctx->output_writer_private)->nchanges == 0);
2506+
25002507
}
25012508

25022509
list_free(indexes);

pg_rewrite.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ typedef struct WorkerTask
171171
* backend might be interested in 'msg' and 'msg_detail'.
172172
*/
173173
bool nowait;
174+
175+
int max_xlock_time;
174176
} WorkerTask;
175177

176178
#define MAX_TASKS 8

0 commit comments

Comments
 (0)