Skip to content

Commit ba3b3c1

Browse files
committed
Change Live Evaluation API to use RQ #1953
Signed-off-by: Michael Ehab Mikhail <[email protected]>
1 parent 812bc00 commit ba3b3c1

File tree

6 files changed

+219
-69
lines changed

6 files changed

+219
-69
lines changed

vulnerabilities/api_v2.py

Lines changed: 74 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from vulnerabilities.models import VulnerabilityReference
4545
from vulnerabilities.models import VulnerabilitySeverity
4646
from vulnerabilities.models import Weakness
47+
from vulnerabilities.tasks import enqueue_ad_hoc_pipeline
4748
from vulnerabilities.throttling import PermissionBasedUserRateThrottle
4849

4950

@@ -1300,8 +1301,7 @@ def lookup(self, request):
13001301

13011302

13021303
class LiveEvaluationSerializer(serializers.Serializer):
1303-
purl_string = serializers.CharField(help_text="PackageURL to evaluate")
1304-
no_threading = serializers.BooleanField(required=False, default=False)
1304+
purl = serializers.CharField(help_text="PackageURL to evaluate")
13051305

13061306

13071307
class LiveEvaluationViewSet(viewsets.GenericViewSet):
@@ -1310,7 +1310,7 @@ class LiveEvaluationViewSet(viewsets.GenericViewSet):
13101310
@extend_schema(
13111311
request=LiveEvaluationSerializer,
13121312
responses={
1313-
202: {"description": "Live evaluation done successfully"},
1313+
202: {"description": "Live evaluation enqueued successfully; returns Run IDs"},
13141314
400: {"description": "Invalid request"},
13151315
500: {"description": "Internal server error"},
13161316
},
@@ -1324,8 +1324,7 @@ def evaluate(self, request):
13241324
status=status.HTTP_400_BAD_REQUEST,
13251325
)
13261326

1327-
purl_string = serializer.validated_data.get("purl_string")
1328-
no_threading = serializer.validated_data.get("no_threading", False)
1327+
purl_string = serializer.validated_data.get("purl")
13291328

13301329
try:
13311330
purl = PackageURL.from_string(purl_string) if purl_string else None
@@ -1349,31 +1348,78 @@ def evaluate(self, request):
13491348
status=status.HTTP_400_BAD_REQUEST,
13501349
)
13511350

1352-
results = []
1351+
# Create a single LivePipelineRun to represent this evaluation
1352+
from vulnerabilities.models import LivePipelineRun
13531353

