|
1 | 1 | import shutil |
2 | 2 | import gzip |
3 | 3 | import logging |
4 | | -from typing import List, Dict |
| 4 | +import re |
| 5 | +from typing import List, Dict, Optional |
5 | 6 | from pathlib import Path |
6 | 7 | from cortex.utils.commands import run_command |
7 | 8 | from cortex.cleanup.scanner import CleanupScanner, ScanResult |
8 | 9 | from cortex.cleanup.manager import CleanupManager |
9 | 10 |
|
10 | 11 | logger = logging.getLogger(__name__) |
11 | 12 |
|
| 13 | +# Category constants to avoid duplication |
| 14 | +CATEGORY_PACKAGE_CACHE = "Package Cache" |
| 15 | +CATEGORY_ORPHANED_PACKAGES = "Orphaned Packages" |
| 16 | +CATEGORY_TEMP_FILES = "Temporary Files" |
| 17 | +CATEGORY_OLD_LOGS = "Old Logs" |
| 18 | + |
| 19 | +# Unit multipliers for parsing |
| 20 | +UNIT_MULTIPLIERS = { |
| 21 | + 'KB': 1024, |
| 22 | + 'MB': 1024 * 1024, |
| 23 | + 'GB': 1024 * 1024 * 1024, |
| 24 | +} |
| 25 | + |
12 | 26 | class DiskCleaner: |
13 | 27 | """ |
14 | 28 | Handles the actual cleanup operations including package cleaning, |
@@ -87,25 +101,28 @@ def _parse_freed_space(self, stdout: str) -> int: |
87 | 101 | Returns: |
88 | 102 | int: Bytes freed. |
89 | 103 | """ |
90 | | - freed_bytes = 0 |
91 | 104 | for line in stdout.splitlines(): |
92 | 105 | if "disk space will be freed" in line: |
93 | | - parts = line.split() |
94 | | - try: |
95 | | - for i, part in enumerate(parts): |
96 | | - if part.isdigit() or part.replace('.', '', 1).isdigit(): |
97 | | - val = float(part) |
98 | | - unit = parts[i+1] |
99 | | - if unit.upper().startswith('KB'): |
100 | | - freed_bytes = int(val * 1024) |
101 | | - elif unit.upper().startswith('MB'): |
102 | | - freed_bytes = int(val * 1024 * 1024) |
103 | | - elif unit.upper().startswith('GB'): |
104 | | - freed_bytes = int(val * 1024 * 1024 * 1024) |
105 | | - break |
106 | | - except Exception: |
107 | | - pass |
108 | | - return freed_bytes |
| 106 | + return self._extract_size_from_line(line) |
| 107 | + return 0 |
| 108 | + |
| 109 | + def _extract_size_from_line(self, line: str) -> int: |
| 110 | + """ |
| 111 | + Extract size in bytes from a line containing size information. |
| 112 | + |
| 113 | + Args: |
| 114 | + line (str): Line containing size info like "50.5 MB". |
| 115 | + |
| 116 | + Returns: |
| 117 | + int: Size in bytes. |
| 118 | + """ |
| 119 | + # Match patterns like "50.5 MB" or "512 KB" |
| 120 | + match = re.search(r'([\d.]+)\s*(KB|MB|GB)', line, re.IGNORECASE) |
| 121 | + if match: |
| 122 | + value = float(match.group(1)) |
| 123 | + unit = match.group(2).upper() |
| 124 | + return int(value * UNIT_MULTIPLIERS.get(unit, 1)) |
| 125 | + return 0 |
109 | 126 |
|
110 | 127 | def clean_temp_files(self, files: List[str]) -> int: |
111 | 128 | """ |
@@ -198,25 +215,37 @@ def run_cleanup(self, scan_results: List[ScanResult], safe: bool = True) -> Dict |
198 | 215 | Dict[str, int]: Summary of bytes freed per category. |
199 | 216 | """ |
200 | 217 | summary = { |
201 | | - "Package Cache": 0, |
202 | | - "Orphaned Packages": 0, |
203 | | - "Temporary Files": 0, |
204 | | - "Old Logs": 0 |
| 218 | + CATEGORY_PACKAGE_CACHE: 0, |
| 219 | + CATEGORY_ORPHANED_PACKAGES: 0, |
| 220 | + CATEGORY_TEMP_FILES: 0, |
| 221 | + CATEGORY_OLD_LOGS: 0 |
205 | 222 | } |
206 | 223 |
|
207 | 224 | for result in scan_results: |
208 | | - if result.category == "Package Cache": |
209 | | - summary["Package Cache"] = self.clean_package_cache() |
210 | | - |
211 | | - elif result.category == "Orphaned Packages": |
212 | | - # Only remove orphaned packages in non-safe mode |
213 | | - if not safe: |
214 | | - summary["Orphaned Packages"] = self.remove_orphaned_packages(result.items) |
215 | | - |
216 | | - elif result.category == "Temporary Files": |
217 | | - summary["Temporary Files"] = self.clean_temp_files(result.items) |
218 | | - |
219 | | - elif result.category == "Old Logs": |
220 | | - summary["Old Logs"] = self.compress_logs(result.items) |
| 225 | + freed = self._process_category(result, safe) |
| 226 | + if result.category in summary: |
| 227 | + summary[result.category] = freed |
221 | 228 |
|
222 | 229 | return summary |
| 230 | + |
| 231 | + def _process_category(self, result: ScanResult, safe: bool) -> int: |
| 232 | + """ |
| 233 | + Process a single cleanup category. |
| 234 | + |
| 235 | + Args: |
| 236 | + result (ScanResult): Scan result for the category. |
| 237 | + safe (bool): Whether to use safe mode. |
| 238 | + |
| 239 | + Returns: |
| 240 | + int: Bytes freed. |
| 241 | + """ |
| 242 | + if result.category == CATEGORY_PACKAGE_CACHE: |
| 243 | + return self.clean_package_cache() |
| 244 | + elif result.category == CATEGORY_ORPHANED_PACKAGES: |
| 245 | + # Only remove orphaned packages in non-safe mode |
| 246 | + return self.remove_orphaned_packages(result.items) if not safe else 0 |
| 247 | + elif result.category == CATEGORY_TEMP_FILES: |
| 248 | + return self.clean_temp_files(result.items) |
| 249 | + elif result.category == CATEGORY_OLD_LOGS: |
| 250 | + return self.compress_logs(result.items) |
| 251 | + return 0 |
0 commit comments