Skip to content

Commit fdfcae7

Browse files
authored
Merge pull request #3989 from kobotoolbox/3988-celery-priority-queues
Move background tasks for REST Services to their own queue
2 parents a86a6d9 + a2bc740 commit fdfcae7

File tree

7 files changed

+31
-6
lines changed

7 files changed

+31
-6
lines changed

Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ RUN mkdir -p "${NGINX_STATIC_DIR}" && \
4444
mkdir -p ${CELERY_PID_DIR} && \
4545
mkdir -p ${SERVICES_DIR}/uwsgi && \
4646
mkdir -p ${SERVICES_DIR}/celery && \
47+
mkdir -p ${SERVICES_DIR}/celery_low_priority && \
4748
mkdir -p ${SERVICES_DIR}/celery_beat && \
4849
mkdir -p "${INIT_PATH}"
4950

@@ -155,6 +156,7 @@ RUN rm -rf /etc/runit/runsvdir/default/getty-tty*
155156
# Create symlinks for runsv services
156157
RUN ln -s "${KPI_SRC_DIR}/docker/run_uwsgi.bash" "${SERVICES_DIR}/uwsgi/run" && \
157158
ln -s "${KPI_SRC_DIR}/docker/run_celery.bash" "${SERVICES_DIR}/celery/run" && \
159+
ln -s "${KPI_SRC_DIR}/docker/run_celery_low_priority.bash" "${SERVICES_DIR}/celery_low_priority/run" && \
158160
ln -s "${KPI_SRC_DIR}/docker/run_celery_beat.bash" "${SERVICES_DIR}/celery_beat/run"
159161

160162

docker/run_celery.bash

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
set -e
33
source /etc/profile
44

5-
# Run the main Celery worker (will not process `sync_kobocat_xforms` jobs).
5+
# Run the main Celery worker (will not process low priority jobs).
66
# Start 2 processes by default; this will be overridden later, in Python code,
77
# according to the user's preference saved by django-constance
88
cd "${KPI_SRC_DIR}"
@@ -11,7 +11,8 @@ exec celery -A kobo worker --loglevel=info \
1111
--hostname=kpi_main_worker@%h \
1212
--logfile=${KPI_LOGS_DIR}/celery.log \
1313
--pidfile=/tmp/celery.pid \
14-
--exclude-queues=sync_kobocat_xforms_queue \
14+
--queues=kpi_queue \
15+
--exclude-queues=kpi_low_priority_queue \
1516
--uid=${UWSGI_USER} \
1617
--gid=${UWSGI_GROUP} \
1718
--autoscale 2,2

docker/run_celery_low_priority.bash

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#!/bin/bash
2+
set -e
3+
source /etc/profile
4+
5+
# Run the main Celery worker (will process only low priority jobs).
6+
# Start 2 processes by default; this will be overridden later, in Python code,
7+
# according to the user's preference saved by django-constance
8+
cd "${KPI_SRC_DIR}"
9+
10+
exec celery -A kobo worker --loglevel=info \
11+
--hostname=kpi_main_worker@%h \
12+
--logfile=${KPI_LOGS_DIR}/celery_low_priority.log \
13+
--pidfile=/tmp/celery_low_priority.pid \
14+
--queues=kpi_low_priority_queue \
15+
--exclude-queues=kpi_queue \
16+
--uid=${UWSGI_USER} \
17+
--gid=${UWSGI_GROUP} \
18+
--autoscale 2,2

kobo/apps/hook/tests/test_email.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def _create_periodic_task(self):
3434
def test_notifications(self):
3535
self._create_periodic_task()
3636
first_log_response = self._send_and_fail()
37-
failures_reports.delay()
37+
failures_reports.apply_async(queue='kpi_low_priority_queue')
3838
self.assertEqual(len(mail.outbox), 1)
3939

4040
expected_record = {

kobo/apps/hook/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ def call_services(asset: 'kpi.models.asset.Asset', submission_id: int):
2525
submission_id=submission_id, hook_id=hook_id
2626
).exists():
2727
success = True
28-
service_definition_task.delay(hook_id, submission_id)
28+
service_definition_task.apply_async(
29+
queue='kpi_low_priority_queue', args=(hook_id, submission_id)
30+
)
2931

3032
return success

kobo/apps/hook/views/v2/hook.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,9 @@ def retry(self, request, uid=None, *args, **kwargs):
192192
# Mark all logs as PENDING
193193
HookLog.objects.filter(id__in=hooklogs_ids).update(status=HOOK_LOG_PENDING)
194194
# Delegate to Celery
195-
retry_all_task.delay(hooklogs_ids)
195+
retry_all_task.apply_async(
196+
queue='kpi_low_priority_queue', args=(hooklogs_ids,)
197+
)
196198
response.update({
197199
"pending_uids": hooklogs_uids
198200
})

kobo/settings/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ def __init__(self, *args, **kwargs):
613613
"send-hooks-failures-reports": {
614614
"task": "kobo.apps.hook.tasks.failures_reports",
615615
"schedule": crontab(hour=0, minute=0),
616-
'options': {'queue': 'kpi_queue'}
616+
'options': {'queue': 'kpi_low_priority_queue'}
617617
},
618618
}
619619

0 commit comments

Comments
 (0)