1354-
def run_importer(importer):
1354+
live_run = LivePipelineRun.objects.create(purl=purl_string)
1355+
runs = []
1356+
for importer in importers:
13551357
importer_name = getattr(importer, "pipeline_id", importer.__name__)
1356-
response_data = {"importer": importer_name, "purl": purl_string, "steps_completed": []}
1358+
run_id = enqueue_ad_hoc_pipeline(importer_name, inputs={"purl": purl})
1359+
# Attach each PipelineRun to the LivePipelineRun
1360+
from vulnerabilities.models import PipelineRun
1361+
13571362
try:
1358-
pipeline_instance = importer(purl=purl)
1359-
status_code, error = pipeline_instance.execute()
1360-
if status_code != 0:
1361-
response_data["error"] = f"Importer {importer_name} failed: {error}"
1362-
else:
1363-
response_data["steps_completed"].append("import")
1364-
except Exception as e:
1365-
response_data["error"] = f"Error running importer {importer_name}: {str(e)}"
1366-
return response_data
1367-
1368-
if not no_threading and len(importers) > 1:
1369-
with ThreadPoolExecutor(max_workers=len(importers)) as executor:
1370-
future_to_importer = {
1371-
executor.submit(run_importer, importer): importer for importer in importers
1363+
run_obj = PipelineRun.objects.get(run_id=run_id)
1364+
run_obj.live_pipeline = live_run
1365+
run_obj.save()
1366+
except PipelineRun.DoesNotExist:
1367+
pass
1368+
runs.append(
1369+
{
1370+
"importer": importer_name,
1371+
"run_id": str(run_id) if run_id else None,
13721372
}
1373-
for future in as_completed(future_to_importer):
1374-
results.append(future.result())
1375-
else:
1376-
for importer in importers:
1377-
results.append(run_importer(importer))
1373+
)
1374+
return Response(
1375+
{"live_run_id": str(live_run.run_id), "runs": runs}, status=status.HTTP_202_ACCEPTED
1376+
)
13781377

1379-
return Response(results, status=status.HTTP_202_ACCEPTED)
1378+
@extend_schema(
1379+
parameters=[
1380+
OpenApiParameter(
1381+
name="live_run_id",
1382+
description="UUID of the live run to check status for",
1383+
required=True,
1384+
type={"type": "string"},
1385+
location=OpenApiParameter.PATH,
1386+
)
1387+
],
1388+
responses={200: "LivePipelineRun status and importers status"},
1389+
)
1390+
@action(detail=False, methods=["get"], url_path=r"status/(?P<live_run_id>[0-9a-f\-]{36})")
1391+
def status(self, request, live_run_id=None):
1392+
from vulnerabilities.models import LivePipelineRun
1393+
from vulnerabilities.models import PipelineRun
1394+
1395+
try:
1396+
live_run = LivePipelineRun.objects.get(run_id=live_run_id)
1397+
except LivePipelineRun.DoesNotExist:
1398+
return Response({"detail": "Live run not found."}, status=status.HTTP_404_NOT_FOUND)
1399+
1400+
live_run.update_status()
1401+
1402+
# Gather status for each importer run
1403+
importer_statuses = []
1404+
for run in live_run.pipelineruns.all():
1405+
importer_statuses.append(
1406+
{
1407+
"importer": run.pipeline.pipeline_id,
1408+
"run_id": str(run.run_id),
1409+
"status": run.status,
1410+
"run_start_date": run.run_start_date,
1411+
"run_end_date": run.run_end_date,
1412+
"run_exitcode": run.run_exitcode,
1413+
"run_output": run.run_output,
1414+
}
1415+
)
1416+
1417+
response = {
1418+
"live_run_id": str(live_run.run_id),
1419+
"overall_status": live_run.status,
1420+
"created_date": live_run.created_date,
1421+
"completed_date": live_run.completed_date,
1422+
"purl": live_run.purl,
1423+
"importers": importer_statuses,
1424+
}
1425+
return Response(response)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Generated by Django 4.2.22 on 2025-08-25 18:03
2+
3+
from django.db import migrations, models
4+
import django.db.models.deletion
5+
import uuid
6+
7+
8+
class Migration(migrations.Migration):
9+
10+
dependencies = [
11+
("vulnerabilities", "0101_advisorytodov2_todorelatedadvisoryv2_and_more"),
12+
]
13+
14+
operations = [
15+
migrations.CreateModel(
16+
name="LivePipelineRun",
17+
fields=[
18+
(
19+
"run_id",
20+
models.UUIDField(
21+
default=uuid.uuid4,
22+
editable=False,
23+
primary_key=True,
24+
serialize=False,
25+
unique=True,
26+
),
27+
),
28+
("created_date", models.DateTimeField(auto_now_add=True, db_index=True)),
29+
("completed_date", models.DateTimeField(blank=True, editable=False, null=True)),
30+
("status", models.CharField(default="queued", max_length=20)),
31+
("purl", models.CharField(blank=True, max_length=300, null=True)),
32+
],
33+
options={
34+
"ordering": ["-created_date"],
35+
},
36+
),
37+
migrations.AddField(
38+
model_name="pipelinerun",
39+
name="live_pipeline",
40+
field=models.ForeignKey(
41+
blank=True,
42+
null=True,
43+
on_delete=django.db.models.deletion.CASCADE,
44+
related_name="pipelineruns",
45+
to="vulnerabilities.livepipelinerun",
46+
),
47+
),
48+
]

vulnerabilities/models.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1972,6 +1972,35 @@ class CodeFixV2(CodeChangeV2):
19721972
)
19731973

19741974

1975+
class LivePipelineRun(models.Model):
1976+
"""Represents a single live evaluation run for all compatible importers."""
1977+
1978+
run_id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True)
1979+
created_date = models.DateTimeField(auto_now_add=True, db_index=True)
1980+
completed_date = models.DateTimeField(blank=True, null=True, editable=False)
1981+
status = models.CharField(max_length=20, default="queued")
1982+
purl = models.CharField(max_length=300, blank=True, null=True)
1983+
1984+
def is_finished(self):
1985+
return self.status == "finished"
1986+
1987+
def update_status(self):
1988+
if not self.pipelineruns.exists():
1989+
self.status = "queued"
1990+
elif all(run.status == PipelineRun.Status.SUCCESS for run in self.pipelineruns.all()):
1991+
self.status = "finished"
1992+
self.completed_date = timezone.now()
1993+
elif any(run.status == PipelineRun.Status.FAILURE for run in self.pipelineruns.all()):
1994+
self.status = "failed"
1995+
self.completed_date = timezone.now()
1996+
else:
1997+
self.status = "running"
1998+
self.save()
1999+
2000+
class Meta:
2001+
ordering = ["-created_date"]
2002+
2003+
19752004
class PipelineRun(models.Model):
19762005
"""The Database representation of a pipeline execution."""
19772006

@@ -1981,6 +2010,14 @@ class PipelineRun(models.Model):
19812010
on_delete=models.CASCADE,
19822011
)
19832012

