Skip to content

Commit 47cd19b

Browse files
authored
Merge pull request #4443 from kobotoolbox/improve-import-export-tasks-deletion
Improve long-running trash-bin tasks duration
2 parents 2ffec2c + e2e0c03 commit 47cd19b

File tree

6 files changed

+149
-5
lines changed

6 files changed

+149
-5
lines changed

kobo/apps/superuser_stats/tasks.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,10 @@ def generate_domain_report(output_filename: str, start_date: str, end_date: str)
227227
writer.writerow(row)
228228

229229

230-
@shared_task(soft_time_limit=4200, time_limit=4260)
230+
@shared_task(
231+
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
232+
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT
233+
)
231234
def generate_forms_count_by_submission_range(output_filename: str):
232235
# List of submissions count ranges
233236
ranges = [

kobo/apps/trash_bin/tasks.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22

33
from celery.signals import task_failure, task_retry
4+
from django.conf import settings
45
from django.contrib.auth import get_user_model
56
from django.db import transaction
67
from django.db.models.signals import post_delete
@@ -33,6 +34,9 @@
3334
retry_backoff_max=600,
3435
max_retries=5,
3536
retry_jitter=False,
37+
queue='kpi_low_priority_queue',
38+
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
39+
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
3640
)
3741
def empty_account(account_trash_id: int):
3842
with transaction.atomic():
@@ -158,6 +162,9 @@ def empty_account(account_trash_id: int):
158162
retry_backoff_max=600,
159163
max_retries=5,
160164
retry_jitter=False,
165+
queue='kpi_low_priority_queue',
166+
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
167+
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
161168
)
162169
def empty_project(project_trash_id: int):
163170
with transaction.atomic():

kobo/apps/trash_bin/utils.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
from datetime import timedelta
66

