Skip to content

Commit

Permalink
fix: Accept container images that has the 'null' labels field in thei…
Browse files Browse the repository at this point in the history
…r manifests
  • Loading branch information
achimnol committed Jan 8, 2025
1 parent 96d2162 commit f84daa6
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 34 deletions.
20 changes: 15 additions & 5 deletions src/ai/backend/manager/container_registry/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ async def commit_rescan_result(self) -> None:
continue
except ValueError as e:
skip_reason = str(e)
progress_msg = f"Skipped image - {image_identifier.canonical}/{image_identifier.architecture} ({skip_reason})"
log.warning(progress_msg)
progress_msg = f"Skipped image (from_image_str) - {image_identifier.canonical}/{image_identifier.architecture} ({skip_reason})"
log.warning(progress_msg, exc_info=True)
break

session.add(
Expand All @@ -200,7 +200,7 @@ async def commit_rescan_result(self) -> None:

else:
skip_reason = "No container registry found matching the image."
progress_msg = f"Skipped image - {image_identifier.canonical}/{image_identifier.architecture} ({skip_reason})"
progress_msg = f"Skipped image (registry not found) - {image_identifier.canonical}/{image_identifier.architecture} ({skip_reason})"
log.warning(progress_msg)

if (reporter := progress_reporter.get()) is not None:
Expand Down Expand Up @@ -351,7 +351,13 @@ async def _read_manifest_list(
)

if not manifests[architecture]["labels"]:
log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture)
log.warning(
"The image {}:{}/{} has no metadata labels -> treating as vanilla image",
image,
tag,
architecture,
)
manifests[architecture]["labels"] = {}

await self._read_manifest(image, tag, manifests)