2013+
live_pipeline = models.ForeignKey(
2014+
"LivePipelineRun",
2015+
related_name="pipelineruns",
2016+
on_delete=models.CASCADE,
2017+
blank=True,
2018+
null=True,
2019+
)
2020+
19842021
run_id = models.UUIDField(
19852022
primary_key=True,
19862023
default=uuid.uuid4,
@@ -2245,8 +2282,9 @@ def append_to_log(self, message, is_multiline=False):
22452282
if not is_multiline:
22462283
message = message.replace("\n", "").replace("\r", "")
22472284

2248-
self.log = self.log + message + "\n"
2249-
self.save(update_fields=["log"])
2285+
new_log = (self.log or "") + message + "\n"
2286+
type(self).objects.filter(run_id=self.run_id).update(log=new_log)
2287+
self.log = new_log
22502288

22512289
def dequeue(self):
22522290
from vulnerabilities.tasks import dequeue_job
@@ -2342,12 +2380,15 @@ def save(self, *args, **kwargs):
23422380
def pipeline_class(self):
23432381
"""Return the pipeline class."""
23442382
from vulnerabilities.importers import IMPORTERS_REGISTRY
2383+
from vulnerabilities.importers import LIVE_IMPORTERS_REGISTRY
23452384
from vulnerabilities.improvers import IMPROVERS_REGISTRY
23462385

23472386
if self.pipeline_id in IMPROVERS_REGISTRY:
23482387
return IMPROVERS_REGISTRY.get(self.pipeline_id)
23492388
if self.pipeline_id in IMPORTERS_REGISTRY:
23502389
return IMPORTERS_REGISTRY.get(self.pipeline_id)
2390+
if self.pipeline_id in LIVE_IMPORTERS_REGISTRY:
2391+
return LIVE_IMPORTERS_REGISTRY.get(self.pipeline_id)
23512392

23522393
@property
23532394
def description(self):

vulnerabilities/tasks.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
logger = logging.getLogger(__name__)
2323

24-
queue = django_rq.get_queue("default")
24+
default_queue = django_rq.get_queue("default")
2525

2626

27-
def execute_pipeline(pipeline_id, run_id):
27+
def execute_pipeline(pipeline_id, run_id, inputs=None):
2828
from vulnerabilities.pipelines import VulnerableCodePipeline
2929

3030
logger.info(f"Enter `execute_pipeline` {pipeline_id}")
@@ -39,7 +39,8 @@ def execute_pipeline(pipeline_id, run_id):
3939
exitcode = 0
4040
run_class = run.pipeline_class
4141
if issubclass(run_class, VulnerableCodePipeline):
42-
pipeline_instance = run_class(run_instance=run)
42+
inputs = inputs or {}
43+
pipeline_instance = run_class(run_instance=run, **inputs)
4344
exitcode, output = pipeline_instance.execute()
4445
elif issubclass(run_class, Importer) or issubclass(run_class, Improver):
4546
exitcode, output = legacy_runner(run_class=run_class, run=run)
@@ -121,7 +122,7 @@ def enqueue_pipeline(pipeline_id):
121122
run = models.PipelineRun.objects.create(
122123
pipeline=pipeline_schedule,
123124
)
124-
job = queue.enqueue(
125+
job = default_queue.enqueue(
125126
execute_pipeline,
126127
pipeline_id,
127128
run.run_id,
@@ -131,7 +132,35 @@ def enqueue_pipeline(pipeline_id):
131132
)
132133

133134

135+
def enqueue_ad_hoc_pipeline(pipeline_id, *, inputs=None):
136+
"""Enqueue a one-off execution for the given pipeline_id with optional inputs.
137+
138+
Returns the created run_id or None if the pipeline cannot be enqueued.
139+
"""
140+
try:
141+
pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id)
142+
except models.PipelineSchedule.DoesNotExist:
143+
pipeline_schedule = models.PipelineSchedule.objects.create(
144+
pipeline_id=pipeline_id,
145+
is_active=False,
146+
)
147+
148+
run = models.PipelineRun.objects.create(pipeline=pipeline_schedule)
149+
150+
live_queue = django_rq.get_queue("live")
151+
job = live_queue.enqueue(
152+
execute_pipeline,
153+
pipeline_id,
154+
run.run_id,
155+
inputs or {},
156+
job_id=str(run.run_id),
157+
on_failure=set_run_failure,
158+
job_timeout=f"{pipeline_schedule.execution_timeout}h",
159+
)
160+
return run.run_id
161+
162+
134163
def dequeue_job(job_id):
135164
"""Remove a job from queue if it hasn't been executed yet."""
136-
if job_id in queue.jobs:
137-
queue.remove(job_id)
165+
if job_id in default_queue.jobs:
166+
default_queue.remove(job_id)

0 commit comments

Comments
 (0)