Skip to content

Commit ef7f8cc

Browse files
committed
feat : add Parallel Task Execution for Multi-Step Installs
Fixes #269
1 parent f18bc09 commit ef7f8cc

File tree

4 files changed

+704
-57
lines changed

4 files changed

+704
-57
lines changed

cortex/cli.py

Lines changed: 128 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import argparse
44
import time
55
import logging
6+
import shutil
7+
import traceback
8+
import urllib.request
69
from typing import List, Optional
7-
import subprocess
810
from datetime import datetime
911

1012
# Suppress noisy log messages in normal operation
@@ -35,9 +37,7 @@
3537
)
3638
from cortex.validators import (
3739
validate_api_key,
38-
validate_install_request,
39-
validate_installation_id,
40-
ValidationError
40+
validate_install_request
4141
)
4242

4343

@@ -62,7 +62,8 @@ def _get_api_key(self) -> Optional[str]:
6262

6363
is_valid, detected_provider, error = validate_api_key()
6464
if not is_valid:
65-
self._print_error(error)
65+
if error:
66+
self._print_error(error)
6667
cx_print("Run [bold]cortex wizard[/bold] to configure your API key.", "info")
6768
cx_print("Or use [bold]CORTEX_PROVIDER=ollama[/bold] for offline mode.", "info")
6869
return None
@@ -111,11 +112,12 @@ def _clear_line(self):
111112
sys.stdout.write('\r\033[K')
112113
sys.stdout.flush()
113114

114-
def install(self, software: str, execute: bool = False, dry_run: bool = False):
115+
def install(self, software: str, execute: bool = False, dry_run: bool = False, parallel: bool = False):
115116
# Validate input first
116117
is_valid, error = validate_install_request(software)
117118
if not is_valid:
118-
self._print_error(error)
119+
if error:
120+
self._print_error(error)
119121
return 1
120122

121123
api_key = self._get_api_key()
@@ -183,47 +185,119 @@ def progress_callback(current, total, step):
183185

184186
print("\nExecuting commands...")
185187

186-
coordinator = InstallationCoordinator(
187-
commands=commands,
188-
descriptions=[f"Step {i+1}" for i in range(len(commands))],
189-
timeout=300,
190-
stop_on_error=True,
191-
progress_callback=progress_callback
192-
)
193-
194-
result = coordinator.execute()
195-
196-
if result.success:
197-
self._print_success(f"{software} installed successfully!")
198-
print(f"\nCompleted in {result.total_duration:.2f} seconds")
188+
if parallel:
189+
# Use parallel execution
190+
import asyncio
191+
from cortex.install_parallel import run_parallel_install
199192

200-
# Record successful installation
201-
if install_id:
202-
history.update_installation(install_id, InstallationStatus.SUCCESS)
203-
print(f"\n📝 Installation recorded (ID: {install_id})")
204-
print(f" To rollback: cortex rollback {install_id}")
193+
def parallel_log_callback(message: str, level: str = "info"):
194+
"""Callback for parallel execution logging."""
195+
if level == "success":
196+
cx_print(f" ✅ {message}", "success")
197+
elif level == "error":
198+
cx_print(f" ❌ {message}", "error")
199+
else:
200+
cx_print(f" ℹ {message}", "info")
205201