77
from django.contrib.auth import get_user_model
8+
from django.conf import settings
89
from django.db import IntegrityError, models, transaction
9-
from django.db.models import F
10+
from django.db.models import F, Q
1011
from django.db.models.signals import pre_delete
1112
from django.utils.timezone import now
1213
from django_celery_beat.models import (
@@ -38,19 +39,22 @@ def delete_asset(request_author: 'auth.User', asset: 'kpi.Asset'):
3839

3940
asset_id = asset.pk
4041
asset_uid = asset.uid
42+
host = settings.KOBOFORM_URL
4143
owner_username = asset.owner.username
4244
project_exports = []
4345
if asset.has_deployment:
4446
_delete_submissions(request_author, asset)
4547
asset.deployment.delete()
4648
project_exports = ExportTask.objects.filter(
47-
data__source__endswith=f'/api/v2/assets/{asset.uid}/'
49+
Q(data__source=f'{host}/api/v2/assets/{asset.uid}/')
50+
| Q(data__source=f'{host}/assets/{asset.uid}/')
4851
)
4952

5053
with transaction.atomic():
5154
# Delete imports
5255
ImportTask.objects.filter(
53-
data__destination__endswith=f'/api/v2/assets/{asset.uid}/'
56+
Q(data__destination=f'{host}/api/v2/assets/{asset.uid}/')
57+
| Q(data__destination=f'{host}/assets/{asset.uid}/')
5458
).delete()
5559
# Delete exports (and related files on storage)
5660
for export in project_exports:

kobo/settings/base.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,15 @@ def __init__(self, *args, **kwargs):
804804
CELERY_BROKER_URL = os.environ.get('KPI_BROKER_URL', 'redis://localhost:6379/1')
805805
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
806806

807+
# Increase limits for long-running tasks
808+
# Note: these are custom names, not part of the `CELERY_*` namespace.
809+
CELERY_LONG_RUNNING_TASK_TIME_LIMIT = int(
810+
os.environ.get('CELERY_LONG_RUNNING_TASK_TIME_LIMIT', 4260) # seconds
811+
)
812+
813+
CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT = int(
814+
os.environ.get('CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT', 4200) # seconds
815+
)
807816

808817
''' Django allauth configuration '''
809818
ACCOUNT_ADAPTER = 'kobo.apps.accounts.adapter.AccountAdapter'
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Generated by Django 3.2.15 on 2023-05-18 21:37
2+
3+
import django.contrib.postgres.indexes
4+
import django.db.models.expressions
5+
from django.conf import settings
6+
from django.db import migrations
7+
8+
9+
def manually_create_indexes_instructions(apps, schema_editor):
10+
print(
11+
"""
12+
!!! ATTENTION !!!
13+
You need to run the SQL queries below in PostgreSQL directly:
14+
15+
> CREATE INDEX CONCURRENTLY "data__destination_idx" ON "kpi_importtask" USING btree ((("data" -> 'destination')));
16+
> CREATE INDEX CONCURRENTLY "data__source_idx" ON "kpi_exporttask" USING btree ((("data" -> 'source')));
17+
18+
> CREATE INDEX CONCURRENTLY "data__destination_hash_idx" ON "kpi_importtask" USING hash ((("data" -> 'destination')));
19+
> CREATE INDEX CONCURRENTLY "data__source_hash_idx" ON "kpi_exporttask" USING hash ((("data" -> 'source')));
20+
21+
Otherwise, project deletions will perform very poorly.
22+
23+
You may create one index per table simultaneously, i.e. you may run the
24+
first two queries in parallel (within different psql sessions) and then
25+
run the next two in parallel afterwards. You cannot run all of them
26+
at the same time.
27+
"""
28+
)
29+
30+
31+
def manually_drop_indexes_instructions(apps, schema_editor):
32+
print(
33+
"""
34+
!!! ATTENTION !!!
35+
Run the SQL queries below in PostgreSQL directly:
36+
37+
> DROP INDEX CONCURRENTLY IF EXISTS "data__destination_idx";
38+
> DROP INDEX CONCURRENTLY IF EXISTS "data__source_idx";
39+
40+
> DROP INDEX CONCURRENTLY IF EXISTS "data__destination_hash_idx";
41+
> DROP INDEX CONCURRENTLY IF EXISTS "data__source_hash_idx";
42+
43+
You may remove one index per table simultaneously, i.e. you may run the
44+
first two queries in parallel (within different psql sessions) and then
45+
run the next two in parallel afterwards. You cannot run all of them
46+
at the same time.
47+
"""
48+
)
49+
50+
51+
class Migration(migrations.Migration):
52+
53+
dependencies = [
54+
('kpi', '0049_add_pending_delete_to_asset'),
55+
]
56+
57+
if not settings.SKIP_HEAVY_MIGRATIONS:
58+
print(
59+
"""
60+
This might take a while. If it is too slow, you may want to
61+
interrupt this migration, cancel any outstanding `CREATE…` or `DROP
62+
INDEX` queries on `kpi_importtask` and `kpi_exporttask`, re-run the
63+
migration with `SKIP_HEAVY_MIGRATIONS=True`, and then follow the
64+
printed instructions to set up the indexes concurrently (without
65+
downtime) using raw SQL.
66+
"""
67+
)
68+
operations = [
69+
migrations.AddIndex(
70+
model_name='importtask',
71+
index=django.contrib.postgres.indexes.BTreeIndex(
72+
django.db.models.expressions.F('data__destination'),
73+
name='data__destination_idx',
74+
),
75+
),
76+
migrations.AddIndex(
77+
model_name='importtask',
78+
index=django.contrib.postgres.indexes.HashIndex(
79+
django.db.models.expressions.F('data__destination'),
80+
name='data__destination_hash_idx',
81+
),
82+
),
83+
migrations.AddIndex(
84+
model_name='exporttask',
85+
index=django.contrib.postgres.indexes.BTreeIndex(
86+
django.db.models.expressions.F('data__source'), name='data__source_idx'),
87+
),
88+
migrations.AddIndex(
89+
model_name='exporttask',
90+
index=django.contrib.postgres.indexes.HashIndex(
91+
django.db.models.expressions.F('data__source'),
92+
name='data__source_hash_idx'),
93+
),
94+
]
95+
else:
96+
operations = [
97+
migrations.RunPython(
98+
manually_create_indexes_instructions,
99+
manually_drop_indexes_instructions,
100+
)
101+
]

kpi/models/import_export_task.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
import constance
1919
import requests
2020
from django.conf import settings
21+
from django.contrib.postgres.indexes import BTreeIndex, HashIndex
2122
from django.core.files.storage import FileSystemStorage
2223
from django.db import models, transaction
24+
from django.db.models import F
2325
from django.urls import reverse
2426
from django.utils.translation import gettext as t
2527
import formpack
@@ -36,7 +38,6 @@
3638
from formpack.utils.kobo_locking import get_kobo_locking_profiles
3739
from formpack.utils.string import ellipsize
3840
from private_storage.fields import PrivateFileField
39-
from pyxform import xls2json_backends
4041
from rest_framework import exceptions
4142
from werkzeug.http import parse_options_header
4243
from openpyxl.utils.exceptions import InvalidFileException
@@ -228,6 +229,16 @@ class ImportTask(ImportExportTask):
228229
...although we probably would need to store the file in a blob
229230
"""
230231

232+
class Meta(ImportExportTask.Meta):
233+
indexes = [
234+
BTreeIndex(
235+
F('data__destination'), name='data__destination_idx'
236+
),
237+
HashIndex(
238+
F('data__destination'), name='data__destination_hash_idx'
239+
),
240+
]
241+
231242
def _run_task(self, messages):
232243
self.status = self.PROCESSING
233244
self.save(update_fields=['status'])
@@ -569,6 +580,14 @@ def __str__(self):
569580
class Meta:
570581
abstract = True
571582
ordering = ['-date_created']
583+
indexes = [
584+
BTreeIndex(
585+
F('data__source'), name='data__source_idx'
586+
),
587+
HashIndex(
588+
F('data__source'), name='data__source_hash_idx'
589+
),
590+
]
572591

573592
def _build_export_filename(self, export, export_type):
574593
"""
@@ -919,6 +938,7 @@ class ExportTask(ExportTaskBase):
919938
"""
920939
An asynchronous export task, to be run with Celery
921940
"""
941+
922942
def _run_task(self, messages):
923943
try:
924944
source_url = self.data['source']

0 commit comments

Comments
 (0)