diff --git a/docs/connecting-local-nodes.md b/docs/connecting-local-nodes.md index 89bade682..f1744f4ad 100644 --- a/docs/connecting-local-nodes.md +++ b/docs/connecting-local-nodes.md @@ -4,6 +4,9 @@ [Telepresence](https://github.com/telepresenceio/telepresence) can be used to make a connection from the cluster to your local machine. Telepresence is designed to intercept cluster commmunication and forward it to your local machine so we will have to install a dummy pod and service to receive the traffic that will get forwarded. +> [!NOTE] +> This guide uses `kubectl` which is available from the [Kubernetes website](https://kubernetes.io/docs/tasks/tools/). + ### Run Warnet network ```shell diff --git a/docs/install.md b/docs/install.md index c7ef14c69..61dc24e98 100644 --- a/docs/install.md +++ b/docs/install.md @@ -1,12 +1,12 @@ # Installing Warnet -Warnet requires Kubernetes (k8s) and helm in order to run the network. Kubernetes can be run remotely or locally (with minikube or Docker Desktop). `kubectl` and `helm` must be run locally to administer the network. +Warnet requires Kubernetes (k8s) and Helm in order to run the network. Kubernetes can be run remotely or locally (with minikube or Docker Desktop). `helm` must be run locally to administer the network. ## Dependencies ### Remote (cloud) cluster -The only two dependencies of Warnet are `helm` and `kubectl` configured to talk to your cloud cluster. +The only dependency of Warnet is `helm`. ### Running Warnet Locally @@ -30,15 +30,14 @@ minikube start Minikube has a [guide](https://kubernetes.io/docs/tutorials/hello-minikube/) on getting started which could be useful to validate that your minikube is running correctly. -### Testing kubectl and helm +### Testing helm -The following commands should run on both local and remote clusters. Do not proceed unless kubectl and helm are working. +The following commands should run on both local and remote clusters. Do not proceed unless helm is working. ```shell helm repo add examples https://helm.github.io/examples helm install hello examples/hello-world helm list -kubectl get pods helm uninstall hello ``` diff --git a/resources/scenarios/test_scenarios/connect_dag.py b/resources/scenarios/test_scenarios/connect_dag.py index 5747291cb..7c9910d61 100644 --- a/resources/scenarios/test_scenarios/connect_dag.py +++ b/resources/scenarios/test_scenarios/connect_dag.py @@ -93,9 +93,6 @@ def run_test(self): self.assert_connection(eight_peers, 9, ConnectionType.DNS) self.assert_connection(nine_peers, 8, ConnectionType.IP) - # TODO: This needs to cause the test to fail - # assert False - self.log.info( f"Successfully ran the connect_dag.py scenario using a temporary file: " f"{os.path.basename(__file__)} " diff --git a/resources/scripts/setup_minikube.sh b/resources/scripts/setup_minikube.sh index f835eed7c..eb364127e 100755 --- a/resources/scripts/setup_minikube.sh +++ b/resources/scripts/setup_minikube.sh @@ -72,15 +72,6 @@ else ERROR_CODE=127 fi -kubectl_path=$(command -v kubectl || true) -if [ -n "$kubectl_path" ]; then - print_partial_message " ⭐️ Found " "kubectl" ": $kubectl_path " "$BOLD" -else - print_partial_message " 💥 Could not find " "kubectl" ". Please follow this link to install it..." "$BOLD" - print_message "" " https://kubernetes.io/docs/tasks/tools/" "$BOLD" - ERROR_CODE=127 -fi - helm_path=$(command -v helm || true) if [ -n "$helm_path" ]; then print_partial_message " ⭐️ Found " "helm" ": $helm_path" "$BOLD" diff --git a/src/warnet/bitcoin.py b/src/warnet/bitcoin.py index a27da3bc7..08de7423d 100644 --- a/src/warnet/bitcoin.py +++ b/src/warnet/bitcoin.py @@ -1,3 +1,5 @@ +import base64 +import codecs import os import re import sys @@ -5,12 +7,21 @@ from io import BytesIO import click -from urllib3.exceptions import MaxRetryError - +from kubernetes.stream import stream from test_framework.messages import ser_uint256 from test_framework.p2p import MESSAGEMAP +from urllib3.exceptions import MaxRetryError -from .k8s import get_default_namespace, get_mission +from .constants import BITCOINCORE_CONTAINER +from .k8s import ( + get_default_namespace, + get_mission, + get_pod, + get_service, + get_static_client, + kexec, + pod_log, +) from .process import run_command @@ -23,7 +34,7 @@ def bitcoin(): @click.argument("tank", type=str) @click.argument("method", type=str) @click.argument("params", type=str, nargs=-1) # this will capture all remaining arguments -def rpc(tank: str, method: str, params: str): +def rpc(tank: str, method: str, params: tuple[str, ...]): """ Call bitcoin-cli [params] on """ @@ -35,15 +46,40 @@ def rpc(tank: str, method: str, params: str): print(result) -def _rpc(tank: str, method: str, params: str): +def _rpc(tank: str, method: str, params: tuple[str, ...]) -> str: # bitcoin-cli should be able to read bitcoin.conf inside the container # so no extra args like port, chain, username or password are needed namespace = get_default_namespace() + + sclient = get_static_client() if params: - cmd = f"kubectl -n {namespace} exec {tank} -- bitcoin-cli {method} {' '.join(map(str, params))}" + cmd = ["bitcoin-cli", method] + cmd.extend(params) else: - cmd = f"kubectl -n {namespace} exec {tank} -- bitcoin-cli {method}" - return run_command(cmd) + cmd = ["bitcoin-cli", method] + resp = stream( + sclient.connect_get_namespaced_pod_exec, + tank, + namespace, + container=BITCOINCORE_CONTAINER, + command=cmd, + stderr=True, + stdin=False, + stdout=True, + tty=False, + _preload_content=False, + ) + stdout = "" + stderr = "" + while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + stdout_chunk = resp.read_stdout() + stdout += stdout_chunk + if resp.peek_stderr(): + stderr_chunk = resp.read_stderr() + stderr += stderr_chunk + return stdout + stderr @bitcoin.command() @@ -52,7 +88,7 @@ def debug_log(tank: str): """ Fetch the Bitcoin Core debug log from """ - cmd = f"kubectl logs {tank}" + cmd = f"warnet logs {tank}" try: print(run_command(cmd)) except Exception as e: @@ -67,14 +103,13 @@ def grep_logs(pattern: str, show_k8s_timestamps: bool, no_sort: bool): """ Grep combined bitcoind logs using regex """ - try: tanks = get_mission("tank") except MaxRetryError as e: print(f"{e}") sys.exit(1) - matching_logs = [] + matching_logs: list[tuple[str, any]] = [] for tank in tanks: pod_name = tank.metadata.name @@ -89,14 +124,14 @@ def grep_logs(pattern: str, show_k8s_timestamps: bool, no_sort: bool): continue # Get logs from the specific container - command = f"kubectl logs {pod_name} -c {container_name} --timestamps" - logs = run_command(command) + log_stream = pod_log(pod_name, container_name, timestamps=True) + + compiled_pattern = re.compile(pattern) - if logs is not False: - # Process logs - for log_entry in logs.splitlines(): - if re.search(pattern, log_entry): - matching_logs.append((log_entry, pod_name)) + for log_line in iter_lines_from_stream(log_stream): + log_entry = log_line.rstrip("\n") + if compiled_pattern.search(log_entry): + matching_logs.append((log_entry, pod_name)) # Sort logs if needed if not no_sort: @@ -121,6 +156,22 @@ def grep_logs(pattern: str, show_k8s_timestamps: bool, no_sort: bool): return matching_logs +def iter_lines_from_stream(log_stream, encoding="utf-8"): + decoder = codecs.getincrementaldecoder(encoding)() + buffer = "" + for chunk in log_stream.stream(): + # Decode the chunk incrementally + text = decoder.decode(chunk) + buffer += text + # Split the buffer into lines + lines = buffer.split("\n") + buffer = lines.pop() # Last item is incomplete line or empty + yield from lines + # Yield any remaining text in the buffer + if buffer: + yield buffer + + @bitcoin.command() @click.argument("tank_a", type=str, required=True) @click.argument("tank_b", type=str, required=True) @@ -167,17 +218,18 @@ def get_messages(tank_a: str, tank_b: str, chain: str): base_dir = f"/root/.bitcoin/{subdir}message_capture" # Get the IP of node_b - cmd = f"kubectl get pod {tank_b} -o jsonpath='{{.status.podIP}}'" - tank_b_ip = run_command(cmd).strip() + tank_b_pod = get_pod(tank_b) + tank_b_ip = tank_b_pod.status.pod_ip # Get the service IP of node_b - cmd = f"kubectl get service {tank_b} -o jsonpath='{{.spec.clusterIP}}'" - tank_b_service_ip = run_command(cmd).strip() + tank_b_service = get_service(tank_b) + tank_b_service_ip = tank_b_service.spec.cluster_ip # List directories in the message capture folder - cmd = f"kubectl exec {tank_a} -- ls {base_dir}" - dirs = run_command(cmd).splitlines() + resp = kexec(tank_a, get_default_namespace(), ["ls", base_dir]) + + dirs = resp.splitlines() messages = [] @@ -186,18 +238,15 @@ def get_messages(tank_a: str, tank_b: str, chain: str): for file, outbound in [["msgs_recv.dat", False], ["msgs_sent.dat", True]]: file_path = f"{base_dir}/{dir_name}/{file}" # Fetch the file contents from the container - cmd = f"kubectl exec {tank_a} -- cat {file_path}" - import subprocess - - blob = subprocess.run( - cmd, shell=True, capture_output=True, executable="bash" - ).stdout + resp = kexec(tank_a, get_default_namespace(), ["base64", file_path]) + resp_bytes = base64.b64decode(resp) # Parse the blob - json = parse_raw_messages(blob, outbound) + json = parse_raw_messages(resp_bytes, outbound) messages = messages + json messages.sort(key=lambda x: x["time"]) + return messages diff --git a/src/warnet/constants.py b/src/warnet/constants.py index 99bdf2c5c..a3ffe29a1 100644 --- a/src/warnet/constants.py +++ b/src/warnet/constants.py @@ -11,11 +11,14 @@ tag for index, tag in enumerate(reversed(SUPPORTED_TAGS)) for _ in range(index + 1) ] -DEFAULT_NAMESPACE = "warnet" +DEFAULT_NAMESPACE = "default" LOGGING_NAMESPACE = "warnet-logging" INGRESS_NAMESPACE = "ingress" HELM_COMMAND = "helm upgrade --install --create-namespace" +BITCOINCORE_CONTAINER = "bitcoincore" +COMMANDER_CONTAINER = "commander" + # Directories and files for non-python assets, e.g., helm charts, example scenarios, default configs SRC_DIR = files("warnet") RESOURCES_DIR = files("resources") diff --git a/src/warnet/control.py b/src/warnet/control.py index cea7bc9a0..a32c198e0 100644 --- a/src/warnet/control.py +++ b/src/warnet/control.py @@ -16,11 +16,17 @@ from rich.prompt import Confirm, Prompt from rich.table import Table -from .constants import COMMANDER_CHART, LOGGING_NAMESPACE +from .constants import ( + BITCOINCORE_CONTAINER, + COMMANDER_CHART, + COMMANDER_CONTAINER, + LOGGING_NAMESPACE, +) from .k8s import ( delete_pod, get_default_namespace, get_mission, + get_pod, get_pods, pod_log, snapshot_bitcoin_datadir, @@ -28,7 +34,7 @@ wait_for_pod, write_file_to_container, ) -from .process import run_command, stream_command +from .process import run_command console = Console() @@ -83,11 +89,8 @@ def stop_scenario(scenario_name): """Stop a single scenario using Helm""" # Stop the pod immediately (faster than uninstalling) namespace = get_default_namespace() - cmd = f"kubectl --namespace {namespace} delete pod {scenario_name} --grace-period=0 --force" - if stream_command(cmd): - console.print(f"[bold green]Successfully stopped scenario: {scenario_name}[/bold green]") - else: - console.print(f"[bold red]Failed to stop scenario: {scenario_name}[/bold red]") + delete_pod(scenario_name, namespace, grace_period=0, force=True) + console.print(f"[bold yellow]Requested scenario stop: {scenario_name}[/bold yellow]") # Then uninstall via helm (non-blocking) command = f"helm uninstall {scenario_name} --namespace {namespace} --wait=false" @@ -119,11 +122,6 @@ def uninstall_release(namespace, release_name): subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return f"Initiated uninstall for: {release_name} in namespace {namespace}" - def delete_pod(pod_name, namespace): - cmd = f"kubectl delete pod --ignore-not-found=true {pod_name} -n {namespace} --grace-period=0 --force" - subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - return f"Initiated deletion of pod: {pod_name} in namespace {namespace}" - with ThreadPoolExecutor(max_workers=10) as executor: futures = [] @@ -139,11 +137,24 @@ def delete_pod(pod_name, namespace): # Delete remaining pods pods = get_pods() for pod in pods.items: - futures.append(executor.submit(delete_pod, pod.metadata.name, pod.metadata.namespace)) + futures.append( + executor.submit( + delete_pod, + pod.metadata.name, + pod.metadata.namespace, + grace_period=0, + force=True, + ignore_not_found=True, + ) + ) # Wait for all tasks to complete and print results for future in as_completed(futures): - console.print(f"[yellow]{future.result()}[/yellow]") + result = future.result() + msg = "" + if result: + msg = result if isinstance(result, str) else result.metadata.name + click.secho(f"Deletion: {msg}", fg="yellow") console.print("[bold yellow]Teardown process initiated for all components.[/bold yellow]") console.print("[bold yellow]Note: Some processes may continue in the background.[/bold yellow]") @@ -289,7 +300,7 @@ def filter(path): wait_for_pod(name) _logs(pod_name=name, follow=True) print("Deleting pod...") - delete_pod(name) + delete_pod(name, namespace=namespace) @click.command() @@ -301,18 +312,16 @@ def logs(pod_name: str, follow: bool): def _logs(pod_name: str, follow: bool): - namespace = get_default_namespace() - if pod_name == "": try: pods = get_pods() pod_list = [item.metadata.name for item in pods.items] except Exception as e: - print(f"Could not fetch any pods in namespace {namespace}: {e}") + print(f"Could not fetch any pods: {e}") return if not pod_list: - print(f"Could not fetch any pods in namespace {namespace}") + print("Could not fetch any pods") return q = [ @@ -329,9 +338,28 @@ def _logs(pod_name: str, follow: bool): return # cancelled by user try: - stream = pod_log(pod_name, container_name=None, follow=follow) + pod = get_pod(pod_name) + eligible_container_names = [BITCOINCORE_CONTAINER, COMMANDER_CONTAINER] + available_container_names = [container.name for container in pod.spec.containers] + container_name = next( + ( + container_name + for container_name in available_container_names + if container_name in eligible_container_names + ), + None, + ) + if not container_name: + print("Could not determine primary container.") + return + except Exception as e: + print(f"Error getting pods. Could not determine primary container: {e}") + return + + try: + stream = pod_log(pod_name, container_name=container_name, follow=follow) for line in stream.stream(): - print(line.decode("utf-8"), end=None) + click.secho(line.decode("utf-8")) except Exception as e: print(e) except KeyboardInterrupt: diff --git a/src/warnet/k8s.py b/src/warnet/k8s.py index 9c18d095d..669ef1c51 100644 --- a/src/warnet/k8s.py +++ b/src/warnet/k8s.py @@ -1,16 +1,19 @@ import json import os -import sys import tempfile from pathlib import Path from time import sleep +from typing import Optional import yaml from kubernetes import client, config, watch -from kubernetes.client.models import CoreV1Event, V1PodList +from kubernetes.client.api import CoreV1Api +from kubernetes.client.models import V1DeleteOptions, V1Pod, V1PodList, V1Service, V1Status from kubernetes.client.rest import ApiException from kubernetes.dynamic import DynamicClient from kubernetes.stream import stream +from kubernetes.utils import create_from_yaml +from urllib3 import HTTPResponse from .constants import ( CADDY_INGRESS_NAME, @@ -19,10 +22,13 @@ KUBECONFIG, LOGGING_NAMESPACE, ) -from .process import run_command, stream_command -def get_static_client() -> CoreV1Event: +class K8sError(Exception): + pass + + +def get_static_client() -> CoreV1Api: config.load_kube_config(config_file=KUBECONFIG) return client.CoreV1Api() @@ -32,6 +38,36 @@ def get_dynamic_client() -> DynamicClient: return DynamicClient(client.ApiClient()) +def kexec(pod: str, namespace: str, cmd: [str]) -> str: + """It's `kubectl exec` but with a k at the beginning so as not to conflict with python's `exec`""" + sclient = get_static_client() + resp = stream( + sclient.connect_get_namespaced_pod_exec, + pod, + namespace, + command=cmd, + stderr=True, + stdin=False, + stdout=True, + tty=False, + ) + return resp + + +def get_service(name: str, namespace: Optional[str] = None) -> V1Service: + sclient = get_static_client() + if not namespace: + namespace = get_default_namespace() + return sclient.read_namespaced_service(name=name, namespace=namespace) + + +def get_pod(name: str, namespace: Optional[str] = None) -> V1Pod: + sclient = get_static_client() + if not namespace: + namespace = get_default_namespace() + return sclient.read_namespaced_pod(name=name, namespace=namespace) + + def get_pods() -> V1PodList: sclient = get_static_client() try: @@ -41,7 +77,7 @@ def get_pods() -> V1PodList: return pod_list -def get_mission(mission: str) -> list[V1PodList]: +def get_mission(mission: str) -> list[V1Pod]: pods = get_pods() crew = [] for pod in pods.items: @@ -52,8 +88,7 @@ def get_mission(mission: str) -> list[V1PodList]: def get_pod_exit_status(pod_name): try: - sclient = get_static_client() - pod = sclient.read_namespaced_pod(name=pod_name, namespace=get_default_namespace()) + pod = get_pod(pod_name) for container_status in pod.status.container_statuses: if container_status.state.terminated: return container_status.state.terminated.exit_code @@ -64,13 +99,14 @@ def get_pod_exit_status(pod_name): def get_edges() -> any: + namespace = get_default_namespace() sclient = get_static_client() - configmap = sclient.read_namespaced_config_map(name="edges", namespace="warnet") + configmap = sclient.read_namespaced_config_map(name="edges", namespace=namespace) return json.loads(configmap.data["data"]) def create_kubernetes_object( - kind: str, metadata: dict[str, any], spec: dict[str, any] = None + kind: str, metadata: dict[str, any], spec: Optional[dict[str, any]] = None ) -> dict[str, any]: metadata["namespace"] = get_default_namespace() obj = { @@ -83,22 +119,73 @@ def create_kubernetes_object( return obj -def set_kubectl_context(namespace: str) -> bool: +def get_context_entry(kubeconfig_data: dict) -> dict: + current_context_name = kubeconfig_data.get("current-context") + if not current_context_name: + raise K8sError(f"Could not determine current context from config data: {kubeconfig_data}") + + context_entry = next( + ( + ctx + for ctx in kubeconfig_data.get("contexts", []) + if ctx.get("name") == current_context_name + ), + None, + ) + + if not context_entry: + raise K8sError(f"Context '{current_context_name}' not found in kubeconfig.") + + return context_entry + + +def set_context_namespace(namespace: str) -> None: """ - Set the default kubectl context to the specified namespace. + Set the namespace within the KUBECONFIG's current context """ - command = f"kubectl config set-context --current --namespace={namespace}" - result = stream_command(command) - if result: - print(f"Kubectl context set to namespace: {namespace}") - else: - print(f"Failed to set kubectl context to namespace: {namespace}") - return result + try: + kubeconfig_data = open_kubeconfig() + except K8sError as e: + raise K8sError(f"Could not open KUBECONFIG: {KUBECONFIG}") from e + + try: + context_entry = get_context_entry(kubeconfig_data) + except K8sError as e: + raise K8sError(f"Could not get context entry for {KUBECONFIG}") from e + + context_entry["context"]["namespace"] = namespace + + try: + write_kubeconfig(kubeconfig_data) + except Exception as e: + raise K8sError(f"Could not write to KUBECONFIG: {KUBECONFIG}") from e + + +def get_default_namespace() -> str: + try: + kubeconfig_data = open_kubeconfig() + except K8sError as e: + raise K8sError(f"Could not open KUBECONFIG: {KUBECONFIG}") from e + + try: + context_entry = get_context_entry(kubeconfig_data) + except K8sError as e: + raise K8sError(f"Could not get context entry for {KUBECONFIG}") from e + + namespace = context_entry["context"].get("namespace", DEFAULT_NAMESPACE) + + return namespace def apply_kubernetes_yaml(yaml_file: str) -> bool: - command = f"kubectl apply -f {yaml_file}" - return stream_command(command) + namespace = get_default_namespace() + v1 = get_static_client() + path = os.path.abspath(yaml_file) + try: + create_from_yaml(v1, path, namespace=namespace) + return True + except Exception as e: + raise e def apply_kubernetes_yaml_obj(yaml_obj: str) -> None: @@ -112,29 +199,33 @@ def apply_kubernetes_yaml_obj(yaml_obj: str) -> None: Path(temp_file_path).unlink() -def delete_namespace(namespace: str) -> bool: - command = f"kubectl delete namespace {namespace} --ignore-not-found" - return run_command(command) - - -def delete_pod(pod_name: str) -> bool: - command = f"kubectl -n {get_default_namespace()} delete pod {pod_name}" - return stream_command(command) - - -def get_default_namespace() -> str: - command = "kubectl config view --minify -o jsonpath='{..namespace}'" +def delete_namespace(namespace: str) -> V1Status: + v1: CoreV1Api = get_static_client() + resp = v1.delete_namespace(namespace) + return resp + + +def delete_pod( + pod_name: str, + namespace: str, + grace_period: int = 30, + force: bool = False, + ignore_not_found: bool = True, +) -> Optional[V1Status]: + v1: CoreV1Api = get_static_client() + delete_options = V1DeleteOptions( + grace_period_seconds=grace_period, + propagation_policy="Foreground" if force else "Background", + ) try: - kubectl_namespace = run_command(command) - except Exception as e: - print(e) - if str(e).find("command not found"): - print( - "It looks like kubectl is not installed. Please install it to continue: " - "https://kubernetes.io/docs/tasks/tools/" - ) - sys.exit(1) - return kubectl_namespace if kubectl_namespace else DEFAULT_NAMESPACE + resp = v1.delete_namespaced_pod(name=pod_name, namespace=namespace, body=delete_options) + return resp + except ApiException as e: + if e.status == 404 and ignore_not_found: + print(f"Pod {pod_name} in namespace {namespace} not found, but ignoring as requested.") + return None + else: + raise def snapshot_bitcoin_datadir( @@ -220,15 +311,13 @@ def snapshot_bitcoin_datadir( if resp.peek_stderr(): print(f"Error: {resp.read_stderr()}") resp.close() + local_file_path = Path(local_path) / f"{pod_name}_bitcoin_data.tar.gz" - copy_command = ( - f"kubectl cp {namespace}/{pod_name}:/tmp/bitcoin_data.tar.gz {local_file_path}" - ) - if not stream_command(copy_command): - raise Exception("Failed to copy tar file from pod to local machine") + temp_bitcoin_data_path = "/tmp/bitcoin_data.tar.gz" + copy_file_from_pod(namespace, pod_name, temp_bitcoin_data_path, local_file_path) print(f"Bitcoin data exported successfully to {local_file_path}") - cleanup_command = ["rm", "/tmp/bitcoin_data.tar.gz"] + cleanup_command = ["rm", temp_bitcoin_data_path] stream( sclient.connect_get_namespaced_pod_exec, pod_name, @@ -247,6 +336,36 @@ def snapshot_bitcoin_datadir( print(f"An error occurred: {str(e)}") +def copy_file_from_pod(namespace, pod_name, pod_path, local_path): + exec_command = ["cat", pod_path] + + v1 = client.CoreV1Api() + + # Note: We do not specify the container name here; if we pack multiple containers in a pod + # we will need to change this + resp = stream( + v1.connect_get_namespaced_pod_exec, + pod_name, + namespace, + command=exec_command, + stderr=True, + stdin=False, + stdout=True, + tty=False, + _preload_content=False, + ) + + with open(local_path, "wb") as local_file: + while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + local_file.write(resp.read_stdout().encode("utf-8")) + if resp.peek_stderr(): + print("Error:", resp.read_stderr()) + + resp.close() + + def wait_for_pod_ready(name, namespace, timeout=300): sclient = get_static_client() w = watch.Watch() @@ -273,6 +392,8 @@ def wait_for_init(pod_name, timeout=300): ): pod = event["object"] if pod.metadata.name == pod_name: + if not pod.status.init_container_statuses: + continue for init_container_status in pod.status.init_container_statuses: if init_container_status.state.running: print(f"initContainer in pod {pod_name} is ready") @@ -304,7 +425,7 @@ def get_ingress_ip_or_host(): return None -def pod_log(pod_name, container_name=None, follow=False): +def pod_log(pod_name, container_name=None, follow=False, timestamps=False) -> HTTPResponse: sclient = get_static_client() try: return sclient.read_namespaced_pod_log( @@ -312,6 +433,7 @@ def pod_log(pod_name, container_name=None, follow=False): namespace=get_default_namespace(), container=container_name, follow=follow, + timestamps=timestamps, _preload_content=False, ) except ApiException as e: @@ -351,3 +473,24 @@ def write_file_to_container(pod_name, container_name, dst_path, data): return True except Exception as e: print(f"Failed to copy data to {pod_name}({container_name}):{dst_path}:\n{e}") + + +def open_kubeconfig(kubeconfig_path: str = KUBECONFIG) -> dict: + try: + with open(kubeconfig_path) as file: + return yaml.safe_load(file) + except FileNotFoundError as e: + raise K8sError(f"Kubeconfig file {kubeconfig_path} not found.") from e + except yaml.YAMLError as e: + raise K8sError(f"Error parsing kubeconfig: {e}") from e + + +def write_kubeconfig(kube_config: dict) -> None: + dir_name = os.path.dirname(KUBECONFIG) + try: + with tempfile.NamedTemporaryFile("w", dir=dir_name, delete=False) as temp_file: + yaml.safe_dump(kube_config, temp_file) + os.replace(temp_file.name, KUBECONFIG) + except Exception as e: + os.remove(temp_file.name) + raise K8sError(f"Error writing kubeconfig: {KUBECONFIG}") from e diff --git a/src/warnet/namespaces.py b/src/warnet/namespaces.py index 45bcb7af5..46ec26876 100644 --- a/src/warnet/namespaces.py +++ b/src/warnet/namespaces.py @@ -9,7 +9,7 @@ NAMESPACES_DIR, NAMESPACES_FILE, ) -from .process import run_command, stream_command +from .k8s import CoreV1Api, V1Status, get_static_client def copy_namespaces_defaults(directory: Path): @@ -32,17 +32,14 @@ def namespaces(): """Namespaces commands""" -@click.argument( - "namespaces_dir", type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path) -) @namespaces.command() def list(): """List all namespaces with 'warnet-' prefix""" - cmd = "kubectl get namespaces -o jsonpath='{.items[*].metadata.name}'" - res = run_command(cmd) - all_namespaces = res.split() - warnet_namespaces = [ns for ns in all_namespaces if ns.startswith("warnet-")] - + sclient: CoreV1Api = get_static_client() + all_namespaces = sclient.list_namespace() + warnet_namespaces = [ + ns.metadata.name for ns in all_namespaces.items if ns.metadata.name.startswith("warnet-") + ] if warnet_namespaces: print("Warnet namespaces:") for ns in warnet_namespaces: @@ -56,33 +53,34 @@ def list(): @click.argument("namespace", required=False) def destroy(destroy_all: bool, namespace: str): """Destroy a specific namespace or all warnet- prefixed namespaces""" + sclient: CoreV1Api = get_static_client() if destroy_all: - cmd = "kubectl get namespaces -o jsonpath='{.items[*].metadata.name}'" - res = run_command(cmd) - - # Get the list of namespaces - all_namespaces = res.split() - warnet_namespaces = [ns for ns in all_namespaces if ns.startswith("warnet-")] + all_namespaces = sclient.list_namespace() + warnet_namespaces = [ + ns.metadata.name + for ns in all_namespaces.items + if ns.metadata.name.startswith("warnet-") + ] if not warnet_namespaces: print("No warnet namespaces found to destroy.") return for ns in warnet_namespaces: - destroy_cmd = f"kubectl delete namespace {ns}" - if not stream_command(destroy_cmd): - print(f"Failed to destroy namespace: {ns}") + resp: V1Status = sclient.delete_namespace(ns) + if resp.status: + print(f"Destroyed namespace: {ns} with {resp.status}") else: - print(f"Destroyed namespace: {ns}") + print(f"Failed to destroy namespace: {ns}") elif namespace: if not namespace.startswith("warnet-"): print("Error: Can only destroy namespaces with 'warnet-' prefix") return - destroy_cmd = f"kubectl delete namespace {namespace}" - if not stream_command(destroy_cmd): - print(f"Failed to destroy namespace: {namespace}") + resp: V1Status = sclient.delete_namespace(namespace) + if resp.status: + print(f"Destroying namespace: {namespace} with {resp.status}") else: - print(f"Destroyed namespace: {namespace}") + print(f"Failed to destroy namespace: {namespace}") else: print("Error: Please specify a namespace or use --all flag.") diff --git a/src/warnet/project.py b/src/warnet/project.py index 05d6237a4..37f3fc168 100644 --- a/src/warnet/project.py +++ b/src/warnet/project.py @@ -244,12 +244,6 @@ def check_installation(tool_info: ToolInfo) -> ToolStatus: install_instruction="Please make sure minikube is running", install_url="https://minikube.sigs.k8s.io/docs/start/", ) - kubectl_info = ToolInfo( - tool_name="Kubectl", - is_installed_func=is_kubectl_installed, - install_instruction="Install kubectl.", - install_url="https://kubernetes.io/docs/tasks/tools/install-kubectl/", - ) helm_info = ToolInfo( tool_name="Helm", is_installed_func=is_helm_installed_and_offer_if_not, @@ -275,7 +269,7 @@ def check_installation(tool_info: ToolInfo) -> ToolStatus: print(" ╰───────────────────────────╯ ") print(" ") print(" Let's find out if your system has what it takes to run Warnet...") - print("") + print(" ") try: questions = [ @@ -304,7 +298,6 @@ def check_installation(tool_info: ToolInfo) -> ToolStatus: if is_platform_darwin(): check_results.append(check_installation(minikube_version_info)) check_results.append(check_installation(minikube_running_info)) - check_results.append(check_installation(kubectl_info)) check_results.append(check_installation(helm_info)) else: click.secho("Please re-run setup.", fg="yellow") diff --git a/src/warnet/users.py b/src/warnet/users.py index c85e53585..b4e05f56a 100644 --- a/src/warnet/users.py +++ b/src/warnet/users.py @@ -1,70 +1,100 @@ import os -import subprocess import sys import click -import yaml + +from warnet.constants import KUBECONFIG +from warnet.k8s import K8sError, open_kubeconfig, write_kubeconfig @click.command() -@click.argument("kube_config", type=str) -def auth(kube_config: str) -> None: - """ - Authenticate with a warnet cluster using a kube config file - """ +@click.argument("auth_config", type=str) +def auth(auth_config): + """Authenticate with a Warnet cluster using a kubernetes config file""" try: - current_kubeconfig = os.environ.get("KUBECONFIG", os.path.expanduser("~/.kube/config")) - combined_kubeconfig = ( - f"{current_kubeconfig}:{kube_config}" if current_kubeconfig else kube_config - ) - os.environ["KUBECONFIG"] = combined_kubeconfig - with open(kube_config) as file: - content = yaml.safe_load(file) - user = content["users"][0] - user_name = user["name"] - user_token = user["user"]["token"] - current_context = content["current-context"] - flatten_cmd = "kubectl config view --flatten" - result_flatten = subprocess.run( - flatten_cmd, shell=True, check=True, capture_output=True, text=True - ) - except subprocess.CalledProcessError as e: - click.secho("Error occurred while executing kubectl config view --flatten:", fg="red") - click.secho(e.stderr, fg="red") + auth_config = open_kubeconfig(auth_config) + except K8sError as e: + click.secho(e, fg="yellow") + click.secho(f"Could not open auth_config: {auth_config}", fg="red") sys.exit(1) - if result_flatten.returncode == 0: - with open(current_kubeconfig, "w") as file: - file.write(result_flatten.stdout) - click.secho(f"Authorization file written to: {current_kubeconfig}", fg="green") - else: - click.secho("Could not create authorization file", fg="red") - click.secho(result_flatten.stderr, fg="red") - sys.exit(result_flatten.returncode) + is_first_config = False + if not os.path.exists(KUBECONFIG): + try: + write_kubeconfig(auth_config) + is_first_config = True + except K8sError as e: + click.secho(e, fg="yellow") + click.secho(f"Could not write KUBECONFIG: {KUBECONFIG}", fg="red") + sys.exit(1) try: - update_cmd = f"kubectl config set-credentials {user_name} --token {user_token}" - result_update = subprocess.run( - update_cmd, shell=True, check=True, capture_output=True, text=True - ) - if result_update.returncode != 0: - click.secho("Could not update authorization file", fg="red") - click.secho(result_flatten.stderr, fg="red") - sys.exit(result_flatten.returncode) - except subprocess.CalledProcessError as e: - click.secho("Error occurred while executing kubectl config view --flatten:", fg="red") - click.secho(e.stderr, fg="red") + base_config = open_kubeconfig(KUBECONFIG) + except K8sError as e: + click.secho(e, fg="yellow") + click.secho(f"Could not open KUBECONFIG: {KUBECONFIG}", fg="red") sys.exit(1) - with open(current_kubeconfig) as file: - contents = yaml.safe_load(file) + if not is_first_config: + clusters = "clusters" + if clusters in auth_config: + merge_entries( + base_config.setdefault(clusters, []), auth_config[clusters], "name", "cluster" + ) + + users = "users" + if users in auth_config: + merge_entries(base_config.setdefault(users, []), auth_config[users], "name", "user") + + contexts = "contexts" + if contexts in auth_config: + merge_entries( + base_config.setdefault(contexts, []), auth_config[contexts], "name", "context" + ) + + new_current_context = auth_config.get("current-context") + base_config["current-context"] = new_current_context + + # Check if the new current context has an explicit namespace + context_entry = next( + (ctx for ctx in base_config["contexts"] if ctx["name"] == new_current_context), None + ) + if context_entry and "namespace" not in context_entry["context"]: + click.secho( + f"Warning: The context '{new_current_context}' does not have an explicit namespace.", + fg="yellow", + ) - with open(current_kubeconfig, "w") as file: - contents["current-context"] = current_context - yaml.safe_dump(contents, file) + try: + write_kubeconfig(base_config) + click.secho(f"Updated kubeconfig with authorization data: {KUBECONFIG}", fg="green") + except K8sError as e: + click.secho(e, fg="yellow") + click.secho(f"Could not write KUBECONFIG: {KUBECONFIG}", fg="red") + sys.exit(1) - with open(current_kubeconfig) as file: - contents = yaml.safe_load(file) + try: + base_config = open_kubeconfig(KUBECONFIG) click.secho( - f"\nwarnet's current context is now set to: {contents['current-context']}", fg="green" + f"Warnet's current context is now set to: {base_config['current-context']}", fg="green" ) + except K8sError as e: + click.secho(f"Error reading from {KUBECONFIG}: {e}", fg="red") + sys.exit(1) + + +def merge_entries(base_list, auth_list, key, entry_type): + base_entry_names = {entry[key] for entry in base_list} # Extract existing names + for entry in auth_list: + if entry[key] in base_entry_names: + if click.confirm( + f"The {entry_type} '{entry[key]}' already exists. Overwrite?", default=False + ): + # Find and replace the existing entry + base_list[:] = [e if e[key] != entry[key] else entry for e in base_list] + click.secho(f"Overwrote {entry_type} '{entry[key]}'", fg="yellow") + else: + click.secho(f"Skipped {entry_type} '{entry[key]}'", fg="yellow") + else: + base_list.append(entry) + click.secho(f"Added new {entry_type} '{entry[key]}'", fg="green") diff --git a/test/scenarios_test.py b/test/scenarios_test.py index 0b8ba7a4a..d66943a5c 100755 --- a/test/scenarios_test.py +++ b/test/scenarios_test.py @@ -62,7 +62,7 @@ def check_blocks(self, target_blocks, start: int = 0): try: deployed = scenarios_deployed() commander = deployed[0]["commander"] - command = f"kubectl logs {commander}" + command = f"warnet logs {commander}" print("\ncommander output:") print(run_command(command)) print("\n") diff --git a/test/test_base.py b/test/test_base.py index 1a2a4c983..dfb916c94 100644 --- a/test/test_base.py +++ b/test/test_base.py @@ -3,6 +3,7 @@ import logging.config import os import re +import shlex import threading from pathlib import Path from subprocess import run @@ -66,7 +67,7 @@ def assert_log_msgs(self): def warnet(self, cmd): self.log.debug(f"Executing warnet command: {cmd}") - command = ["warnet"] + cmd.split() + command = ["warnet"] + shlex.split(cmd) # shlex handles escape sequences nicely proc = run(command, capture_output=True) if proc.stderr: raise Exception(proc.stderr.decode().strip())