|
1 | 1 | import math |
2 | 2 | import os |
3 | | -import platform |
4 | 3 | import shutil |
5 | 4 | import time |
6 | 5 | import uuid |
7 | 6 | from typing import cast, Dict, List, Optional, Tuple, Type, Union # noqa |
8 | 7 |
|
9 | 8 | import boto3 |
10 | 9 | import docker |
11 | | -from botocore.exceptions import ClientError |
12 | 10 |
|
13 | 11 | from sebs.aws.s3 import S3 |
14 | 12 | from sebs.aws.function import LambdaFunction |
| 13 | +from sebs.aws.container import ECRContainer |
15 | 14 | from sebs.aws.config import AWSConfig |
16 | 15 | from sebs.faas.config import Resources |
17 | 16 | from sebs.utils import execute |
18 | 17 | from sebs.benchmark import Benchmark |
19 | 18 | from sebs.cache import Cache |
20 | 19 | from sebs.config import SeBSConfig |
21 | | -from sebs.utils import LoggingHandlers, DOCKER_DIR |
| 20 | +from sebs.utils import LoggingHandlers |
22 | 21 | from sebs.faas.function import Function, ExecutionResult, Trigger, FunctionConfig |
23 | 22 | from sebs.faas.storage import PersistentStorage |
24 | 23 | from sebs.faas.system import System |
@@ -74,6 +73,10 @@ def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] |
74 | 73 | self.get_storage() |
75 | 74 | self.initialize_resources(select_prefix=resource_prefix) |
76 | 75 |
|
| 76 | + self.ecr_client = ECRContainer( |
| 77 | + self.system_config, self.session, self.config, self.docker_client |
| 78 | + ) |
| 79 | + |
77 | 80 | def get_lambda_client(self): |
78 | 81 | if not hasattr(self, "client"): |
79 | 82 | self.client = self.session.client( |
@@ -143,7 +146,7 @@ def package_code( |
143 | 146 | # if the containerzied deployment is set to True |
144 | 147 | if container_deployment: |
145 | 148 | # build base image and upload to ECR |
146 | | - _, container_uri = self.build_base_image( |
| 149 | + _, container_uri = self.ecr_client.build_base_image( |
147 | 150 | directory, language_name, language_version, architecture, benchmark, is_cached |
148 | 151 | ) |
149 | 152 |
|
@@ -181,179 +184,6 @@ def _map_architecture(self, architecture: str) -> str: |
181 | 184 | return "x86_64" |
182 | 185 | return architecture |
183 | 186 |
|
184 | | - # PK: To Do: Test |
185 | | - def find_image(self, repository_client, repository_name, image_tag) -> bool: |
186 | | - try: |
187 | | - response = repository_client.describe_images( |
188 | | - repositoryName=repository_name, imageIds=[{"imageTag": image_tag}] |
189 | | - ) |
190 | | - if response["imageDetails"]: |
191 | | - return True |
192 | | - except ClientError: |
193 | | - return False |
194 | | - |
195 | | - return False |
196 | | - |
197 | | - def push_image_to_repository(self, repository_client, repository_uri, image_tag): |
198 | | - |
199 | | - username, password, registry_url = self.config.resources.ecr_repository_authorization( |
200 | | - repository_client |
201 | | - ) |
202 | | - |
203 | | - import json |
204 | | - |
205 | | - layer_tasks = {} |
206 | | - |
207 | | - def show_progress(line, progress): |
208 | | - |
209 | | - if isinstance(line, str): |
210 | | - line = json.loads(line) |
211 | | - |
212 | | - status = line.get("status", "") |
213 | | - progress_detail = line.get("progressDetail", {}) |
214 | | - id_ = line.get("id", "") |
215 | | - |
216 | | - if "Pushing" in status and progress_detail: |
217 | | - current = progress_detail.get("current", 0) |
218 | | - total = progress_detail.get("total", 0) |
219 | | - |
220 | | - if id_ not in layer_tasks and total > 0: |
221 | | - # Create new progress task for this layer |
222 | | - description = f"Layer {id_[:12]}" |
223 | | - layer_tasks[id_] = progress.add_task(description, total=total) |
224 | | - if id_ in layer_tasks: |
225 | | - # Update progress for existing task |
226 | | - progress.update(layer_tasks[id_], completed=current) |
227 | | - |
228 | | - elif any(x in status for x in ["Layer already exists", "Pushed"]): |
229 | | - if id_ in layer_tasks: |
230 | | - # Complete the task |
231 | | - progress.update( |
232 | | - layer_tasks[id_], completed=progress.tasks[layer_tasks[id_]].total |
233 | | - ) |
234 | | - |
235 | | - elif "error" in line: |
236 | | - raise Exception(line["error"]) |
237 | | - |
238 | | - def push_image(repository_uri, image_tag): |
239 | | - try: |
240 | | - from rich.progress import Progress |
241 | | - |
242 | | - with Progress() as progress: |
243 | | - |
244 | | - self.logging.info(f"Pushing image {image_tag} to {repository_uri}") |
245 | | - ret = self.docker_client.images.push( |
246 | | - repository=repository_uri, tag=image_tag, stream=True, decode=True |
247 | | - ) |
248 | | - for line in ret: |
249 | | - show_progress(line, progress) |
250 | | - except docker.errors.APIError as e: |
251 | | - self.logging.error( |
252 | | - f"Failed to push the image to registry {repository_uri}. Error: {str(e)}" |
253 | | - ) |
254 | | - raise e |
255 | | - |
256 | | - try: |
257 | | - self.docker_client.login(username=username, password=password, registry=registry_url) |
258 | | - push_image(repository_uri, image_tag) |
259 | | - self.logging.info(f"Successfully pushed the image to registry {repository_uri}.") |
260 | | - except docker.errors.APIError as e: |
261 | | - self.logging.error(f"Failed to push the image to registry {repository_uri}.") |
262 | | - self.logging.error(f"Error: {str(e)}") |
263 | | - raise RuntimeError("Couldn't push to Docker registry") |
264 | | - |
265 | | - def build_base_image( |
266 | | - self, |
267 | | - directory: str, |
268 | | - language_name: str, |
269 | | - language_version: str, |
270 | | - architecture: str, |
271 | | - benchmark: str, |
272 | | - is_cached: bool, |
273 | | - ) -> Tuple[bool, str]: |
274 | | - """ |
275 | | - When building function for the first time (according to SeBS cache), |
276 | | - check if Docker image is available in the registry. |
277 | | - If yes, then skip building. |
278 | | - If no, then continue building. |
279 | | -
|
280 | | - For every subsequent build, we rebuild image and push it to the |
281 | | - registry. These are triggered by users modifying code and enforcing |
282 | | - a build. |
283 | | - """ |
284 | | - |
285 | | - account_id = self.config.credentials.account_id |
286 | | - region = self.config.region |
287 | | - registry_name = f"{account_id}.dkr.ecr.{region}.amazonaws.com" |
288 | | - |
289 | | - ecr_client, repository_name = self.config.resources.get_ecr_repository(self.session) |
290 | | - |
291 | | - image_tag = self.system_config.benchmark_image_tag( |
292 | | - self.name(), benchmark, language_name, language_version, architecture |
293 | | - ) |
294 | | - repository_uri = f"{registry_name}/{repository_name}:{image_tag}" |
295 | | - |
296 | | - # cached package, rebuild not enforced -> check for new one |
297 | | - # if cached is true, no need to build and push the image. |
298 | | - if is_cached: |
299 | | - if self.find_image(self.docker_client, repository_name, image_tag): |
300 | | - self.logging.info( |
301 | | - f"Skipping building AWS Docker package for {benchmark}, using " |
302 | | - f"Docker image {repository_name}:{image_tag} from registry: " |
303 | | - f"{registry_name}." |
304 | | - ) |
305 | | - return False, repository_uri |
306 | | - else: |
307 | | - # image doesn't exist, let's continue |
308 | | - self.logging.info( |
309 | | - f"Image {repository_name}:{image_tag} doesn't exist in the registry, " |
310 | | - f"building the image for {benchmark}." |
311 | | - ) |
312 | | - |
313 | | - build_dir = os.path.join(directory, "docker") |
314 | | - os.makedirs(build_dir, exist_ok=True) |
315 | | - |
316 | | - shutil.copy( |
317 | | - os.path.join(DOCKER_DIR, self.name(), language_name, "Dockerfile.function"), |
318 | | - os.path.join(build_dir, "Dockerfile"), |
319 | | - ) |
320 | | - for fn in os.listdir(directory): |
321 | | - if fn not in ("index.js", "__main__.py"): |
322 | | - file = os.path.join(directory, fn) |
323 | | - shutil.move(file, build_dir) |
324 | | - |
325 | | - with open(os.path.join(build_dir, ".dockerignore"), "w") as f: |
326 | | - f.write("Dockerfile") |
327 | | - |
328 | | - builder_image = self.system_config.benchmark_base_images( |
329 | | - self.name(), language_name, architecture |
330 | | - )[language_version] |
331 | | - self.logging.info(f"Build the benchmark base image {repository_name}:{image_tag}.") |
332 | | - |
333 | | - isa = platform.processor() |
334 | | - if (isa == "x86_64" and architecture != "x64") or ( |
335 | | - isa == "arm64" and architecture != "arm64" |
336 | | - ): |
337 | | - self.logging.warning( |
338 | | - f"Building image for architecture: {architecture} on CPU architecture: {isa}. " |
339 | | - "This step requires configured emulation. If the build fails, please consult " |
340 | | - "our documentation. We recommend QEMU as it can be configured to run automatically." |
341 | | - ) |
342 | | - |
343 | | - buildargs = {"VERSION": language_version, "BASE_IMAGE": builder_image} |
344 | | - image, _ = self.docker_client.images.build( |
345 | | - tag=repository_uri, path=build_dir, buildargs=buildargs |
346 | | - ) |
347 | | - |
348 | | - self.logging.info( |
349 | | - f"Push the benchmark base image {repository_name}:{image_tag} " |
350 | | - f"to registry: {registry_name}." |
351 | | - ) |
352 | | - |
353 | | - self.push_image_to_repository(ecr_client, repository_uri, image_tag) |
354 | | - |
355 | | - return True, repository_uri |
356 | | - |
357 | 187 | def _map_language_runtime(self, language: str, runtime: str): |
358 | 188 |
|
359 | 189 | # AWS uses different naming scheme for Node.js versions |
|
0 commit comments