diff --git a/sebs.py b/sebs.py index 862f7ce08..4204022d1 100755 --- a/sebs.py +++ b/sebs.py @@ -427,7 +427,7 @@ def start(benchmark, benchmark_input_size, output, deployments, storage_configur """ (config, output_dir, logging_filename, sebs_client, deployment_client) = parse_common_params( - ignore_cache=True, update_code=False, update_storage=False, + update_code=False, update_storage=False, deployment="local", storage_configuration=storage_configuration, **kwargs ) deployment_client = cast(sebs.local.Local, deployment_client) @@ -457,8 +457,6 @@ def start(benchmark, benchmark_input_size, output, deployments, storage_configur # Otherwise we want to clean up as much as possible deployment_client.shutdown_storage = False - deployment_client.config.serialize() - result.serialize(output) sebs_client.logging.info(f"Save results to {os.path.abspath(output)}") @@ -475,8 +473,16 @@ def stop(input_json, output_json, **kwargs): sebs.utils.global_logging() logging.info(f"Stopping deployment from {os.path.abspath(input_json)}") + (config, output_dir, logging_filename, sebs_client, deployment_client) = parse_common_params( + update_code=False, update_storage=False, + deployment="local", **kwargs + ) + + deployment_client.res + deployment = sebs.local.Deployment.deserialize(input_json, None) deployment.shutdown(output_json) + logging.info(f"Stopped deployment from {os.path.abspath(input_json)}") diff --git a/sebs/cache.py b/sebs/cache.py index 4403e59b2..e21e2f27b 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -59,7 +59,7 @@ def typename() -> str: def load_config(self): with self._lock: - for cloud in ["azure", "aws", "gcp", "openwhisk"]: + for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]: cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) if os.path.exists(cloud_config_file): self.cached_config[cloud] = json.load(open(cloud_config_file, "r")) @@ -86,7 +86,7 @@ def unlock(self): def shutdown(self): if self.config_updated: - for cloud in ["azure", "aws", "gcp", "openwhisk"]: + for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]: if cloud in self.cached_config: cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) self.logging.info("Update cached config {}".format(cloud_config_file)) diff --git a/sebs/local/config.py b/sebs/local/config.py index a97b16f8b..6f503d0ca 100644 --- a/sebs/local/config.py +++ b/sebs/local/config.py @@ -1,11 +1,9 @@ -import json - -from typing import cast, Optional +from typing import cast, Optional, Set from sebs.cache import Cache from sebs.faas.config import Config, Credentials, Resources from sebs.storage.minio import MinioConfig -from sebs.utils import serialize, LoggingHandlers +from sebs.utils import LoggingHandlers class LocalCredentials(Credentials): @@ -28,41 +26,59 @@ def __init__(self, storage_cfg: Optional[MinioConfig] = None): self._path: str = "" super().__init__(name="local") self._storage = storage_cfg - self._allocated_ports = set() + self._allocated_ports: Set[int] = set() @property def storage_config(self) -> Optional[MinioConfig]: return self._storage - @property - def path(self) -> str: - return self._path - @property def allocated_ports(self) -> set: return self._allocated_ports def serialize(self) -> dict: - out = { - "allocated_ports": list(self._allocated_ports) - } + out: dict = {} + out["allocated_ports"] = list(self._allocated_ports) + if self._storage is not None: + out["storage"] = self._storage.serialize() return out @staticmethod - def initialize(res: Resources, cfg: dict): - pass + def initialize(res: Resources, config: dict): - @staticmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: - ret = LocalResources() - ret._path = config["path"] + resources = cast(LocalResources, res) # Check for new config if "storage" in config: - ret._storage = MinioConfig.deserialize(config["storage"]) - ret.logging.info("Using user-provided configuration of storage for local containers.") + resources._storage = MinioConfig.deserialize(config["storage"]) + resources.logging.info( + "Using user-provided configuration of storage for local containers." + ) if "allocated_ports" in config: - ret._allocated_ports = set(config["allocated_ports"]) + resources._allocated_ports = set(config["allocated_ports"]) + + def update_cache(self, cache: Cache): + super().update_cache(cache) + cache.update_config( + val=list(self._allocated_ports), keys=["local", "resources", "allocated_ports"] + ) + if self._storage is not None: + self._storage.update_cache(["local", "resources", "storage"], cache) + + @staticmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: + ret = LocalResources() + + cached_config = cache.get_config("local") + # Load cached values + if cached_config and "resources" in cached_config: + LocalResources.initialize(ret, cached_config["resources"]) + ret.logging_handlers = handlers + ret.logging.info("Using cached resources for Local") + else: + # Check for new config + ret.logging_handlers = handlers + LocalResources.initialize(ret, config) return ret @@ -104,13 +120,8 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config return config_obj def serialize(self) -> dict: - with open(self.resources.path, "r+") as out: - config = json.load(out) - config["deployment"]["local"].update(self.resources.serialize()) - out.seek(0) - out.write(serialize(config)) - - return {} + out = {"name": "local", "region": self._region, "resources": self._resources.serialize()} + return out def update_cache(self, cache: Cache): - pass + self.resources.update_cache(cache) diff --git a/sebs/local/local.py b/sebs/local/local.py index 47f36cb85..c24e1b104 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -103,12 +103,8 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: self.storage.replace_existing = replace_existing return self.storage - """ - Shut down minio storage instance. - """ - def shutdown(self): - pass + super().shutdown() """ It would be sufficient to just pack the code and ship it as zip to AWS. @@ -197,7 +193,8 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc # "tty": True, } - # If SeBS is running on non-linux platforms, container port must be mapped to host port to make it reachable + # If SeBS is running on non-linux platforms, + # container port must be mapped to host port to make it reachable # Check if the system is NOT Linux or that it is WSL port = self.DEFAULT_PORT if not is_linux(): @@ -215,7 +212,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc port_found = True self.config.resources.allocated_ports.add(p) break - except socket.error as e: + except socket.error: # The port is already in use continue @@ -226,7 +223,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc ) container_kwargs["command"] = f"/bin/bash /sebs/run_server.sh {port}" - container_kwargs["ports"] = {f'{port}/tcp': port} + container_kwargs["ports"] = {f"{port}/tcp": port} container = self._docker_client.containers.run(**container_kwargs) diff --git a/sebs/storage/minio.py b/sebs/storage/minio.py index d882de655..551584209 100644 --- a/sebs/storage/minio.py +++ b/sebs/storage/minio.py @@ -3,7 +3,6 @@ import os import secrets import uuid -import platform from typing import List, Optional, Type, TypeVar import docker diff --git a/sebs/utils.py b/sebs/utils.py index 3b27b9e8c..3b6350205 100644 --- a/sebs/utils.py +++ b/sebs/utils.py @@ -252,10 +252,12 @@ def logging_handlers(self, handlers: LoggingHandlers): def has_platform(name: str) -> bool: return os.environ.get(f"SEBS_WITH_{name.upper()}", "False").lower() == "true" + # Check if the system is Linux and that it's not WSL def is_linux() -> bool: return platform.system() == "Linux" and "microsoft" not in platform.release().lower() + def catch_interrupt(): import signal