206-
return 0
207-
else:
208-
# Record failed installation
209-
if install_id:
210-
error_msg = result.error_message or "Installation failed"
211-
history.update_installation(
212-
install_id,
213-
InstallationStatus.FAILED,
214-
error_msg
202+
try:
203+
success, parallel_tasks = asyncio.run(
204+
run_parallel_install(
205+
commands=commands,
206+
descriptions=[f"Step {i+1}" for i in range(len(commands))],
207+
timeout=300,
208+
stop_on_error=True,
209+
log_callback=parallel_log_callback
210+
)
215211
)
212+
213+
# Calculate total duration from tasks
214+
total_duration = 0
215+
if parallel_tasks:
216+
max_end = max((t.end_time or 0) for t in parallel_tasks)
217+
min_start = min((t.start_time or time.time()) for t in parallel_tasks)
218+
if max_end and min_start:
219+
total_duration = max_end - min_start
220+
221+
if success:
222+
self._print_success(f"{software} installed successfully!")
223+
print(f"\nCompleted in {total_duration:.2f} seconds (parallel mode)")
224+
225+
# Record successful installation
226+
if install_id:
227+
history.update_installation(install_id, InstallationStatus.SUCCESS)
228+
print(f"\n📝 Installation recorded (ID: {install_id})")
229+
print(f" To rollback: cortex rollback {install_id}")
230+
231+
return 0
232+
else:
233+
# Find failed task for error reporting
234+
failed_tasks = [t for t in parallel_tasks if t.status.value == "failed"]
235+
error_msg = failed_tasks[0].error if failed_tasks else "Installation failed"
236+
237+
if install_id:
238+
history.update_installation(
239+
install_id,
240+
InstallationStatus.FAILED,
241+
error_msg
242+
)
243+
244+
self._print_error("Installation failed")
245+
if error_msg:
246+
print(f" Error: {error_msg}", file=sys.stderr)
247+
if install_id:
248+
print(f"\n📝 Installation recorded (ID: {install_id})")
249+
print(f" View details: cortex history show {install_id}")
250+
return 1
216251

217-
if result.failed_step is not None:
218-
self._print_error(f"Installation failed at step {result.failed_step + 1}")
252+
except Exception as e:
253+
if install_id:
254+
history.update_installation(install_id, InstallationStatus.FAILED, str(e))
255+
self._print_error(f"Parallel execution failed: {str(e)}")
256+
return 1
257+
258+
else:
259+
# Use sequential execution (original behavior)
260+
coordinator = InstallationCoordinator(
261+
commands=commands,
262+
descriptions=[f"Step {i+1}" for i in range(len(commands))],
263+
timeout=300,
264+
stop_on_error=True,
265+
progress_callback=progress_callback
266+
)
267+
268+
result = coordinator.execute()
269+
270+
if result.success:
271+
self._print_success(f"{software} installed successfully!")
272+
print(f"\nCompleted in {result.total_duration:.2f} seconds")
273+
274+
# Record successful installation
275+
if install_id:
276+
history.update_installation(install_id, InstallationStatus.SUCCESS)
277+
print(f"\n📝 Installation recorded (ID: {install_id})")
278+
print(f" To rollback: cortex rollback {install_id}")
279+
280+
return 0
219281
else:
220-
self._print_error("Installation failed")
221-
if result.error_message:
222-
print(f" Error: {result.error_message}", file=sys.stderr)
223-
if install_id:
224-
print(f"\n📝 Installation recorded (ID: {install_id})")
225-
print(f" View details: cortex history show {install_id}")
226-
return 1
282+
# Record failed installation
283+
if install_id:
284+
error_msg = result.error_message or "Installation failed"
285+
history.update_installation(
286+
install_id,
287+
InstallationStatus.FAILED,
288+
error_msg
289+
)
290+
291+
if result.failed_step is not None:
292+
self._print_error(f"Installation failed at step {result.failed_step + 1}")
293+
else:
294+
self._print_error("Installation failed")
295+
if result.error_message:
296+
print(f" Error: {result.error_message}", file=sys.stderr)
297+
if install_id:
298+
print(f"\n📝 Installation recorded (ID: {install_id})")
299+
print(f" View details: cortex history show {install_id}")
300+
return 1
227301
else:
228302
print("\nTo execute these commands, run with --execute flag")
229303
print("Example: cortex install docker --execute")
@@ -540,14 +614,11 @@ def edit_pref(self, action: str, key: Optional[str] = None, value: Optional[str]
540614
return 1
541615
except Exception as e:
542616
self._print_error(f"Failed to edit preferences: {str(e)}")
543-
import traceback
544617
traceback.print_exc()
545618
return 1
546619

547620
def status(self):
548621
"""Show system status including security features"""
549-
import shutil
550-
551622
show_banner(show_version=True)
552623
console.print()
553624

@@ -577,7 +648,6 @@ def status(self):
577648
# Check Ollama
578649
ollama_host = os.environ.get('OLLAMA_HOST', 'http://localhost:11434')
579650
try:
580-
import urllib.request
581651
req = urllib.request.Request(f"{ollama_host}/api/tags", method='GET')
582652
with urllib.request.urlopen(req, timeout=2) as resp:
583653
cx_print(f"Ollama: [bold]Running[/bold] ({ollama_host})", "success")
@@ -819,19 +889,20 @@ def main():
819889
subparsers = parser.add_subparsers(dest='command', help='Available commands')
820890

821891
# Demo command (first - show this to new users)
822-
demo_parser = subparsers.add_parser('demo', help='See Cortex in action (no API key needed)')
892+
_demo_parser = subparsers.add_parser('demo', help='See Cortex in action (no API key needed)')
823893

824894
# Wizard command
825-
wizard_parser = subparsers.add_parser('wizard', help='Configure API key interactively')
895+
_wizard_parser = subparsers.add_parser('wizard', help='Configure API key interactively')
826896

827897
# Status command
828-
status_parser = subparsers.add_parser('status', help='Show system status and security features')
898+
_status_parser = subparsers.add_parser('status', help='Show system status and security features')
829899

830900
# Install command
831901
install_parser = subparsers.add_parser('install', help='Install software using natural language')
832902
install_parser.add_argument('software', type=str, help='Software to install (natural language)')
833903
install_parser.add_argument('--execute', action='store_true', help='Execute the generated commands')
834904
install_parser.add_argument('--dry-run', action='store_true', help='Show commands without executing')
905+
install_parser.add_argument('--parallel', action='store_true', help='Execute independent tasks in parallel')
835906

836907
# History command
837908
history_parser = subparsers.add_parser('history', help='View installation history')
@@ -846,17 +917,17 @@ def main():
846917
rollback_parser.add_argument('--dry-run', action='store_true', help='Show rollback actions without executing')
847918

848919
# Check preferences command
849-
check_pref_parser = subparsers.add_parser('check-pref', help='Check/display user preferences')
850-
check_pref_parser.add_argument('key', nargs='?', help='Specific preference key to check (optional)')
920+
_check_pref_parser = subparsers.add_parser('check-pref', help='Check/display user preferences')
921+
_check_pref_parser.add_argument('key', nargs='?', help='Specific preference key to check (optional)')
851922

852923
# Edit preferences command
853-
edit_pref_parser = subparsers.add_parser('edit-pref', help='Edit user preferences')
854-
edit_pref_parser.add_argument('action',
924+
_edit_pref_parser = subparsers.add_parser('edit-pref', help='Edit user preferences')
925+
_edit_pref_parser.add_argument('action',
855926
choices=['set', 'add', 'update', 'delete', 'remove', 'reset-key',
856927
'list', 'show', 'display', 'reset-all', 'validate', 'export', 'import'],
857928
help='Action to perform')
858-
edit_pref_parser.add_argument('key', nargs='?', help='Preference key or filepath (for export/import)')
859-
edit_pref_parser.add_argument('value', nargs='?', help='Preference value (for set/add/update)')
929+
_edit_pref_parser.add_argument('key', nargs='?', help='Preference key or filepath (for export/import)')
930+
_edit_pref_parser.add_argument('value', nargs='?', help='Preference value (for set/add/update)')
860931

861932
args = parser.parse_args()
862933

@@ -874,7 +945,7 @@ def main():
874945
elif args.command == 'status':
875946
return cli.status()
876947
elif args.command == 'install':
877-
return cli.install(args.software, execute=args.execute, dry_run=args.dry_run)
948+
return cli.install(args.software, execute=args.execute, dry_run=args.dry_run, parallel=args.parallel)
878949
elif args.command == 'history':
879950
return cli.history(limit=args.limit, status=args.status, show_id=args.show_id)
880951
elif args.command == 'rollback':

0 commit comments

Comments
 (0)