Expand Down Expand Up @@ -555,7 +561,11 @@ async def _read_manifest(
finally:
if skip_reason:
log.warning(
"Skipped image - {}:{}/{} ({})", image, tag, architecture, skip_reason
"Skipped image (_read_manifest inner) - {}:{}/{} ({})",
image,
tag,
architecture,
skip_reason,
)
progress_msg = f"Skipped {image}:{tag}/{architecture} ({skip_reason})"
else:
Expand Down
55 changes: 37 additions & 18 deletions src/ai/backend/manager/container_registry/harbor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
import urllib.parse
from typing import Any, AsyncIterator, Mapping, Optional, cast
from typing import Any, AsyncIterator, Mapping, Optional, cast, override

import aiohttp
import aiohttp.client_exceptions
Expand Down Expand Up @@ -123,8 +123,12 @@ async def _scan_tag(

if not labels:
log.warning(
"Labels section not found on image {}:{}/{}", image, tag, architecture
"Labels section not found on image {}:{}/{} -> treating as vanilla images",
image,
tag,
architecture,
)
labels = {}
manifest = {
architecture: {
"size": size_bytes,
Expand Down Expand Up @@ -181,6 +185,7 @@ async def untag(
): # 404 means image is already removed from harbor so we can just safely ignore the exception
raise RuntimeError(f"Failed to untag {image}: {e.message}") from e

@override
async def fetch_repositories(
self,
sess: aiohttp.ClientSession,
Expand Down Expand Up @@ -228,6 +233,7 @@ async def fetch_repositories(
next_page_url.query
)

@override
async def _scan_image(
self,
sess: aiohttp.ClientSession,
Expand Down Expand Up @@ -293,6 +299,7 @@ async def _scan_image(
next_page_url.query
)

@override
async def _scan_tag(
self,
sess: aiohttp.ClientSession,
Expand Down Expand Up @@ -333,18 +340,18 @@ async def _scan_tag(
case _ as media_type:
raise RuntimeError(f"Unsupported artifact media-type: {media_type}")

@override
async def _process_oci_index(
self,
tg: aiotools.TaskGroup,
sess: aiohttp.ClientSession,
_rqst_args: Mapping[str, Any],
rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
rqst_args = dict(_rqst_args)
if not rqst_args.get("headers"):
rqst_args["headers"] = {}
rqst_args = {**rqst_args}
rqst_args["headers"] = rqst_args.get("headers") or {}
rqst_args["headers"].update({"Accept": "application/vnd.oci.image.manifest.v1+json"})
digests: list[tuple[str, str]] = []
for reference in image_info["references"]:
Expand All @@ -369,18 +376,18 @@ async def _process_oci_index(
)
)

@override
async def _process_docker_v2_multiplatform_image(
self,
tg: aiotools.TaskGroup,
sess: aiohttp.ClientSession,
_rqst_args: Mapping[str, Any],
rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
rqst_args = dict(_rqst_args)
if not rqst_args.get("headers"):
rqst_args["headers"] = {}
rqst_args = {**rqst_args}
rqst_args["headers"] = rqst_args.get("headers") or {}
rqst_args["headers"].update({
"Accept": "application/vnd.docker.distribution.manifest.v2+json"
})
Expand All @@ -407,18 +414,18 @@ async def _process_docker_v2_multiplatform_image(
)
)

@override
async def _process_docker_v2_image(
self,
tg: aiotools.TaskGroup,
sess: aiohttp.ClientSession,
_rqst_args: Mapping[str, Any],
rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
rqst_args = dict(_rqst_args)
if not rqst_args.get("headers"):
rqst_args["headers"] = {}
rqst_args = {**rqst_args}
rqst_args["headers"] = rqst_args.get("headers") or {}
rqst_args["headers"].update({
"Accept": "application/vnd.docker.distribution.manifest.v2+json"
})
Expand Down Expand Up @@ -475,8 +482,14 @@ async def _harbor_scan_tag_per_arch(
elif _container_config_labels := data.get("container_config", {}).get("Labels"):
labels = _container_config_labels

if not labels:
log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture)
if labels is None:
log.warning(
"Labels section not found on image {}:{}/{} -> treating as vanilla image",
image,
tag,
architecture,
)
labels = {}

manifests[architecture] = {
"size": size_bytes,
Expand Down Expand Up @@ -523,8 +536,14 @@ async def _harbor_scan_tag_single_arch(
elif _container_config_labels := data.get("container_config", {}).get("Labels"):
labels = _container_config_labels

if not labels:
log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture)
if labels is None:
log.warning(
"Labels section not found on image {}:{}/{} -> treating as vanilla image",
image,
tag,
architecture,
)
labels = {}
manifests[architecture] = {
"size": size_bytes,
"labels": labels,
Expand Down
28 changes: 17 additions & 11 deletions src/ai/backend/manager/container_registry/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
from contextlib import asynccontextmanager as actxmgr
from typing import AsyncIterator, Optional
from typing import AsyncIterator, override

import aiohttp
import sqlalchemy as sa
Expand Down Expand Up @@ -38,16 +38,14 @@ async def fetch_repositories(
if (reporter := progress_reporter.get()) is not None:
reporter.total_progress = len(items)
for item in items:
labels = item["Labels"]
if not labels:
continue
if item["RepoTags"] is not None:
for image_ref_str in item["RepoTags"]:
if image_ref_str == "<none>:<none>":
# cache images
continue
yield image_ref_str # this includes the tag part

@override
async def _scan_image(
self,
sess: aiohttp.ClientSession,
Expand All @@ -61,14 +59,13 @@ async def _scan_tag_local(
sess: aiohttp.ClientSession,
rqst_args: dict[str, str],
image: str,
digest: str,
tag: Optional[str] = None,
tag: str,
) -> None:
async def _read_image_info(
_tag: str,
) -> tuple[dict[str, dict], str | None]:
async with sess.get(
self.registry_url / "images" / f"{image}:{digest}" / "json"
self.registry_url / "images" / f"{image}:{tag}" / "json"
) as response:
data = await response.json()
architecture = data["Architecture"]
Expand All @@ -79,7 +76,7 @@ async def _read_image_info(
"ContainerConfig.Image": data.get("ContainerConfig", {}).get("Image", None),
"Architecture": data["Architecture"],
}
log.debug("scanned image info: {}:{}\n{}", image, digest, json.dumps(summary, indent=2))
log.debug("scanned image info: {}:{}\n{}", image, tag, json.dumps(summary, indent=2))
already_exists = 0
config_digest = data["Id"]
async with self.db.begin_readonly_session() as db_session:
Expand All @@ -91,14 +88,23 @@ async def _read_image_info(
)
if already_exists > 0:
return {}, "already synchronized from a remote registry"
labels = data["Config"]["Labels"]
if labels is None:
log.debug(
"The image {}:{}/{} has no metadata labels -> treating as vanilla image",
image,
tag,
architecture,
)
labels = {}
return {
architecture: {
"size": data["Size"],
"labels": data["Config"]["Labels"],
"labels": labels,
"digest": config_digest,
},
}, None

async with concurrency_sema.get():
manifests, skip_reason = await _read_image_info(digest)
await self._read_manifest(image, digest, manifests, skip_reason)
manifests, skip_reason = await _read_image_info(tag)
await self._read_manifest(image, tag, manifests, skip_reason)

0 comments on commit f84daa6

Please sign in to comment.