
Commit c0bd646

refactored to be cleaner

1 parent 8e41e91

File tree

1 file changed: +49 -40 lines

robusta_krr/core/integrations/kubernetes/__init__.py

Lines changed: 49 additions & 40 deletions
@@ -269,6 +269,16 @@ def _should_list_resource(self, resource: str) -> bool:
             return True
         return resource in settings.resources

+    def _is_job_owned_by_cronjob(self, job: V1Job) -> bool:
+        """Check if a job is owned by a CronJob."""
+        return any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])
+
+    def _is_job_grouped(self, job: V1Job) -> bool:
+        """Check if a job has any of the grouping labels."""
+        if not settings.job_grouping_labels or not job.metadata.labels:
+            return False
+        return any(label in job.metadata.labels for label in settings.job_grouping_labels)
+
     async def _list_namespaced_or_global_objects_batched(
         self,
         kind: KindLiteral,
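The two new helpers only read job metadata, so they are easy to reason about in isolation. A minimal sketch of the objects they operate on, using the standard kubernetes Python client models (the Job below is hypothetical test data, and settings.job_grouping_labels is assumed to contain "app-group"):

from kubernetes.client import V1Job, V1ObjectMeta, V1OwnerReference

# Hypothetical Job owned by a CronJob and carrying a grouping label.
job = V1Job(
    metadata=V1ObjectMeta(
        name="nightly-report-29012345",
        namespace="default",
        labels={"app-group": "reports"},
        owner_references=[
            V1OwnerReference(
                api_version="batch/v1",
                kind="CronJob",
                name="nightly-report",
                uid="00000000-0000-0000-0000-000000000000",
            )
        ],
    )
)

# With job_grouping_labels = ["app-group"], _is_job_owned_by_cronjob(job) and
# _is_job_grouped(job) would both return True, so _list_all_jobs skips this Job
# and _list_all_groupedjobs picks it up instead.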
@@ -314,25 +324,22 @@ async def _list_namespaced_or_global_objects_batched
                 for item in request_result.items
             ]

-            # Extract continue token from the first result
             next_continue_ref = None
             if requests:
                 first_result = await requests[0]
                 next_continue_ref = getattr(first_result.metadata, '_continue', None)

-            logger.debug(f"Found {len(result)} {kind} in {self.cluster}")
             return result, next_continue_ref

         except ApiException as e:
             if e.status == 410 and e.body:
-                # Continue token expired, extract new token from error and continue
+                # Continue token expired
                 import json
                 try:
                     error_body = json.loads(e.body)
                     new_continue_token = error_body.get("metadata", {}).get("continue")
                     if new_continue_token:
                         logger.info("Continue token expired for jobs listing. Continuing")
-                        # Retry with new continue token
                         return await self._list_namespaced_or_global_objects_batched(
                             kind=kind,
                             all_namespaces_request=all_namespaces_request,
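For background, the retry branch above follows the Kubernetes list API's pagination contract: when a continue token expires, the 410 Gone response usually carries a replacement token in its body. A standalone sketch of the same pattern with the synchronous client (the function name, page size, and resumption loop are illustrative, not part of this commit):

import json

from kubernetes import client
from kubernetes.client.exceptions import ApiException


def list_jobs_page(batch_api: client.BatchV1Api, continue_token=None, limit=500):
    """Fetch one page of Jobs, returning (items, next continue token)."""
    try:
        result = batch_api.list_job_for_all_namespaces(limit=limit, _continue=continue_token)
        return result.items, getattr(result.metadata, "_continue", None)
    except ApiException as e:
        if e.status == 410 and e.body:
            # Expired token: the 410 body typically carries a fresh token under metadata.continue.
            new_token = json.loads(e.body).get("metadata", {}).get("continue")
            if new_token:
                return list_jobs_page(batch_api, continue_token=new_token, limit=limit)
        raise


# Usage: walk every page until the server stops returning a token.
# api = client.BatchV1Api()
# token = None
# while True:
#     items, token = list_jobs_page(api, token)
#     ...  # process items
#     if not token:
#         break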
@@ -561,25 +568,23 @@ async def _list_all_jobs(self) -> list[K8sObjectData]:
                 continue_ref=continue_ref,
             )

-            # Process jobs in this batch
+            batch_count += 1
+            continue_ref = next_continue_ref
+
+            # refreshed continue token
+            if not jobs_batch and continue_ref:
+                continue
+
             for job in jobs_batch:
-                # Skip jobs owned by CronJobs
-                if any(owner.kind == "CronJob" for owner in job.metadata.owner_references or []):
+                if self._is_job_owned_by_cronjob(job):
                     continue

-                # Skip jobs that have any of the grouping labels (they will be handled by GroupedJob)
-                if settings.job_grouping_labels and job.metadata.labels:
-                    if any(label in job.metadata.labels for label in settings.job_grouping_labels):
-                        continue
+                if self._is_job_grouped(job):
+                    continue

-                # Add regular jobs
                 for container in job.spec.template.spec.containers:
                     all_jobs.append(self.__build_scannable_object(job, container, "Job"))
-
-            batch_count += 1
-
-            # Check if we have more batches
-            continue_ref = next_continue_ref
+
             if not continue_ref:
                 break

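The reordered loop advances batch_count and continue_ref before any filtering, so an empty batch that arrives with a refreshed token simply moves on to the next page. The per-batch filter itself reduces to a few lines of plain Python; a sketch assuming jobs_batch holds kubernetes V1Job objects (the function name is hypothetical):

from kubernetes.client import V1Job


def select_standalone_jobs(jobs_batch: list[V1Job], grouping_labels: list[str]) -> list[V1Job]:
    """Drop Jobs owned by CronJobs or carrying a grouping label; keep the rest."""
    selected = []
    for job in jobs_batch:
        owners = job.metadata.owner_references or []
        if any(owner.kind == "CronJob" for owner in owners):
            continue
        labels = job.metadata.labels or {}
        if grouping_labels and any(label in labels for label in grouping_labels):
            continue
        selected.append(job)
    return selected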
@@ -613,10 +618,8 @@ async def _list_all_groupedjobs(self) -> list[K8sObjectData]:

         logger.debug("Listing GroupedJobs with grouping labels: %s", settings.job_grouping_labels)

-        # Get all jobs that have any of the grouping labels using batched loading
-
         grouped_jobs = defaultdict(list)
-        grouped_jobs_template = {} # Store only ONE full job as template per group
+        grouped_jobs_template = {} # Store only ONE full job as template per group - needed for class K8sObjectData
         continue_ref: Optional[str] = None
         batch_count = 0

@@ -630,28 +633,34 @@ async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
                 continue_ref=continue_ref,
             )

-            # Process jobs in this batch immediately - only keep grouped jobs
-            for job in jobs_batch:
-                if (job.metadata.labels and
-                        not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):
-
-                    for label_name in settings.job_grouping_labels:
-                        if label_name in job.metadata.labels:
-                            label_value = job.metadata.labels[label_name]
-                            group_key = f"{label_name}={label_value}"
-                            # Store lightweight job info only
-                            lightweight_job = LightweightJobInfo(
-                                name=job.metadata.name,
-                                namespace=job.metadata.namespace
-                            )
-                            grouped_jobs[group_key].append(lightweight_job)
-                            # Keep only ONE full job as template per group
-                            if group_key not in grouped_jobs_template:
-                                grouped_jobs_template[group_key] = job
-
             batch_count += 1
-
             continue_ref = next_continue_ref
+
+            # refreshed continue token
+            if not jobs_batch and continue_ref:
+                continue
+
+            for job in jobs_batch:
+                if not job.metadata.labels or self._is_job_owned_by_cronjob(job) or not self._is_job_grouped(job):
+                    continue
+
+                for label_name in settings.job_grouping_labels:
+                    if label_name not in job.metadata.labels:
+                        continue
+
+                    # group by the value of this grouping label
+                    label_value = job.metadata.labels[label_name]
+                    group_key = f"{label_name}={label_value}"
+                    # Store lightweight job info only
+                    lightweight_job = LightweightJobInfo(
+                        name=job.metadata.name,
+                        namespace=job.metadata.namespace
+                    )
+                    grouped_jobs[group_key].append(lightweight_job)
+                    # Keep only ONE full job as template per group
+                    if group_key not in grouped_jobs_template:
+                        grouped_jobs_template[group_key] = job
+
             if not continue_ref:
                 break

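The grouping pass keeps memory low by storing only name/namespace pairs per group plus a single full Job as the template later used to build K8sObjectData. The bucketing reduces to the sketch below, where LightweightJobInfo is a stand-in dataclass for the project's own class and the "label=value" group key follows the diff:

from collections import defaultdict
from dataclasses import dataclass


@dataclass
class LightweightJobInfo:  # stand-in for the project's own class
    name: str
    namespace: str


def group_jobs_by_label(jobs, grouping_labels):
    """Bucket Jobs under "<label>=<value>" keys, keeping one full Job per group as a template."""
    grouped = defaultdict(list)
    templates = {}
    for job in jobs:
        labels = job.metadata.labels or {}
        for label_name in grouping_labels:
            if label_name not in labels:
                continue
            group_key = f"{label_name}={labels[label_name]}"
            grouped[group_key].append(
                LightweightJobInfo(name=job.metadata.name, namespace=job.metadata.namespace)
            )
            templates.setdefault(group_key, job)
    return grouped, templates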