-
-
Notifications
You must be signed in to change notification settings - Fork 19
feat: Add system health monitoring module (#128) #292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
aa17f57
bdfcccf
95215f7
dff2092
f7a5653
dc4143e
6ba1715
618e075
d4cc451
257c5d3
bb34f9f
b5b7202
22939a6
aebccd1
e62be89
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| import shutil | ||
| from ..monitor import HealthCheck, CheckResult | ||
|
|
||
| class DiskCheck(HealthCheck): | ||
| def run(self) -> CheckResult: | ||
| total, used, free = shutil.disk_usage("/") | ||
|
Check warning on line 6 in cortex/health/checks/disk.py
|
||
| # Calculate usage percentage | ||
| usage_percent = (used / total) * 100 | ||
|
|
||
| score = 100 | ||
| status = "OK" | ||
| details = f"{usage_percent:.1f}% Used" | ||
| rec = None | ||
|
|
||
| # Scoring logic (Spec compliant) | ||
| if usage_percent > 90: | ||
| score = 0 | ||
| status = "CRITICAL" | ||
| rec = "Clean package cache (+50 pts)" | ||
| elif usage_percent > 80: | ||
|
Check warning on line 20 in cortex/health/checks/disk.py
|
||
| score = 50 | ||
| status = "WARNING" | ||
| rec = "Clean package cache (+10 pts)" | ||
|
|
||
| return CheckResult( | ||
| name="Disk Space", | ||
| category="disk", | ||
| score=score, | ||
| status=status, | ||
| details=details, | ||
| recommendation=rec, | ||
| weight=0.15 # 15% | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| import os | ||
| import multiprocessing | ||
| from ..monitor import HealthCheck, CheckResult | ||
|
|
||
| class PerformanceCheck(HealthCheck): | ||
| def run(self) -> CheckResult: | ||
| score = 100 | ||
| issues = [] | ||
| rec = None | ||
|
|
||
| # 1. Load Average (1min) | ||
| try: | ||
| load1, _, _ = os.getloadavg() | ||
| cores = multiprocessing.cpu_count() | ||
| # Load ratio against core count | ||
| load_ratio = load1 / cores | ||
|
|
||
| if load_ratio > 1.0: | ||
| score -= 50 | ||
| issues.append(f"High Load ({load1:.2f})") | ||
| rec = "Check top processes" | ||
| except Exception: | ||
| pass # Skip on Windows etc. | ||
|
|
||
| # 2. Memory Usage (Linux /proc/meminfo) | ||
| try: | ||
| with open('/proc/meminfo', 'r') as f: | ||
| meminfo = {} | ||
| for line in f: | ||
| parts = line.split(':') | ||
| if len(parts) == 2: | ||
| meminfo[parts[0].strip()] = int(parts[1].strip().split()[0]) | ||
|
|
||
| if 'MemTotal' in meminfo and 'MemAvailable' in meminfo: | ||
| total = meminfo['MemTotal'] | ||
| avail = meminfo['MemAvailable'] | ||
| used_percent = ((total - avail) / total) * 100 | ||
|
|
||
| if used_percent > 80: | ||
| penalty = int(used_percent - 80) | ||
| score -= penalty | ||
| issues.append(f"High Memory ({used_percent:.0f}%)") | ||
| except FileNotFoundError: | ||
| pass # Non-Linux systems | ||
|
|
||
| # Summary of results | ||
| status = "OK" | ||
| if score < 50: | ||
| status = "CRITICAL" | ||
| elif score < 90: | ||
| status = "WARNING" | ||
|
|
||
| details = ", ".join(issues) if issues else "Optimal" | ||
|
|
||
| return CheckResult( | ||
| name="System Load", | ||
| category="performance", | ||
| score=max(0, score), | ||
| status=status, | ||
| details=details, | ||
| recommendation=rec, | ||
| weight=0.20 # 20% | ||
| ) | ||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| import subprocess | ||
| import os | ||
| from ..monitor import HealthCheck, CheckResult | ||
|
|
||
| class SecurityCheck(HealthCheck): | ||
| def run(self) -> CheckResult: | ||
|
Check failure on line 6 in cortex/health/checks/security.py
|
||
| score = 100 | ||
| issues = [] | ||
| recommendations = [] | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # 1. Firewall (UFW) Check | ||
| ufw_active = False | ||
| try: | ||
| res = subprocess.run( | ||
| ["systemctl", "is-active", "ufw"], | ||
| capture_output=True, text=True | ||
| ) | ||
| # Fix: Use exact match to avoid matching "inactive" which contains "active" | ||
| if res.returncode == 0 and res.stdout.strip() == "active": | ||
| ufw_active = True | ||
| except FileNotFoundError: | ||
| pass # Environment without systemctl (e.g., Docker or non-systemd) | ||
|
|
||
| if not ufw_active: | ||
| score = 0 # Spec: 0 points if Firewall is inactive | ||
| issues.append("Firewall Inactive") | ||
| recommendations.append("Enable UFW Firewall") | ||
|
|
||
| # 2. SSH Root Login Check | ||
| try: | ||
| ssh_config = "/etc/ssh/sshd_config" | ||
| if os.path.exists(ssh_config): | ||
| with open(ssh_config, 'r') as f: | ||
| for line in f: | ||
| line = line.strip() | ||
| # Check for uncommented PermitRootLogin yes | ||
| if line.startswith("PermitRootLogin") and "yes" in line.split(): | ||
| score -= 50 | ||
| issues.append("Root SSH Allowed") | ||
| recommendations.append("Disable SSH Root Login in sshd_config") | ||
| break | ||
| except PermissionError: | ||
| pass # Cannot read config, skip check | ||
|
|
||
| status = "OK" | ||
| if score < 50: status = "CRITICAL" | ||
| elif score < 100: status = "WARNING" | ||
|
|
||
| return CheckResult( | ||
| name="Security Posture", | ||
| category="security", | ||
| score=max(0, score), | ||
| status=status, | ||
| details=", ".join(issues) if issues else "Secure", | ||
| recommendation=", ".join(recommendations) if recommendations else None, | ||
| weight=0.35 | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| import subprocess | ||
| from ..monitor import HealthCheck, CheckResult | ||
|
|
||
| class UpdateCheck(HealthCheck): | ||
| def run(self) -> CheckResult: | ||
| score = 100 | ||
| pkg_count = 0 | ||
| sec_count = 0 | ||
| rec = None | ||
|
|
||
| # Parse apt list --upgradable | ||
| try: | ||
| # Execute safely without pipeline | ||
| res = subprocess.run( | ||
| ["apt", "list", "--upgradable"], | ||
| capture_output=True, text=True | ||
| ) | ||
|
|
||
| lines = res.stdout.splitlines() | ||
| # Skip first line "Listing..." | ||
| for line in lines[1:]: | ||
| if line.strip(): | ||
| pkg_count += 1 | ||
| if "security" in line.lower(): | ||
| sec_count += 1 | ||
|
|
||
| # Scoring | ||
| score -= (pkg_count * 2) # -2 pts per normal package | ||
| score -= (sec_count * 10) # -10 pts per security package | ||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if pkg_count > 0: | ||
| rec = f"Install {pkg_count} updates (+{100-score} pts)" | ||
|
|
||
| except FileNotFoundError: | ||
| # Skip on non-apt environments (100 pts) | ||
| return CheckResult("Updates", "updates", 100, "SKIP", "apt not found", weight=0.30) | ||
| except Exception: | ||
| pass # Ignore errors | ||
|
|
||
| status = "OK" | ||
| if score < 60: status = "CRITICAL" | ||
| elif score < 100: status = "WARNING" | ||
|
|
||
| details = f"{pkg_count} pending" | ||
| if sec_count > 0: | ||
| details += f" ({sec_count} security)" | ||
|
|
||
| return CheckResult( | ||
| name="System Updates", | ||
| category="updates", | ||
| score=max(0, score), | ||
| status=status, | ||
| details=details, | ||
| recommendation=rec, | ||
| weight=0.30 # 30% | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix AttributeError — calling non-existent public method.
Lines 135 and 154 call
mgr.save_config(), but based on the relevant code snippets,NotificationManageronly provides the private_save_config()method, not a publicsave_config()method. This will cause anAttributeErrorat runtime.Additionally, line 129 correctly uses
mgr._save_config(), creating an inconsistency.Apply this diff to fix the issue:
elif args.notify_action == "disable": mgr.config["enabled"] = False - mgr.save_config() + mgr._save_config() self._print_success("Notifications disabled (Critical alerts will still show)")mgr.config["dnd_start"] = args.start mgr.config["dnd_end"] = args.end - mgr.save_config() + mgr._save_config() self._print_success(f"DND Window updated: {args.start} - {args.end}")Note: If a public
save_config()method is intended, it should be added toNotificationManagerfirst, then all three calls (lines 129, 135, 154) should be updated consistently.Also applies to: 154-154
🤖 Prompt for AI Agents